Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

432

433

434

435

436

437

438

439

440

441

442

443

444

445

446

447

448

449

450

451

452

453

454

455

456

457

458

459

460

461

462

463

464

465

466

467

468

469

470

471

472

473

474

475

476

477

478

479

480

481

482

483

484

485

486

487

488

489

490

491

492

493

494

495

496

497

498

499

500

501

502

503

504

505

506

507

508

509

510

511

512

513

514

515

516

517

518

519

520

521

522

523

524

525

526

527

528

529

530

531

532

533

534

535

536

537

538

539

540

541

542

543

544

545

546

547

548

549

550

551

552

553

554

555

#!/usr/bin/python3 

 

# Copyright (C) 2010  Internet Systems Consortium. 

# 

# Permission to use, copy, modify, and distribute this software for any 

# purpose with or without fee is hereby granted, provided that the above 

# copyright notice and this permission notice appear in all copies. 

# 

# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM 

# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL 

# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL 

# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT, 

# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING 

# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, 

# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION 

# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

 

import sys; sys.path.append ('@@PYTHONPATH@@') 

 

"""This code implements the msgq daemon.""" 

 

import subprocess 

import signal 

import os 

import socket 

import sys 

import struct 

import errno 

import time 

import select 

import random 

from optparse import OptionParser, OptionValueError 

import isc.util.process 

 

import isc.cc 

 

isc.util.process.rename() 

 

# This is the version that gets displayed to the user. 

# The VERSION string consists of the module name, the module version 

# number, and the overall BIND 10 version number (set in configure.ac). 

VERSION = "b10-msgq 20110127 (BIND 10 20120405)" 

 

class MsgQReceiveError(Exception):
    """Raised when a packet cannot be read from a client socket.

    Used for socket errors, EOF and malformed length headers met
    while receiving a message.
    """

 

class SubscriptionManager:
    """Keep track of which sockets are subscribed to which targets.

    A target is a (group, instance) tuple; each target maps to the list
    of sockets subscribed to it, in subscription order.
    """

    def __init__(self):
        # Maps (group, instance) -> list of subscribed sockets.
        self.subscriptions = {}

    def subscribe(self, group, instance, socket):
        """Add a subscription.

        Subscribing the same socket to the same target twice has no
        additional effect.
        """
        target = (group, instance)
        if target in self.subscriptions:
            print("[b10-msgq] Appending to existing target")
            if socket not in self.subscriptions[target]:
                self.subscriptions[target].append(socket)
        else:
            print("[b10-msgq] Creating new target")
            self.subscriptions[target] = [socket]

    def unsubscribe(self, group, instance, socket):
        """Remove the socket from the one specific subscription.

        Unknown targets and sockets that are not subscribed are
        silently ignored.
        """
        subscribers = self.subscriptions.get((group, instance))
        if subscribers is not None and socket in subscribers:
            subscribers.remove(socket)

    def unsubscribe_all(self, socket):
        """Remove the socket from all subscriptions."""
        for socklist in self.subscriptions.values():
            if socket in socklist:
                socklist.remove(socket)

    def find_sub(self, group, instance):
        """Return an array of sockets which want this specific group,
        instance.

        For a known target this is the internal list itself (not a
        copy); for an unknown target a new empty list is returned.
        """
        return self.subscriptions.get((group, instance), [])

    def find(self, group, instance):
        """Return an array of sockets who should get something sent to
        this group, instance pair.  This includes wildcard subscriptions.

        The result is a new list without duplicates; its order is not
        defined because the two lists are merged through a set.
        """
        exact = self.find_sub(group, instance)
        wildcard = self.find_sub(group, "*")
        return list(set(exact + wildcard))

 

