Branch data Line data Source code
1 : : // Copyright (C) 2009 Internet Systems Consortium, Inc. ("ISC")
2 : : //
3 : : // Permission to use, copy, modify, and/or distribute this software for any
4 : : // purpose with or without fee is hereby granted, provided that the above
5 : : // copyright notice and this permission notice appear in all copies.
6 : : //
7 : : // THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
8 : : // REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
9 : : // AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
10 : : // INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
11 : : // LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
12 : : // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
13 : : // PERFORMANCE OF THIS SOFTWARE.
14 : :
15 : : #include <config.h>
16 : : #include <cc/session_config.h>
17 : : #include <cc/logger.h>
18 : :
19 : : #include <stdint.h>
20 : :
21 : : // XXX: there seems to be a strange dependency between ASIO and std library
22 : : // definitions. On some platforms if we include std headers before ASIO
23 : : // headers unexpected behaviors will happen.
24 : : // A middle term solution is to generalize our local wrapper interface
25 : : // (currently only available for the auth server), where all such portability
26 : : // issues are hidden, and to have other modules use the wrapper.
27 : : #include <unistd.h> // for some IPC/network system calls
28 : : #include <asio.hpp>
29 : : #include <asio/error_code.hpp>
30 : : #include <asio/deadline_timer.hpp>
31 : : #include <asio/system_error.hpp>
32 : :
33 : : #include <cstdio>
34 : : #include <vector>
35 : : #include <iostream>
36 : : #include <sstream>
37 : :
38 : : #include <sys/un.h>
39 : :
40 : : #include <boost/bind.hpp>
41 : : #include <boost/optional.hpp>
42 : : #include <boost/function.hpp>
43 : : #include <boost/date_time/posix_time/posix_time_types.hpp>
44 : :
45 : : #include <exceptions/exceptions.h>
46 : :
47 : : #include <cc/data.h>
48 : : #include <cc/session.h>
49 : :
50 : : using namespace std;
51 : : using namespace isc::cc;
52 : : using namespace isc::data;
53 : :
54 : : // some of the asio names conflict with socket API system calls
55 : : // (e.g. write(2)) so we don't import the entire asio namespace.
56 : : using asio::io_service;
57 : :
58 : : namespace {
59 : : /// \brief Sets the given Optional 'result' to the given error code
60 : : /// Used as a callback for emulating sync reads with async calls
61 : : /// \param result Pointer to the optional to set
62 : : /// \param err The error code to set it to
63 : : void
64 : 67 : setResult(boost::optional<asio::error_code>* result,
65 : : const asio::error_code& err)
66 : : {
67 : 67 : result->reset(err);
68 : 67 : }
69 : : }
70 : :
71 : : namespace isc {
72 : : namespace cc {
73 : :
74 [ + - ]: 14 : class SessionImpl {
75 : : public:
76 : : SessionImpl(io_service& io_service) :
77 : : sequence_(-1), queue_(Element::createList()),
78 : : io_service_(io_service), socket_(io_service_), data_length_(0),
79 [ + - ]: 7 : timeout_(MSGQ_DEFAULT_TIMEOUT)
80 : : {}
81 : : void establish(const char& socket_file);
82 : : void disconnect();
83 : : void writeData(const void* data, size_t datalen);
84 : : size_t readDataLength();
85 : : // Blocking read. Will throw a SessionTimeout if the timeout value
86 : : // (in seconds) is thrown. If timeout is 0 it will block forever
87 : : void readData(void* data, size_t datalen);
88 : : void startRead(boost::function<void()> user_handler);
89 : 0 : void setTimeout(size_t seconds) { timeout_ = seconds; };
90 : 0 : size_t getTimeout() const { return timeout_; };
91 : :
92 : : long int sequence_; // the next sequence number to use
93 : : std::string lname_;
94 : : ElementPtr queue_;
95 : :
96 : : private:
97 : : void internalRead(const asio::error_code& error,
98 : : size_t bytes_transferred);
99 : :
100 : : private:
101 : : io_service& io_service_;
102 : : asio::local::stream_protocol::socket socket_;
103 : : uint32_t data_length_;
104 : : boost::function<void()> user_handler_;
105 : : asio::error_code error_;
106 : : size_t timeout_;
107 : :
108 : : // By default, unless changed or disabled, blocking reads on
109 : : // the msgq channel will time out after 4 seconds in this
110 : : // implementation.
111 : : // This number is chosen to be low enough so that whatever
112 : : // component is blocking does not seem to be hanging, but
113 : : // still gives enough time for other modules to respond if they
114 : : // are busy. If this choice turns out to be a bad one, we can
115 : : // change it later.
116 : : static const size_t MSGQ_DEFAULT_TIMEOUT = 4000;
117 : : };
118 : :
119 : : void
120 : 7 : SessionImpl::establish(const char& socket_file) {
121 : : try {
122 [ + - ][ + - ]: 7 : LOG_DEBUG(logger, DBG_TRACE_BASIC, CC_ESTABLISH).arg(&socket_file);
[ + - ][ + - ]
[ + - ]
123 : : socket_.connect(asio::local::stream_protocol::endpoint(&socket_file),
124 [ + - ]: 6 : error_);
125 [ + - ][ + - ]: 6 : LOG_DEBUG(logger, DBG_TRACE_BASIC, CC_ESTABLISHED);
[ + - ][ + - ]
126 [ - + ]: 2 : } catch(const asio::system_error& se) {
127 [ - + ][ + - ]: 1 : LOG_FATAL(logger, CC_CONN_ERROR).arg(se.what());
[ - + ][ - + ]
[ - + ]
128 [ - + ][ - + ]: 2 : isc_throw(SessionError, se.what());
[ - + ]
129 : : }
130 [ - + ]: 6 : if (error_) {
131 [ # # ][ # # ]: 0 : LOG_FATAL(logger, CC_NO_MSGQ).arg(error_.message());
[ # # ][ # # ]
132 [ # # ][ # # ]: 0 : isc_throw(SessionError, "Unable to connect to message queue: " <<
[ # # ][ # # ]
133 : : error_.message());
134 : : }
135 : 6 : }
136 : :
137 : : void
138 : 2 : SessionImpl::disconnect() {
139 [ + - ]: 2 : LOG_DEBUG(logger, DBG_TRACE_BASIC, CC_DISCONNECT);
140 : 2 : socket_.close();
141 : 2 : data_length_ = 0;
142 : 2 : }
143 : :
144 : : void
145 : 18 : SessionImpl::writeData(const void* data, size_t datalen) {
146 : : try {
147 : 18 : asio::write(socket_, asio::buffer(data, datalen));
148 [ # # ]: 0 : } catch (const asio::system_error& asio_ex) {
149 [ # # ][ # # ]: 0 : LOG_FATAL(logger, CC_WRITE_ERROR).arg(asio_ex.what());
[ # # ][ # # ]
[ # # ]
150 [ # # ][ # # ]: 0 : isc_throw(SessionError, "ASIO write failed: " << asio_ex.what());
[ # # ][ # # ]
151 : : }
152 : 18 : }
153 : :
154 : : size_t
155 : 15 : SessionImpl::readDataLength() {
156 : 15 : size_t ret_len = data_length_;
157 : :
158 [ + + ]: 15 : if (ret_len == 0) {
159 : 11 : readData(&data_length_, sizeof(data_length_));
160 [ - + ]: 8 : if (data_length_ == 0) {
161 [ # # ]: 0 : LOG_ERROR(logger, CC_LENGTH_NOT_READY);
162 [ # # ][ # # ]: 0 : isc_throw(SessionError, "ASIO read: data length is not ready");
163 : : }
164 [ - + ]: 8 : ret_len = ntohl(data_length_);
165 : : }
166 : :
167 : 12 : data_length_ = 0;
168 : 12 : return (ret_len);
169 : : }
170 : :
171 : : void
172 : 35 : SessionImpl::readData(void* data, size_t datalen) {
173 : : boost::optional<asio::error_code> read_result;
174 : : boost::optional<asio::error_code> timer_result;
175 : :
176 : : try {
177 : : asio::async_read(socket_, asio::buffer(data, datalen),
178 : 35 : boost::bind(&setResult, &read_result, _1));
179 [ + - ]: 35 : asio::deadline_timer timer(socket_.io_service());
180 : :
181 [ + + ]: 35 : if (getTimeout() != 0) {
182 : 32 : timer.expires_from_now(boost::posix_time::milliseconds(getTimeout()));
183 : : timer.async_wait(boost::bind(&setResult, &timer_result, _1));
184 : : }
185 : :
186 : : // wait until either we have read the data we want, the
187 : : // timer expires, or one of the two is triggered with an error.
188 : : // When one of them has a result, cancel the other, and wait
189 : : // until the cancel is processed before we continue
190 [ + + ][ - + ]: 81 : while (!read_result && !timer_result) {
[ + + ]
191 [ + - ]: 46 : socket_.io_service().run_one();
192 : :
193 : : // Don't cancel the timer if we haven't set it
194 [ + + ][ + + ]: 46 : if (read_result && getTimeout() != 0) {
[ + + ]
195 : : timer.cancel();
196 [ + + ]: 60 : while (!timer_result) {
197 [ + - ]: 30 : socket_.io_service().run_one();
198 : : }
199 [ + + ]: 16 : } else if (timer_result) {
200 : 2 : socket_.cancel();
201 [ + + ]: 48 : while (!read_result) {
202 [ + - ]: 2 : socket_.io_service().run_one();
203 : : }
204 : : }
205 : : }
206 : :
207 : : // asio::error_code evaluates to false if there was no error
208 [ + + ]: 35 : if (*read_result) {
209 [ + + ]: 3 : if (*read_result == asio::error::operation_aborted) {
210 [ + - ][ + - ]: 2 : LOG_ERROR(logger, CC_TIMEOUT);
[ + - ][ + - ]
211 [ + - ][ + - ]: 4 : isc_throw(SessionTimeout,
212 : : "Timeout while reading data from cc session");
213 : : } else {
214 [ + - ][ + - ]: 2 : LOG_ERROR(logger, CC_READ_ERROR).arg(read_result->message());
[ + - ][ + - ]
[ + - ][ + - ]
215 [ + - ][ + - ]: 3 : isc_throw(SessionError,
[ + - ][ + - ]
[ + - ]
216 : : "Error while reading data from cc session: " <<
217 : : read_result->message());
218 : : }
219 : : }
220 [ + - ]: 3 : } catch (const asio::system_error& asio_ex) {
221 : : // to hide ASIO specific exceptions, we catch them explicitly
222 : : // and convert it to SessionError.
223 [ # # ][ # # ]: 0 : LOG_FATAL(logger, CC_READ_EXCEPTION).arg(asio_ex.what());
[ # # ][ # # ]
[ # # ]
224 [ # # ][ # # ]: 3 : isc_throw(SessionError, "ASIO read failed: " << asio_ex.what());
[ # # ][ # # ]
225 : : }
226 : 32 : }
227 : :
228 : : void
229 : 4 : SessionImpl::startRead(boost::function<void()> user_handler) {
230 : 4 : data_length_ = 0;
231 : 4 : user_handler_ = user_handler;
232 : : asio::async_read(socket_, asio::buffer(&data_length_,
233 : 4 : sizeof(data_length_)),
234 : : boost::bind(&SessionImpl::internalRead, this,
235 : : asio::placeholders::error,
236 : 4 : asio::placeholders::bytes_transferred));
237 : 4 : }
238 : :
239 : : void
240 : 4 : SessionImpl::internalRead(const asio::error_code& error,
241 : : size_t bytes_transferred)
242 : : {
243 [ + - ]: 4 : if (!error) {
244 [ - + ]: 4 : assert(bytes_transferred == sizeof(data_length_));
245 [ - + ]: 4 : data_length_ = ntohl(data_length_);
246 [ - + ]: 4 : if (data_length_ == 0) {
247 [ # # ]: 0 : LOG_ERROR(logger, CC_ZERO_LENGTH);
248 [ # # ][ # # ]: 0 : isc_throw(SessionError, "Invalid message length (0)");
249 : : }
250 : 4 : user_handler_();
251 : : } else {
252 [ # # ][ # # ]: 0 : LOG_ERROR(logger, CC_ASYNC_READ_FAILED).arg(error.value());
253 [ # # ][ # # ]: 0 : isc_throw(SessionError, "asynchronous read failed");
254 : : }
255 : 3 : }
256 : :
257 : 7 : Session::Session(asio::io_service& io_service) :
258 [ + - ]: 7 : impl_(new SessionImpl(io_service))
259 : 7 : {}
260 : :
261 : 7 : Session::~Session() {
262 [ + - ]: 14 : delete impl_;
263 : 7 : }
264 : :
265 : : void
266 : 1 : Session::disconnect() {
267 : 1 : impl_->disconnect();
268 : 1 : }
269 : :
270 : : void
271 : 4 : Session::startRead(boost::function<void()> read_callback) {
272 [ + - ]: 4 : LOG_DEBUG(logger, DBG_TRACE_DETAILED, CC_START_READ);
273 [ + - ]: 4 : impl_->startRead(read_callback);
274 : 4 : }
275 : :
276 : : namespace { // maybe unnecessary.
277 : : // This is a helper class to make the establish() method (below) exception-safe
278 : : // with the RAII approach.
279 : : class SessionHolder {
280 : : public:
281 : 6 : SessionHolder(SessionImpl* obj) : impl_obj_(obj) {}
282 : : ~SessionHolder()
283 : : {
284 [ - + ][ + - ]: 6 : if (impl_obj_ != NULL) {
285 : 1 : impl_obj_->disconnect();
286 : : }
287 : : }
288 : 5 : void clear() { impl_obj_ = NULL; }
289 : : SessionImpl* impl_obj_;
290 : : };
291 : : }
292 : :
293 : : void
294 : 7 : Session::establish(const char* socket_file) {
295 [ - + ]: 7 : if (socket_file == NULL) {
296 : 0 : socket_file = getenv("BIND10_MSGQ_SOCKET_FILE");
297 : : }
298 [ - + ]: 7 : if (socket_file == NULL) {
299 : 0 : socket_file = BIND10_MSGQ_SOCKET_FILE;
300 : : }
301 : :
302 : 7 : impl_->establish(*socket_file);
303 : :
304 : : // once established, encapsulate the implementation object so that we
305 : : // can safely release the internal resource when exception happens
306 : : // below.
307 : 6 : SessionHolder session_holder(impl_);
308 : :
309 : : //
310 : : // send a request for our local name, and wait for a response
311 : : //
312 : : ConstElementPtr get_lname_msg =
313 [ + - ][ + - ]: 12 : Element::fromJSON("{ \"type\": \"getlname\" }");
314 [ + - ]: 6 : sendmsg(get_lname_msg);
315 : :
316 : : ConstElementPtr routing, msg;
317 [ + + ]: 6 : recvmsg(routing, msg, false);
318 : :
319 [ + - ][ + - ]: 15 : impl_->lname_ = msg->get("lname")->stringValue();
[ + - ]
320 : :
321 : : // At this point there's no risk of resource leak.
322 : : session_holder.clear();
323 : 5 : }
324 : :
325 : : //
326 : : // Convert to wire format and send this via the stream socket with its length
327 : : // prefix.
328 : : //
329 : : void
330 : 6 : Session::sendmsg(ConstElementPtr msg) {
331 : 12 : std::string header_wire = msg->toWire();
332 : 6 : unsigned int length = 2 + header_wire.length();
333 [ - + ]: 6 : unsigned int length_net = htonl(length);
334 : 6 : unsigned short header_length = header_wire.length();
335 [ - + ]: 6 : unsigned short header_length_net = htons(header_length);
336 : :
337 [ + - ]: 6 : impl_->writeData(&length_net, sizeof(length_net));
338 [ + - ]: 6 : impl_->writeData(&header_length_net, sizeof(header_length_net));
339 [ + - ]: 6 : impl_->writeData(header_wire.data(), header_length);
340 : 6 : }
341 : :
342 : : void
343 : 0 : Session::sendmsg(ConstElementPtr env, ConstElementPtr msg) {
344 : 0 : std::string header_wire = env->toWire();
345 [ # # ]: 0 : std::string body_wire = msg->toWire();
346 : 0 : unsigned int length = 2 + header_wire.length() + body_wire.length();
347 [ # # ]: 0 : unsigned int length_net = htonl(length);
348 : 0 : unsigned short header_length = header_wire.length();
349 [ # # ]: 0 : unsigned short header_length_net = htons(header_length);
350 : :
351 [ # # ]: 0 : impl_->writeData(&length_net, sizeof(length_net));
352 [ # # ]: 0 : impl_->writeData(&header_length_net, sizeof(header_length_net));
353 [ # # ]: 0 : impl_->writeData(header_wire.data(), header_length);
354 [ # # ]: 0 : impl_->writeData(body_wire.data(), body_wire.length());
355 : 0 : }
356 : :
357 : : bool
358 : 0 : Session::recvmsg(ConstElementPtr& msg, bool nonblock, int seq) {
359 : : ConstElementPtr l_env;
360 [ # # ]: 0 : return (recvmsg(l_env, msg, nonblock, seq));
361 : : }
362 : :
363 : : bool
364 : 15 : Session::recvmsg(ConstElementPtr& env, ConstElementPtr& msg,
365 : : bool nonblock, int seq)
366 : : {
367 : 15 : size_t length = impl_->readDataLength();
368 [ - + ]: 12 : if (hasQueuedMsgs()) {
369 : : ConstElementPtr q_el;
370 [ # # ][ # # ]: 0 : for (size_t i = 0; i < impl_->queue_->size(); i++) {
371 [ # # ][ # # ]: 0 : q_el = impl_->queue_->get(i);
372 [ # # ][ # # ]: 0 : if (( seq == -1 &&
[ # # ][ # # ]
[ # # ]
373 [ # # ][ # # ]: 0 : !q_el->get(0)->contains("reply")
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
374 : : ) || (
375 [ # # ][ # # ]: 0 : q_el->get(0)->contains("reply") &&
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
376 [ # # ][ # # ]: 0 : q_el->get(0)->get("reply")->intValue() == seq
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
377 : : )
378 : : ) {
379 [ # # ][ # # ]: 0 : env = q_el->get(0);
380 [ # # ][ # # ]: 0 : msg = q_el->get(1);
381 [ # # ]: 0 : impl_->queue_->remove(i);
382 : 0 : return (true);
383 : : }
384 : : }
385 : : }
386 : :
387 : : unsigned short header_length_net;
388 : 12 : impl_->readData(&header_length_net, sizeof(header_length_net));
389 : :
390 [ - + ]: 12 : unsigned short header_length = ntohs(header_length_net);
391 [ - + ]: 12 : if (header_length > length || length < 2) {
392 [ # # ][ # # ]: 0 : LOG_ERROR(logger, CC_INVALID_LENGTHS).arg(length).arg(header_length);
[ # # ]
393 [ # # ][ # # ]: 0 : isc_throw(SessionError, "Length parameters invalid: total=" << length
[ # # ]
394 : : << ", header=" << header_length);
395 : : }
396 : :
397 : : // remove the header-length bytes from the total length
398 : 12 : length -= 2;
399 : 12 : std::vector<char> buffer(length);
400 [ + - ]: 12 : impl_->readData(&buffer[0], length);
401 : :
402 [ + - ]: 24 : std::string header_wire = std::string(&buffer[0], header_length);
403 : 12 : std::string body_wire = std::string(&buffer[0] + header_length,
404 [ + - ]: 24 : length - header_length);
405 [ + - ][ + - ]: 24 : std::stringstream header_wire_stream;
406 [ + - ]: 12 : header_wire_stream << header_wire;
407 : : ConstElementPtr l_env =
408 [ + - ]: 12 : Element::fromWire(header_wire_stream, header_length);
409 : :
410 [ + - ][ + - ]: 24 : std::stringstream body_wire_stream;
411 [ + - ]: 12 : body_wire_stream << body_wire;
412 : : ConstElementPtr l_msg =
413 [ + - ]: 12 : Element::fromWire(body_wire_stream, length - header_length);
414 [ + - ][ - + ]: 36 : if ((seq == -1 &&
[ # # ][ # # ]
[ + - ]
415 [ + - ][ + - ]: 24 : !l_env->contains("reply")
[ + - ][ # # ]
416 : : ) || (
417 [ # # ][ # # ]: 12 : l_env->contains("reply") &&
[ - + ][ # # ]
418 [ # # ][ # # ]: 12 : l_env->get("reply")->intValue() == seq
[ # # ][ - + ]
[ - + ][ # # ]
[ # # ]
419 : : )
420 : : ) {
421 [ + - ]: 12 : env = l_env;
422 [ + - ]: 12 : msg = l_msg;
423 : : return (true);
424 : : } else {
425 [ # # ]: 0 : ElementPtr q_el = Element::createList();
426 [ # # ]: 0 : q_el->add(l_env);
427 [ # # ]: 0 : q_el->add(l_msg);
428 [ # # ]: 0 : impl_->queue_->add(q_el);
429 [ # # ]: 0 : return (recvmsg(env, msg, nonblock, seq));
430 : : }
431 : : // XXXMLG handle non-block here, and return false for short reads
432 : : }
433 : :
434 : : void
435 : 0 : Session::subscribe(std::string group, std::string instance) {
436 [ # # ][ # # ]: 0 : LOG_DEBUG(logger, DBG_TRACE_DETAILED, CC_SUBSCRIBE).arg(group);
437 : 0 : ElementPtr env = Element::createMap();
438 : :
439 [ # # ][ # # ]: 0 : env->set("type", Element::create("subscribe"));
[ # # ]
440 [ # # ][ # # ]: 0 : env->set("group", Element::create(group));
[ # # ]
441 [ # # ][ # # ]: 0 : env->set("instance", Element::create(instance));
[ # # ]
442 : :
443 [ # # ]: 0 : sendmsg(env);
444 : 0 : }
445 : :
446 : : void
447 : 0 : Session::unsubscribe(std::string group, std::string instance) {
448 [ # # ][ # # ]: 0 : LOG_DEBUG(logger, DBG_TRACE_DETAILED, CC_UNSUBSCRIBE).arg(group);
449 : 0 : ElementPtr env = Element::createMap();
450 : :
451 [ # # ][ # # ]: 0 : env->set("type", Element::create("unsubscribe"));
[ # # ]
452 [ # # ][ # # ]: 0 : env->set("group", Element::create(group));
[ # # ]
453 [ # # ][ # # ]: 0 : env->set("instance", Element::create(instance));
[ # # ]
454 : :
455 [ # # ]: 0 : sendmsg(env);
456 : 0 : }
457 : :
458 : : int
459 : 0 : Session::group_sendmsg(ConstElementPtr msg, std::string group,
460 : : std::string instance, std::string to)
461 : : {
462 [ # # ][ # # ]: 0 : LOG_DEBUG(logger, DBG_TRACE_DETAILED, CC_GROUP_SEND).arg(msg->str()).
[ # # ][ # # ]
463 [ # # ]: 0 : arg(group);
464 : 0 : ElementPtr env = Element::createMap();
465 : 0 : long int nseq = ++impl_->sequence_;
466 : :
467 [ # # ][ # # ]: 0 : env->set("type", Element::create("send"));
[ # # ]
468 [ # # ][ # # ]: 0 : env->set("from", Element::create(impl_->lname_));
[ # # ]
469 [ # # ][ # # ]: 0 : env->set("to", Element::create(to));
[ # # ]
470 [ # # ][ # # ]: 0 : env->set("group", Element::create(group));
[ # # ]
471 [ # # ][ # # ]: 0 : env->set("instance", Element::create(instance));
[ # # ]
472 [ # # ][ # # ]: 0 : env->set("seq", Element::create(nseq));
[ # # ]
473 : : //env->set("msg", Element::create(msg->toWire()));
474 : :
475 [ # # ]: 0 : sendmsg(env, msg);
476 : 0 : return (nseq);
477 : : }
478 : :
479 : : bool
480 : 9 : Session::group_recvmsg(ConstElementPtr& envelope, ConstElementPtr& msg,
481 : : bool nonblock, int seq)
482 : : {
483 [ + - ]: 9 : LOG_DEBUG(logger, DBG_TRACE_DETAILED, CC_GROUP_RECEIVE);
484 : 9 : bool result(recvmsg(envelope, msg, nonblock, seq));
485 [ + - ]: 7 : if (result) {
486 [ + - ][ + - ]: 14 : LOG_DEBUG(logger, DBG_TRACE_DETAILED, CC_GROUP_RECEIVED).
487 [ + - ][ + - ]: 14 : arg(envelope->str()).arg(msg->str());
[ + - ][ + - ]
488 : : } else {
489 [ # # ]: 0 : LOG_DEBUG(logger, DBG_TRACE_DETAILED, CC_NO_MESSAGE);
490 : : }
491 : 7 : return (result);
492 : : }
493 : :
494 : : int
495 : 0 : Session::reply(ConstElementPtr envelope, ConstElementPtr newmsg) {
496 [ # # # # ]: 0 : LOG_DEBUG(logger, DBG_TRACE_DETAILED, CC_REPLY).arg(envelope->str()).
[ # # ][ # # ]
[ # # ]
497 [ # # ]: 0 : arg(newmsg->str());
498 : 0 : ElementPtr env = Element::createMap();
499 : 0 : long int nseq = ++impl_->sequence_;
500 : :
501 [ # # ][ # # ]: 0 : env->set("type", Element::create("send"));
[ # # ]
502 [ # # ][ # # ]: 0 : env->set("from", Element::create(impl_->lname_));
[ # # ]
503 [ # # ][ # # ]: 0 : env->set("to", Element::create(envelope->get("from")->stringValue()));
[ # # ][ # # ]
[ # # ][ # # ]
504 [ # # ][ # # ]: 0 : env->set("group", Element::create(envelope->get("group")->stringValue()));
[ # # ][ # # ]
[ # # ][ # # ]
505 [ # # ][ # # ]: 0 : env->set("instance", Element::create(envelope->get("instance")->stringValue()));
[ # # ][ # # ]
[ # # ][ # # ]
506 [ # # ][ # # ]: 0 : env->set("seq", Element::create(nseq));
[ # # ]
507 [ # # ][ # # ]: 0 : env->set("reply", Element::create(envelope->get("seq")->intValue()));
[ # # ][ # # ]
[ # # ][ # # ]
508 : :
509 [ # # ]: 0 : sendmsg(env, newmsg);
510 : :
511 : 0 : return (nseq);
512 : : }
513 : :
514 : : bool
515 : 12 : Session::hasQueuedMsgs() const {
516 : 12 : return (impl_->queue_->size() > 0);
517 : : }
518 : :
519 : : void
520 : 3 : Session::setTimeout(size_t milliseconds) {
521 [ + - ][ + - ]: 3 : LOG_DEBUG(logger, DBG_TRACE_DETAILED, CC_SET_TIMEOUT).arg(milliseconds);
522 : 3 : impl_->setTimeout(milliseconds);
523 : 3 : }
524 : :
525 : : size_t
526 : 2 : Session::getTimeout() const {
527 : 2 : return (impl_->getTimeout());
528 : : }
529 : : }
530 [ + - ][ + - ]: 396 : }
|