Branch data Line data Source code
1 : : // Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC")
2 : : //
3 : : // Permission to use, copy, modify, and/or distribute this software for any
4 : : // purpose with or without fee is hereby granted, provided that the above
5 : : // copyright notice and this permission notice appear in all copies.
6 : : //
7 : : // THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
8 : : // REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
9 : : // AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
10 : : // INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
11 : : // LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
12 : : // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
13 : : // PERFORMANCE OF THIS SOFTWARE.
14 : :
15 : : #include <config.h>
16 : :
17 : : #include <unistd.h> // for some IPC/network system calls
18 : : #include <netinet/in.h>
19 : : #include <stdint.h>
20 : : #include <sys/socket.h>
21 : :
22 : : #include <boost/bind.hpp>
23 : : #include <boost/scoped_ptr.hpp>
24 : : #include <boost/date_time/posix_time/posix_time_types.hpp>
25 : :
26 : : #include <asio.hpp>
27 : : #include <asio/deadline_timer.hpp>
28 : :
29 : : #include <asiolink/io_address.h>
30 : : #include <asiolink/io_asio_socket.h>
31 : : #include <asiolink/io_endpoint.h>
32 : : #include <asiolink/io_service.h>
33 : : #include <asiolink/tcp_endpoint.h>
34 : : #include <asiolink/tcp_socket.h>
35 : : #include <asiolink/udp_endpoint.h>
36 : : #include <asiolink/udp_socket.h>
37 : :
38 : : #include <dns/messagerenderer.h>
39 : : #include <dns/opcode.h>
40 : : #include <dns/rcode.h>
41 : :
42 : : #include <asiodns/io_fetch.h>
43 : :
44 : : #include <util/buffer.h>
45 : : #include <util/random/qid_gen.h>
46 : :
47 : : #include <asiodns/logger.h>
48 : :
49 : : using namespace asio;
50 : : using namespace isc::asiolink;
51 : : using namespace isc::dns;
52 : : using namespace isc::util;
53 : : using namespace isc::util::random;
54 : : using namespace isc::log;
55 : : using namespace std;
56 : :
57 : : namespace isc {
58 : : namespace asiodns {
59 : :
60 : : // Log debug verbosity
61 : :
62 : : const int DBG_IMPORTANT = DBGLVL_TRACE_BASIC;
63 : : const int DBG_COMMON = DBGLVL_TRACE_DETAIL;
64 : : const int DBG_ALL = DBGLVL_TRACE_DETAIL + 20;
65 : :
66 : : /// \brief IOFetch Data
67 : : ///
68 : : /// The data for IOFetch is held in a separate struct pointed to by a shared_ptr
69 : : /// object. This is because the IOFetch object will be copied often (it is used
70 : : /// as a coroutine and passed as callback to many async_*() functions) and we
71 : : /// want to keep the same data. Organising the data in this way keeps copying to
72 : : /// a minimum.
73 [ + - ]: 130 : struct IOFetchData {
74 : :
75 : : // The first three members are scoped pointers to a base class because what
76 : : // is actually instantiated depends on whether the fetch is over UDP or TCP,
77 : : // which is not known until construction of the IOFetch. Use of a scoped
78 : : // pointer here is merely to ensure deletion when the data object is deleted.
79 : : boost::scoped_ptr<IOAsioSocket<IOFetch> > socket;
80 : : ///< Socket to use for I/O
81 : : boost::scoped_ptr<IOEndpoint> remote_snd;///< Where the fetch is sent
82 : : boost::scoped_ptr<IOEndpoint> remote_rcv;///< Where the response came from
83 : : OutputBufferPtr msgbuf; ///< Wire buffer for question
84 : : OutputBufferPtr received; ///< Received data put here
85 : : IOFetch::Callback* callback; ///< Called on I/O Completion
86 : : asio::deadline_timer timer; ///< Timer to measure timeouts
87 : : IOFetch::Protocol protocol; ///< Protocol being used
88 : : size_t cumulative; ///< Cumulative received amount
89 : : size_t expected; ///< Expected amount of data
90 : : size_t offset; ///< Offset to receive data
91 : : bool stopped; ///< Have we stopped running?
92 : : int timeout; ///< Timeout in ms
93 : : bool packet; ///< true if packet was supplied
94 : :
95 : : // In case we need to log an error, the origin of the last asynchronous
96 : : // I/O is recorded. To save time and simplify the code, this is recorded
97 : : // as the ID of the error message that would be generated if the I/O failed.
98 : : // This means that we must make sure that all possible "origins" take the
99 : : // same arguments in their message in the same order.
100 : : isc::log::MessageID origin; ///< Origin of last asynchronous I/O
101 : : uint8_t staging[IOFetch::STAGING_LENGTH];
102 : : ///< Temporary array for received data
103 : : isc::dns::qid_t qid; ///< The QID set in the query
104 : :
105 : : /// \brief Constructor
106 : : ///
107 : : /// Just fills in the data members of the IOFetchData structure
108 : : ///
109 : : /// \param proto Either IOFetch::TCP or IOFetch::UDP.
110 : : /// \param service I/O Service object to handle the asynchronous
111 : : /// operations.
112 : : /// \param address IP address of upstream server
113 : : /// \param port Port to use for the query
114 : : /// \param buff Output buffer into which the response (in wire format)
115 : : /// is written (if a response is received).
116 : : /// \param cb Callback object containing the callback to be called
117 : : /// when we terminate. The caller is responsible for managing this
118 : : /// object and deleting it if necessary.
119 : : /// \param wait Timeout for the fetch (in ms).
120 : : ///
121 : : /// TODO: May need to alter constructor (see comment 4 in Trac ticket #554)
122 : 65 : IOFetchData(IOFetch::Protocol proto, IOService& service,
123 : : const IOAddress& address, uint16_t port, OutputBufferPtr& buff,
124 : : IOFetch::Callback* cb, int wait)
125 : : :
126 : : socket((proto == IOFetch::UDP) ?
127 : : static_cast<IOAsioSocket<IOFetch>*>(
128 [ + - ]: 37 : new UDPSocket<IOFetch>(service)) :
129 : : static_cast<IOAsioSocket<IOFetch>*>(
130 [ + - ]: 28 : new TCPSocket<IOFetch>(service))
131 : : ),
132 : : remote_snd((proto == IOFetch::UDP) ?
133 [ + - ]: 37 : static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
134 [ + - ]: 28 : static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
135 : : ),
136 : : remote_rcv((proto == IOFetch::UDP) ?
137 [ + - ]: 37 : static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
138 [ + - ]: 28 : static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
139 : : ),
140 [ + - ]: 65 : msgbuf(new OutputBuffer(512)),
141 : : received(buff),
142 : : callback(cb),
143 [ + - ]: 65 : timer(service.get_io_service()),
144 : : protocol(proto),
145 : : cumulative(0),
146 : : expected(0),
147 : : offset(0),
148 : : stopped(false),
149 : : timeout(wait),
150 : : packet(false),
151 : : origin(ASIODNS_UNKNOWN_ORIGIN),
152 : : staging(),
153 [ + + ][ + + ]: 532870 : qid(QidGenerator::getInstance().generateQid())
[ + - ][ + - ]
[ + + ][ + - ]
[ + - ][ + - ]
[ + - ][ + + ]
[ + - ][ + - ]
154 : 65 : {}
155 : :
156 : : // Checks if the response we received was ok;
157 : : // - this structure holds the buffer we read into, as well as the
158 : : // address we sent to and the address we received from.
159 : : // - the number of octets read is tracked in "cumulative".
160 : : // Addresses must match, number of octets read must be at least
161 : : // 2, and the first two octets must match the qid of the message
162 : : // we sent.
163 : : bool responseOK() {
164 [ + - ]: 24 : return (*remote_snd == *remote_rcv && cumulative >= 2 &&
165 [ + - ][ + - ]: 24 : readUint16(received->getData()) == qid);
[ + + ]
166 : : }
167 : : };
168 : :
169 : : /// IOFetch Constructor - just initialize the private data
170 : :
171 : 60 : IOFetch::IOFetch(Protocol protocol, IOService& service,
172 : : const isc::dns::Question& question, const IOAddress& address,
173 : 120 : uint16_t port, OutputBufferPtr& buff, Callback* cb, int wait, bool edns)
174 : : {
175 [ + - ][ + - ]: 60 : MessagePtr query_msg(new Message(Message::RENDER));
[ + - ]
176 : : initIOFetch(query_msg, protocol, service, question, address, port, buff,
177 [ + - ]: 60 : cb, wait, edns);
178 : 60 : }
179 : :
180 : 0 : IOFetch::IOFetch(Protocol protocol, IOService& service,
181 : : OutputBufferPtr& outpkt, const IOAddress& address, uint16_t port,
182 : : OutputBufferPtr& buff, Callback* cb, int wait)
183 : : :
184 : : data_(new IOFetchData(protocol, service,
185 [ # # ][ # # ]: 0 : address, port, buff, cb, wait))
[ # # ]
186 : : {
187 : 0 : data_->msgbuf = outpkt;
188 : 0 : data_->packet = true;
189 : 0 : }
190 : :
191 : 5 : IOFetch::IOFetch(Protocol protocol, IOService& service,
192 : : ConstMessagePtr query_message, const IOAddress& address, uint16_t port,
193 : 10 : OutputBufferPtr& buff, Callback* cb, int wait)
194 : : {
195 [ + - ][ + - ]: 5 : MessagePtr msg(new Message(Message::RENDER));
[ + - ]
196 : :
197 : : msg->setHeaderFlag(Message::HEADERFLAG_RD,
198 [ + - ][ + - ]: 5 : query_message->getHeaderFlag(Message::HEADERFLAG_RD));
199 : : msg->setHeaderFlag(Message::HEADERFLAG_CD,
200 [ + - ][ + - ]: 5 : query_message->getHeaderFlag(Message::HEADERFLAG_CD));
201 : :
202 : : initIOFetch(msg, protocol, service,
203 [ + - ][ + - ]: 10 : **(query_message->beginQuestion()),
[ + - ]
204 [ + - ]: 5 : address, port, buff, cb, wait);
205 : 5 : }
206 : :
207 : : void
208 : 65 : IOFetch::initIOFetch(MessagePtr& query_msg, Protocol protocol,
209 : : IOService& service,
210 : : const isc::dns::Question& question,
211 : : const IOAddress& address, uint16_t port,
212 : : OutputBufferPtr& buff, Callback* cb, int wait, bool edns)
213 : : {
214 : : data_ = boost::shared_ptr<IOFetchData>(new IOFetchData(
215 [ + - ]: 65 : protocol, service, address, port, buff, cb, wait));
216 : :
217 : 65 : query_msg->setQid(data_->qid);
218 : 65 : query_msg->setOpcode(Opcode::QUERY());
219 : 65 : query_msg->setRcode(Rcode::NOERROR());
220 : 65 : query_msg->setHeaderFlag(Message::HEADERFLAG_RD);
221 : 65 : query_msg->addQuestion(question);
222 : :
223 [ + + ]: 65 : if (edns) {
224 [ + - ]: 64 : EDNSPtr edns_query(new EDNS());
225 : 128 : edns_query->setUDPSize(Message::DEFAULT_MAX_EDNS0_UDPSIZE);
226 [ + - ]: 64 : query_msg->setEDNS(edns_query);
227 : : }
228 : :
229 : 130 : MessageRenderer renderer;
230 [ + - ]: 65 : renderer.setBuffer(data_->msgbuf.get());
231 [ + - ]: 65 : query_msg->toWire(renderer);
232 [ + - ]: 65 : renderer.setBuffer(NULL);
233 : 65 : }
234 : :
235 : : // Return protocol in use.
236 : :
237 : : IOFetch::Protocol
238 : 2 : IOFetch::getProtocol() const {
239 : 2 : return (data_->protocol);
240 : : }
241 : :
242 : : /// The function operator is implemented with the "stackless coroutine"
243 : : /// pattern; see internal/coroutine.h for details.
244 : :
245 : : void
246 : 155 : IOFetch::operator()(asio::error_code ec, size_t length) {
247 : :
248 [ + + ]: 155 : if (data_->stopped) {
249 : : return;
250 [ + + ]: 153 : } else if (ec) {
251 : 1 : logIOFailure(ec);
252 : 1 : return;
253 : : }
254 : :
255 [ - + + + : 152 : CORO_REENTER (this) {
+ - ][ # # ]
256 : :
257 : : /// Generate the upstream query and render it to wire format
258 : : /// This is done in a different scope to allow inline variable
259 : : /// declarations.
260 : : {
261 [ - + ]: 36 : if (data_->packet) {
262 : : // A packet was given, overwrite the QID (which is in the
263 : : // first two bytes of the packet).
264 [ # # ]: 0 : data_->msgbuf->writeUint16At(data_->qid, 0);
265 : :
266 : : }
267 : : }
268 : :
269 : : // If we timeout, we stop, which will cancel outstanding I/Os and
270 : : // shut down everything.
271 [ + - ]: 36 : if (data_->timeout != -1) {
272 : 36 : data_->timer.expires_from_now(boost::posix_time::milliseconds(
273 : 36 : data_->timeout));
274 : 36 : data_->timer.async_wait(boost::bind(&IOFetch::stop, *this,
275 [ + - + - ]: 108 : TIME_OUT));
[ + - ]
276 : : }
277 : :
278 : : // Open a connection to the target system. For speed, if the operation
279 : : // is synchronous (i.e. UDP operation) we bypass the yield.
280 : 36 : data_->origin = ASIODNS_OPEN_SOCKET;
281 [ + - ][ + + ]: 36 : if (data_->socket->isOpenSynchronous()) {
282 [ + - ]: 16 : data_->socket->open(data_->remote_snd.get(), *this);
283 : : } else {
284 [ + - ]: 20 : CORO_YIELD data_->socket->open(data_->remote_snd.get(), *this);
[ - - - + ]
[ # # ][ - + ]
[ + - ]
285 : : }
286 : :
287 [ + + ]: 24 : do {
288 : : // Begin an asynchronous send, and then yield. When the send completes,
289 : : // we will resume immediately after this point.
290 : 36 : data_->origin = ASIODNS_SEND_DATA;
291 [ + - ]: 108 : CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
[ - - - + ]
[ # # ][ - + ]
292 [ + - ]: 72 : data_->msgbuf->getLength(), data_->remote_snd.get(), *this);
293 : :
294 : : // Now receive the response. Since TCP may not receive the entire
295 : : // message in one operation, we need to loop until we have received
296 : : // it. (This can't be done within the asyncReceive() method because
297 : : // each I/O operation will be done asynchronously and between each one
298 : : // we need to yield ... and we *really* don't want to set up another
299 : : // coroutine within that method.) So after each receive (and yield),
300 : : // we check if the operation is complete and if not, loop to read again.
301 : : //
302 : : // Another concession to TCP is that the amount of data is contained in the
303 : : // first two bytes. This leads to two problems:
304 : : //
305 : : // a) We don't want those bytes in the return buffer.
306 : : // b) They may not both arrive in the first I/O.
307 : : //
308 : : // So... we need to loop until we have at least two bytes, then store
309 : : // the expected amount of data. Then we need to loop until we have
310 : : // received all the data before copying it back to the user's buffer.
311 : : // And we want to minimise the amount of copying...
312 : :
313 : 35 : data_->origin = ASIODNS_READ_DATA;
314 : 35 : data_->cumulative = 0; // No data yet received
315 : 35 : data_->offset = 0; // First data into start of buffer
316 : 35 : data_->received->clear(); // Clear the receive buffer
317 [ + + ]: 63 : do {
318 [ + - ]: 222 : CORO_YIELD data_->socket->asyncReceive(data_->staging,
[ - - - + ]
[ # # ][ - + ]
319 : : static_cast<size_t>(STAGING_LENGTH),
320 : 74 : data_->offset,
321 [ + - ]: 148 : data_->remote_rcv.get(), *this);
322 : 126 : } while (!data_->socket->processReceivedData(data_->staging, length,
323 : 63 : data_->cumulative, data_->offset,
324 [ + - ]: 126 : data_->expected, data_->received));
325 : 24 : } while (!data_->responseOK());
326 : :
327 : : // Finished with this socket, so close it. This will not generate an
328 : : // I/O error, but reset the origin to unknown in case we change this.
329 : 22 : data_->origin = ASIODNS_UNKNOWN_ORIGIN;
330 [ + - ]: 22 : data_->socket->close();
331 : :
332 : : /// We are done
333 [ + - ]: 22 : stop(SUCCESS);
334 : : }
335 : : }
336 : :
337 : : // Function that stops the coroutine sequence. It is called either when the
338 : : // query finishes or when the timer times out. Either way, it sets the
339 : : // "stopped_" flag and cancels anything that is in progress.
340 : : //
341 : : // As the function may be entered multiple times as things wind down, it checks
342 : : // if the stopped_ flag is already set. If it is, the call is a no-op.
343 : :
344 : : void
345 : 39 : IOFetch::stop(Result result) {
346 : :
347 [ + + ]: 39 : if (!data_->stopped) {
348 : :
349 : : // Mark the fetch as stopped to prevent other completion callbacks
350 : : // (invoked because of the calls to cancel()) from executing the
351 : : // cancel calls again.
352 : : //
353 : : // In a single threaded environment, the callbacks won't be invoked
354 : : // until this one completes. In a multi-threaded environment, they may
355 : : // well be, in which case the testing (and setting) of the stopped_
356 : : // variable should be done inside a mutex (and the stopped_ variable
357 : : // declared as "volatile").
358 : : //
359 : : // TODO: Update testing of stopped_ if threads are used.
360 : 34 : data_->stopped = true;
361 [ + + + - ]: 34 : switch (result) {
362 : : case TIME_OUT:
363 [ + - ][ + - ]: 16 : LOG_DEBUG(logger, DBG_COMMON, ASIODNS_READ_TIMEOUT).
364 [ + - ][ + - ]: 16 : arg(data_->remote_snd->getAddress().toText()).
365 [ + - ]: 16 : arg(data_->remote_snd->getPort());
366 : : break;
367 : :
368 : : case SUCCESS:
369 [ + - ][ + - ]: 44 : LOG_DEBUG(logger, DBG_ALL, ASIODNS_FETCH_COMPLETED).
370 [ + - ][ + - ]: 44 : arg(data_->remote_rcv->getAddress().toText()).
371 [ + - ]: 44 : arg(data_->remote_rcv->getPort());
372 : : break;
373 : :
374 : : case STOPPED:
375 : : // Fetch has been stopped for some other reason. This is
376 : : // allowed but as it is unusual it is logged, but with a lower
377 : : // debug level than a timeout (which is totally normal).
378 [ + - ][ + - ]: 8 : LOG_DEBUG(logger, DBG_IMPORTANT, ASIODNS_FETCH_STOPPED).
379 [ + - ][ + - ]: 8 : arg(data_->remote_snd->getAddress().toText()).
380 [ + - ]: 8 : arg(data_->remote_snd->getPort());
381 : : break;
382 : :
383 : : default:
384 [ # # ][ # # ]: 0 : LOG_ERROR(logger, ASIODNS_UNKNOWN_RESULT).
385 [ # # ][ # # ]: 0 : arg(data_->remote_snd->getAddress().toText()).
386 [ # # ]: 0 : arg(data_->remote_snd->getPort());
387 : : }
388 : :
389 : : // Stop requested, cancel any I/Os on the socket and shut it down,
390 : : // and cancel the timer.
391 : 34 : data_->socket->cancel();
392 : 34 : data_->socket->close();
393 : :
394 : 34 : data_->timer.cancel();
395 : :
396 : : // Execute the I/O completion callback (if present).
397 [ + - ]: 34 : if (data_->callback) {
398 : 34 : (*(data_->callback))(result);
399 : : }
400 : : }
401 : 39 : }
402 : :
403 : : // Log an error - called on I/O failure
404 : :
405 : 1 : void IOFetch::logIOFailure(asio::error_code ec) {
406 : :
407 : : // Should only get here with a known error code.
408 : 1 : assert((data_->origin == ASIODNS_OPEN_SOCKET) ||
409 : : (data_->origin == ASIODNS_SEND_DATA) ||
410 : : (data_->origin == ASIODNS_READ_DATA) ||
411 [ - + # # : 1 : (data_->origin == ASIODNS_UNKNOWN_ORIGIN));
# # # # ]
412 : :
413 : : static const char* PROTOCOL[2] = {"TCP", "UDP"};
414 [ + - ][ + - ]: 3 : LOG_ERROR(logger, data_->origin).arg(ec.value()).
[ + - ][ + - ]
415 [ + - ]: 1 : arg((data_->remote_snd->getProtocol() == IPPROTO_TCP) ?
416 [ - + ][ + - ]: 2 : PROTOCOL[0] : PROTOCOL[1]).
417 [ + - ]: 3 : arg(data_->remote_snd->getAddress().toText()).
418 [ + - ]: 2 : arg(data_->remote_snd->getPort());
419 : 1 : }
420 : :
421 : : } // namespace asiodns
422 [ + - ][ + - ]: 9 : } // namespace isc {
|