class MsgQ:
    """Message Queue class.

    Listens on a UNIX domain socket, reads length-prefixed packets from
    the connected clients and dispatches them by the "type" entry of
    their routing envelope (send, subscribe, unsubscribe, getlname,
    ping).  Uses select.poll() where available and kqueue otherwise.
    """
    # did we find a better way to do this?
    SOCKET_FILE = os.path.join("${prefix}/var",
                               "bind10-devel",
                               "msgq_socket").replace("${prefix}",
                                                      "/home/jelte/opt/bind10")

    # Maximum age (in units of _now()) of a pending send buffer; a
    # further send to a socket whose buffer is older closes the socket.
    _SEND_TIMEOUT = 0.1

    def __init__(self, socket_file=None, verbose=False):
        """Initialize the MsgQ master.

        The socket_file specifies the path to the UNIX domain socket
        that the msgq process listens on. If it is None, the
        environment variable BIND10_MSGQ_SOCKET_FILE is used. If that
        is not set, it will default to
        ${prefix}/var/bind10-devel/msgq_socket.
        If verbose is True, then the MsgQ reports
        what it is doing.
        """

        if socket_file is None:
            if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
                self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
            else:
                self.socket_file = self.SOCKET_FILE
        else:
            self.socket_file = socket_file

        self.verbose = verbose
        self.poller = None
        self.kqueue = None
        self.runnable = False
        self.listen_socket = False
        # fd -> socket object of every connected client
        self.sockets = {}
        self.connection_counter = random.random()
        self.hostname = socket.gethostname()
        self.subs = SubscriptionManager()
        # lname (connection identifier) -> socket object
        self.lnames = {}
        # fd -> (timestamp of last send activity, bytes still to send)
        self.sendbuffs = {}

    @staticmethod
    def _now():
        """Return the timestamp used to age pending send buffers.

        time.clock() was removed in Python 3.8.  time.process_time()
        is used instead; like time.clock() on Unix it measures
        processor time.  Interpreters that predate process_time() fall
        back to time.clock().
        """
        timer = getattr(time, "process_time", None)
        if timer is None:
            timer = time.clock
        return timer()

    def setup_poller(self):
        """Set up the poll thing.  Internal function.

        Falls back to kqueue when the select module has no poll().
        """
        try:
            self.poller = select.poll()
        except AttributeError:
            self.kqueue = select.kqueue()

    def add_kqueue_socket(self, socket, write_filter=False):
        """Add a kqueue filter for a socket.  By default the read
        filter is used; if write_filter is set to True, the write
        filter is used.  We use a boolean value instead of a specific
        filter constant, because kqueue filter values do not seem to
        be defined on some systems.  The use of boolean makes the
        interface restrictive because there are other filters, but this
        method is mostly only for our internal use, so it should be
        acceptable at least for now."""
        filter_type = select.KQ_FILTER_WRITE if write_filter else \
            select.KQ_FILTER_READ
        event = select.kevent(socket.fileno(), filter_type,
                              select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        self.kqueue.control([event], 0)

    def delete_kqueue_socket(self, socket, write_filter=False):
        """Delete a kqueue filter for socket.  See add_kqueue_socket()
        for the semantics and notes about write_filter."""
        filter_type = select.KQ_FILTER_WRITE if write_filter else \
            select.KQ_FILTER_READ
        event = select.kevent(socket.fileno(), filter_type,
                              select.KQ_EV_DELETE)
        self.kqueue.control([event], 0)

    def setup_listener(self):
        """Set up the listener socket.  Internal function.

        Removes a stale socket file, binds and listens on
        self.socket_file and registers the listener for read events.
        Any exception from bind()/listen() is re-raised after cleanup.
        """
        if self.verbose:
            sys.stdout.write("[b10-msgq] Setting up socket at %s\n" %
                             self.socket_file)

        self.listen_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

        if os.path.exists(self.socket_file):
            os.remove(self.socket_file)
        try:
            self.listen_socket.bind(self.socket_file)
            self.listen_socket.listen(1024)
        except Exception:
            # remove the file again if something goes wrong
            # (note this is a catch-all, but we reraise it)
            if os.path.exists(self.socket_file):
                os.remove(self.socket_file)
            # Do not leak the descriptor of the unusable listener.
            self.listen_socket.close()
            raise

        if self.poller:
            self.poller.register(self.listen_socket, select.POLLIN)
        else:
            self.add_kqueue_socket(self.listen_socket)

    def setup(self):
        """Configure listener socket, polling, etc.
           Raises a socket.error if the socket_file cannot be
           created.
        """

        self.setup_poller()
        self.setup_listener()

        if self.verbose:
            sys.stdout.write("[b10-msgq] Listening\n")

        self.runnable = True

    def process_accept(self):
        """Process an accept on the listening socket."""
        newsocket, ipaddr = self.listen_socket.accept()
        # TODO: When we have logging, we might want
        # to add a debug message here that a new connection
        # was made
        self.register_socket(newsocket)

    def register_socket(self, newsocket):
        """
        Internal function to insert a socket. Used by process_accept and some tests.

        Records the socket under its fd, assigns it a new lname and
        registers it for read events.
        """
        self.sockets[newsocket.fileno()] = newsocket
        lname = self.newlname()
        self.lnames[lname] = newsocket

        if self.poller:
            self.poller.register(newsocket, select.POLLIN)
        else:
            self.add_kqueue_socket(newsocket)

    def process_socket(self, fd):
        """Process a read on a socket."""
        if fd not in self.sockets:
            sys.stderr.write("[b10-msgq] Got read on Strange Socket fd %d\n" % fd)
            return
        sock = self.sockets[fd]
        self.process_packet(fd, sock)

    def kill_socket(self, fd, sock):
        """Fully close down the socket.

        Drops the poll registration, every subscription, the lname,
        the socket table entry and any pending send buffer.  Entries
        that are already gone are ignored, so a socket that was closed
        by another code path does not raise here.
        """
        if self.poller:
            self.poller.unregister(sock)
        self.subs.unsubscribe_all(sock)
        for lname in [k for k, v in self.lnames.items() if v == sock]:
            del self.lnames[lname]
        sock.close()
        self.sockets.pop(fd, None)
        self.sendbuffs.pop(fd, None)
        sys.stderr.write("[b10-msgq] Closing socket fd %d\n" % fd)

    def getbytes(self, fd, sock, length):
        """Get exactly the requested bytes, or raise an exception if
           EOF.

           Raises MsgQReceiveError wrapping the socket error on a
           failed recv(), or MsgQReceiveError("EOF") when the peer
           closed the connection before `length` bytes arrived."""
        received = b''
        while len(received) < length:
            try:
                data = sock.recv(length - len(received))
            except socket.error as err:
                # Pass the caught instance so the log shows the reason.
                raise MsgQReceiveError(err)
            if len(data) == 0:
                raise MsgQReceiveError("EOF")
            received += data
        return received

    def read_packet(self, fd, sock):
        """Read a correctly formatted packet.  Will raise exceptions if
           something fails.

           A packet is a 4-byte overall length and a 2-byte routing
           length (both big-endian), followed by the routing envelope
           and the optional data.  The overall length includes the two
           bytes of the routing length field.  Returns (routing, data)
           where data is None when the packet carries no payload."""
        lengths = self.getbytes(fd, sock, 6)
        overall_length, routing_length = struct.unpack(">IH", lengths)
        if overall_length < 2:
            raise MsgQReceiveError("overall_length < 2")
        overall_length -= 2
        if routing_length > overall_length:
            raise MsgQReceiveError("routing_length > overall_length")
        if routing_length == 0:
            raise MsgQReceiveError("routing_length == 0")
        data_length = overall_length - routing_length
        # probably need to sanity check lengths here...
        routing = self.getbytes(fd, sock, routing_length)
        if data_length > 0:
            data = self.getbytes(fd, sock, data_length)
        else:
            data = None
        return (routing, data)

    def process_packet(self, fd, sock):
        """Process one packet.

        A receive or decode failure closes the socket and is reported
        on stderr."""
        try:
            routing, data = self.read_packet(fd, sock)
        except MsgQReceiveError as err:
            self.kill_socket(fd, sock)
            sys.stderr.write("[b10-msgq] Receive error: %s\n" % err)
            return

        # The bare name DecodeError is not defined in this file, so the
        # original handler raised NameError instead of catching.
        # NOTE(review): assumes from_wire() reports bad input with
        # isc.cc.message.DecodeError (if that class exists) or with
        # ValueError -- confirm against the isc.cc.message in use.
        decode_errors = (ValueError,
                         getattr(isc.cc.message, "DecodeError", ValueError))
        try:
            routingmsg = isc.cc.message.from_wire(routing)
        except decode_errors as err:
            self.kill_socket(fd, sock)
            sys.stderr.write("[b10-msgq] Routing decode error: %s\n" % err)
            return

        self.process_command(fd, sock, routingmsg, data)

    def process_command(self, fd, sock, routing, data):
        """Process a single command.  This will split out into one of the
           other functions."""
        # TODO: A print statement got removed here (one that prints the
        # routing envelope). When we have logging with multiple levels,
        # we might want to re-add that on a high debug verbosity.
        # An envelope without "type" is reported as an invalid command
        # instead of raising KeyError.
        cmd = routing.get("type")
        if cmd == 'send':
            self.process_command_send(sock, routing, data)
        elif cmd == 'subscribe':
            self.process_command_subscribe(sock, routing, data)
        elif cmd == 'unsubscribe':
            self.process_command_unsubscribe(sock, routing, data)
        elif cmd == 'getlname':
            self.process_command_getlname(sock, routing, data)
        elif cmd == 'ping':
            # Command for testing purposes
            self.process_command_ping(sock, routing, data)
        else:
            sys.stderr.write("[b10-msgq] Invalid command: %s\n" % cmd)

    def preparemsg(self, env, msg = None):
        """Build the wire packet for an envelope and optional message.

        dict arguments are encoded with isc.cc.message.to_wire();
        anything else is used as is.  The result is the 4-byte overall
        length and 2-byte envelope length (network byte order) followed
        by the envelope and, if non-empty, the message."""
        if isinstance(env, dict):
            env = isc.cc.message.to_wire(env)
        if isinstance(msg, dict):
            msg = isc.cc.message.to_wire(msg)
        # The overall length also counts the 2-byte envelope length.
        length = 2 + len(env)
        if msg:
            length += len(msg)
        ret = struct.pack("!IH", length, len(env))
        ret += env
        if msg:
            ret += msg
        return ret

    def sendmsg(self, sock, env, msg = None):
        """Encode env/msg with preparemsg() and send it to sock."""
        self.send_prepared_msg(sock, self.preparemsg(env, msg))

    def __send_data(self, sock, data):
        """Send as much of data as possible without blocking.

        Returns the number of bytes sent, 0 if the socket would block.
        Other socket errors are re-raised."""
        try:
            # We set the socket nonblocking, MSG_DONTWAIT doesn't exist
            # on some OSes
            sock.setblocking(0)
            return sock.send(data)
        except socket.error as e:
            if e.errno == errno.EAGAIN or e.errno == errno.EWOULDBLOCK:
                return 0
            else:
                raise
        finally:
            # And set it back again
            sock.setblocking(1)

    def send_prepared_msg(self, sock, msg):
        """Send an already encoded packet to sock, buffering the rest.

        Data that cannot be sent right away is kept in self.sendbuffs
        and flushed when the socket becomes writable.  The socket is
        closed on EPIPE, or when its pending buffer is older than
        _SEND_TIMEOUT at the time more data is queued."""
        # Try to send the data, but only if there's nothing waiting
        fileno = sock.fileno()
        if fileno in self.sendbuffs:
            amount_sent = 0
        else:
            try:
                amount_sent = self.__send_data(sock, msg)
            except socket.error as sockerr:
                # in the case the other side seems gone, kill the socket
                # and drop the send action
                if sockerr.errno == errno.EPIPE:
                    print("[b10-msgq] SIGPIPE on send, dropping message " +
                          "and closing connection")
                    self.kill_socket(fileno, sock)
                    return
                else:
                    raise

        # Still something to send
        if amount_sent < len(msg):
            now = self._now()
            # Append it to buffer (but check the data go away)
            if fileno in self.sendbuffs:
                (last_sent, buff) = self.sendbuffs[fileno]
                if now - last_sent > self._SEND_TIMEOUT:
                    self.kill_socket(fileno, sock)
                    return
                buff += msg
            else:
                buff = msg[amount_sent:]
                last_sent = now
                # Also ask to be told when the socket is writable.
                if self.poller:
                    self.poller.register(fileno, select.POLLIN |
                        select.POLLOUT)
                else:
                    self.add_kqueue_socket(sock, True)
            self.sendbuffs[fileno] = (last_sent, buff)

    def __process_write(self, fileno):
        """Flush as much of the pending buffer of fileno as possible.

        Handles EPIPE like send_prepared_msg(): the message is dropped
        and the connection closed."""
        # Try to send some data from the buffer
        (_, msg) = self.sendbuffs[fileno]
        sock = self.sockets[fileno]
        try:
            amount_sent = self.__send_data(sock, msg)
        except socket.error as sockerr:
            if sockerr.errno == errno.EPIPE:
                print("[b10-msgq] SIGPIPE on send, dropping message " +
                      "and closing connection")
                self.kill_socket(fileno, sock)
                return
            raise
        # Keep the rest
        msg = msg[amount_sent:]
        if len(msg) == 0:
            # If there's no more, stop requesting for write availability
            if self.poller:
                self.poller.register(fileno, select.POLLIN)
            else:
                self.delete_kqueue_socket(sock, True)
            del self.sendbuffs[fileno]
        else:
            self.sendbuffs[fileno] = (self._now(), msg)

    def newlname(self):
        """Generate a unique connection identifier for this socket.
        This is done by using an increasing counter and the current
        time."""
        self.connection_counter += 1
        # Both values are floats; %x only accepts integers on current
        # Python 3, so truncate them explicitly.
        return "%x_%x@%s" % (int(time.time()), int(self.connection_counter),
                             self.hostname)

    def process_command_ping(self, sock, routing, data):
        """Answer a ping by echoing data back in a "pong" envelope."""
        self.sendmsg(sock, { "type" : "pong" }, data)

    def process_command_getlname(self, sock, routing, data):
        """Send the lname assigned to sock back to it."""
        lname = [ k for k, v in self.lnames.items() if v == sock ][0]
        self.sendmsg(sock, { "type" : "getlname" }, { "lname" : lname })

    def process_command_send(self, sock, routing, data):
        """Forward a message to its recipients.

        With "to" set to "*" the message goes to every subscriber of
        (group, instance), wildcard subscribers included; otherwise to
        the socket whose lname is "to".  The sender never receives its
        own message.  Envelopes lacking group or instance, and unknown
        recipients, are ignored."""
        group = routing.get("group")
        instance = routing.get("instance")
        to = routing.get("to")
        if group is None or instance is None:
            return  # ignore invalid packets entirely

        if to == "*":
            sockets = self.subs.find(group, instance)
        elif to in self.lnames:
            sockets = [ self.lnames[to] ]
        else:
            return # recipient doesn't exist

        msg = self.preparemsg(routing, data)

        if sock in sockets:
            sockets.remove(sock)
        for recipient in sockets:
            self.send_prepared_msg(recipient, msg)

    def process_command_subscribe(self, sock, routing, data):
        """Subscribe sock to the (group, instance) in the envelope."""
        group = routing.get("group")
        instance = routing.get("instance")
        if group is None or instance is None:
            return  # ignore invalid packets entirely
        self.subs.subscribe(group, instance, sock)

    def process_command_unsubscribe(self, sock, routing, data):
        """Unsubscribe sock from the (group, instance) in the envelope."""
        group = routing.get("group")
        instance = routing.get("instance")
        if group is None or instance is None:
            return  # ignore invalid packets entirely
        self.subs.unsubscribe(group, instance, sock)

    def run(self):
        """Process messages.  Forever.  Mostly."""

        if self.poller:
            self.run_poller()
        else:
            self.run_kqueue()

    def run_poller(self):
        """Event loop for the poll() back end.

        Returns only when poll() fails with something other than
        EINTR."""
        while True:
            try:
                events = self.poller.poll()
            except select.error as err:
                if err.args[0] == errno.EINTR:
                    events = []
                else:
                    sys.stderr.write("[b10-msgq] Error with poll(): %s\n" % err)
                    break
            for (fd, event) in events:
                if fd == self.listen_socket.fileno():
                    self.process_accept()
                else:
                    if event & select.POLLOUT:
                        self.__process_write(fd)
                    if event & select.POLLIN:
                        self.process_socket(fd)

    def run_kqueue(self):
        """Event loop for the kqueue back end.

        Raises RuntimeError if kqueue returns without any event."""
        while True:
            events = self.kqueue.control(None, 10)
            if not events:
                raise RuntimeError('serve: kqueue returned no events')

            for event in events:
                if event.ident == self.listen_socket.fileno():
                    self.process_accept()
                else:
                    if event.filter == select.KQ_FILTER_WRITE:
                        self.__process_write(event.ident)
                    if event.filter == select.KQ_FILTER_READ and \
                            event.data > 0:
                        self.process_socket(event.ident)
                    elif event.flags & select.KQ_EV_EOF:
                        # The socket may already have been closed while
                        # handling the write above.
                        sock = self.sockets.get(event.ident)
                        if sock is not None:
                            self.kill_socket(event.ident, sock)

    def shutdown(self):
        """Stop the MsgQ master.

        Closes the listener (if one was created) and removes the
        socket file."""
        if self.verbose:
            sys.stdout.write("[b10-msgq] Stopping the server.\n")
        # listen_socket is still False when setup() never ran.
        if self.listen_socket:
            self.listen_socket.close()
        if os.path.exists(self.socket_file):
            os.remove(self.socket_file)

 

# can signal handling and calling a destructor be done without a 

# global variable? 

msgq = None


def signal_handler(signal, frame):
    """Signal handler: stop the global MsgQ, if one is set, and exit.

    The process always exits with status 0.
    """
    running = msgq
    if running:
        running.shutdown()
    sys.exit(0)

 

if __name__ == "__main__":
    def check_port(option, opt_str, value, parser):
        """Function to ensure that the port we are passed is actually
        a valid port number. Used by OptionParser() on startup."""
        intval = int(value)
        if (intval < 0) or (intval > 65535):
            raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
        parser.values.msgq_port = intval

    # Parse any command-line options.
    parser = OptionParser(version=VERSION)
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
                      help="display more about what is going on")
    parser.add_option("-s", "--socket-file", dest="msgq_socket_file",
                      type="string", default=None,
                      help="UNIX domain socket file the msgq daemon will use")
    (options, args) = parser.parse_args()

    signal.signal(signal.SIGTERM, signal_handler)

    # Announce startup.
    if options.verbose:
        sys.stdout.write("[b10-msgq] %s\n" % VERSION)

    msgq = MsgQ(options.msgq_socket_file, options.verbose)

    # setup() returns nothing and reports failure by raising
    # socket.error, so the error has to be caught rather than read
    # from a return value.
    try:
        msgq.setup()
    except socket.error as setup_error:
        sys.stderr.write("[b10-msgq] Error on startup: %s\n" % setup_error)
        sys.exit(1)

    try:
        msgq.run()
    except KeyboardInterrupt:
        pass

    msgq.shutdown()