Coverage for src/bin/msgq/msgq : 71%
        
        
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
| 
 #!/usr/bin/python3 
 # Copyright (C) 2010 Internet Systems Consortium. # # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT, # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
 
 """This code implements the msgq daemon.""" 
 
 
 
 # This is the version that gets displayed to the user. # The VERSION string consists of the module name, the module version # number, and the overall BIND 10 version number (set in configure.ac). 
 
 
 """Add a subscription.""" else: 
 """Remove the socket from the one specific subscription.""" 
 """Remove the socket from all subscriptions.""" 
 """Return an array of sockets which want this specific group, instance.""" else: 
 """Return an array of sockets who should get something sent to this group, instance pair. This includes wildcard subscriptions.""" 
 """Message Queue class.""" # did we find a better way to do this? "bind10-devel", "msgq_socket").replace("${prefix}", "/home/jelte/opt/bind10") 
 """Initialize the MsgQ master. 
 The socket_file specifies the path to the UNIX domain socket that the msgq process listens on. If it is None, the environment variable BIND10_MSGQ_SOCKET_FILE is used. If that is not set, it will default to ${prefix}/var/bind10-devel/msgq_socket. If verbose is True, then the MsgQ reports what it is doing. """ 
 else: else: 
 
 """Set up the poll thing. Internal function.""" except AttributeError: self.kqueue = select.kqueue() 
 """Add a kquque filter for a socket. By default the read filter is used; if write_filter is set to True, the write filter is used. We use a boolean value instead of a specific filter constant, because kqueue filter values do not seem to be defined on some systems. The use of boolean makes the interface restrictive because there are other filters, but this method is mostly only for our internal use, so it should be acceptable at least for now.""" filter_type = select.KQ_FILTER_WRITE if write_filter else \ select.KQ_FILTER_READ event = select.kevent(socket.fileno(), filter_type, select.KQ_EV_ADD | select.KQ_EV_ENABLE) self.kqueue.control([event], 0) 
 """Delete a kqueue filter for socket. See add_kqueue_socket() for the semantics and notes about write_filter.""" filter_type = select.KQ_FILTER_WRITE if write_filter else \ select.KQ_FILTER_READ event = select.kevent(socket.fileno(), filter_type, select.KQ_EV_DELETE) self.kqueue.control([event], 0) 
 """Set up the listener socket. Internal function.""" sys.stdout.write("[b10-msgq] Setting up socket at %s\n" % self.socket_file) 
 
 os.remove(self.socket_file) # remove the file again if something goes wrong # (note this is a catch-all, but we reraise it) os.remove(self.socket_file) 
 else: self.add_kqueue_socket(self.listen_socket) 
 """Configure listener socket, polling, etc. Raises a socket.error if the socket_file cannot be created. """ 
 
 sys.stdout.write("[b10-msgq] Listening\n") 
 
 """Process an accept on the listening socket.""" # TODO: When we have logging, we might want # to add a debug message here that a new connection # was made 
 """ Internal function to insert a socket. Used by process_accept and some tests. """ 
 else: self.add_kqueue_socket(newsocket) 
 """Process a read on a socket.""" sys.stderr.write("[b10-msgq] Got read on Strange Socket fd %d\n" % fd) return # sys.stderr.write("[b10-msgq] Got read on fd %d\n" %fd) 
 """Fully close down the socket.""" del self.sendbuffs[fd] 
 """Get exactly the requested bytes, or raise an exception if EOF.""" 
 """Read a correctly formatted packet. Will raise exceptions if something fails.""" raise MsgQReceiveError("overall_length < 2") raise MsgQReceiveError("routing_length > overall_length") raise MsgQReceiveError("routing_length == 0") # probably need to sanity check lengths here... else: 
 """Process one packet.""" 
 except DecodeError as err: self.kill_socket(fd, sock) sys.stderr.write("[b10-msgq] Routing decode error: %s\n" % err) return 
 
 """Process a single command. This will split out into one of the other functions.""" # TODO: A print statement got removed here (one that prints the # routing envelope). When we have logging with multiple levels, # we might want to re-add that on a high debug verbosity. elif cmd == 'ping': # Command for testing purposes self.process_command_ping(sock, routing, data) else: sys.stderr.write("[b10-msgq] Invalid command: %s\n" % cmd) 
 
 
 # We set the socket nonblocking, MSG_DONTWAIT doesn't exist # on some OSes except socket.error as e: if e.errno == errno.EAGAIN or e.errno == errno.EWOULDBLOCK: return 0 else: raise e finally: # And set it back again 
 # Try to send the data, but only if there's nothing waiting amount_sent = 0 else: except socket.error as sockerr: # in the case the other side seems gone, kill the socket # and drop the send action if sockerr.errno == errno.EPIPE: print("[b10-msgq] SIGPIPE on send, dropping message " + "and closing connection") self.kill_socket(fileno, sock) return else: raise 
 # Still something to send now = time.clock() # Append it to buffer (but check the data go away) if fileno in self.sendbuffs: (last_sent, buff) = self.sendbuffs[fileno] if now - last_sent > 0.1: self.kill_socket(fileno, sock) return buff += msg else: buff = msg[amount_sent:] last_sent = now if self.poller: self.poller.register(fileno, select.POLLIN | select.POLLOUT) else: self.add_kqueue_socket(sock, True) self.sendbuffs[fileno] = (last_sent, buff) 
 # Try to send some data from the buffer (_, msg) = self.sendbuffs[fileno] sock = self.sockets[fileno] amount_sent = self.__send_data(sock, msg) # Keep the rest msg = msg[amount_sent:] if len(msg) == 0: # If there's no more, stop requesting for write availability if self.poller: self.poller.register(fileno, select.POLLIN) else: self.delete_kqueue_socket(sock, True) del self.sendbuffs[fileno] else: self.sendbuffs[fileno] = (time.clock(), msg) 
 """Generate a unique connection identifier for this socket. This is done by using an increasing counter and the current time.""" 
 self.sendmsg(sock, { "type" : "pong" }, data) 
 
 return # ignore invalid packets entirely 
 else: else: return # recipient doesn't exist 
 
 sockets.remove(sock) 
 return # ignore invalid packets entirely 
 return # ignore invalid packets entirely 
 """Process messages. Forever. Mostly.""" 
 self.run_poller() else: self.run_kqueue() 
 except select.error as err: if err.args[0] == errno.EINTR: events = [] else: sys.stderr.write("[b10-msgq] Error with poll(): %s\n" % err) break else: self.__process_write(fd) 
 while True: events = self.kqueue.control(None, 10) if not events: raise RuntimeError('serve: kqueue returned no events') 
 for event in events: if event.ident == self.listen_socket.fileno(): self.process_accept() else: if event.filter == select.KQ_FILTER_WRITE: self.__process_write(event.ident) if event.filter == select.KQ_FILTER_READ and \ event.data > 0: self.process_socket(event.ident) elif event.flags & select.KQ_EV_EOF: self.kill_socket(event.ident, self.sockets[event.ident]) 
 """Stop the MsgQ master.""" sys.stdout.write("[b10-msgq] Stopping the server.\n") 
 # can signal handling and calling a destructor be done without a # global variable? 
 if msgq: msgq.shutdown() sys.exit(0) 
 def check_port(option, opt_str, value, parser):
     """Ensure that the port we are passed is actually a valid port number.

     Used by OptionParser() on startup as an option callback.

     Parameters:
       option  -- the option object that triggered the callback (unused).
       opt_str -- the option string as seen on the command line; it is
                  only used to build the error message.
       value   -- the argument given for the option; anything int()
                  accepts is allowed.
       parser  -- the parser driving the callback; on success the
                  validated port is stored in parser.values.msgq_port.

     Raises OptionValueError if the value is not an integer or lies
     outside the range 0-65535.
     """
     error_text = "%s requires a port number (0-65535)" % opt_str
     try:
         intval = int(value)
     except (TypeError, ValueError):
         # A non-numeric (or missing) argument must be reported the same
         # way as an out-of-range one; letting the raw exception escape
         # would give the user a traceback instead of a usage error.
         raise OptionValueError(error_text)
     if (intval < 0) or (intval > 65535):
         raise OptionValueError(error_text)
     parser.values.msgq_port = intval
 # Parse any command-line options. parser = OptionParser(version=VERSION) parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="display more about what is going on") parser.add_option("-s", "--socket-file", dest="msgq_socket_file", type="string", default=None, help="UNIX domain socket file the msgq daemon will use") (options, args) = parser.parse_args() 
 signal.signal(signal.SIGTERM, signal_handler) 
 # Announce startup. if options.verbose: sys.stdout.write("[b10-msgq] %s\n" % VERSION) 
 msgq = MsgQ(options.msgq_socket_file, options.verbose) 
 setup_result = msgq.setup() if setup_result: sys.stderr.write("[b10-msgq] Error on startup: %s\n" % setup_result) sys.exit(1) 
 try: msgq.run() except KeyboardInterrupt: pass 
 msgq.shutdown()  |