Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

432

433

434

435

436

437

438

439

440

441

442

443

444

445

446

447

448

449

450

451

452

453

454

455

456

457

458

459

460

461

462

463

464

465

466

467

468

469

470

471

472

473

474

475

476

477

478

479

480

481

482

483

484

485

486

487

488

489

490

491

492

493

494

495

496

497

498

499

500

501

502

503

504

505

506

507

508

509

510

511

512

513

514

515

516

517

518

519

520

521

522

523

524

525

526

527

528

529

530

531

532

533

534

535

536

537

538

539

540

541

542

543

544

545

546

547

548

549

550

551

552

553

554

555

556

557

558

559

560

561

562

563

564

565

566

567

568

569

570

571

572

573

574

575

576

577

578

579

580

581

582

583

584

585

586

587

588

589

590

591

592

593

594

595

596

597

598

599

600

601

602

603

604

605

606

607

608

609

610

611

612

613

614

615

616

617

618

619

620

621

622

623

624

625

626

627

628

629

630

631

632

633

634

635

636

637

638

639

640

641

642

643

644

645

646

647

648

649

650

651

652

653

654

655

656

657

658

659

660

661

662

663

664

665

666

667

668

669

670

671

672

673

674

675

676

677

678

679

680

681

682

683

684

685

686

687

688

689

690

691

692

693

694

695

696

697

698

699

700

701

702

703

704

705

706

707

708

709

710

711

712

713

714

715

716

717

718

719

720

721

722

723

724

725

726

727

728

729

730

731

732

733

734

735

736

737

738

739

740

741

742

743

744

745

746

747

748

749

750

751

752

753

754

755

756

757

758

759

760

761

762

763

764

765

766

767

768

769

770

771

772

773

774

775

776

777

778

779

780

781

782

783

784

785

786

787

788

789

790

791

792

793

794

795

796

797

798

799

800

801

802

803

804

805

806

807

808

809

810

811

812

813

814

815

816

817

818

819

820

821

822

823

824

825

826

827

828

829

830

831

832

833

834

835

836

837

838

839

840

841

842

843

844

845

846

847

848

849

850

851

852

853

854

855

856

857

858

859

860

861

862

863

864

865

866

867

868

869

870

871

872

873

874

875

876

877

878

879

880

881

882

883

884

885

886

887

888

889

890

891

892

893

894

895

896

897

898

899

900

901

902

903

904

905

906

907

908

909

910

911

912

913

914

915

916

917

918

919

920

921

922

923

924

925

926

927

928

929

930

931

932

933

934

935

936

937

938

939

940

941

942

943

944

945

946

947

948

949

950

951

952

953

954

955

956

957

958

959

960

961

962

963

964

965

966

967

968

969

970

971

972

973

974

975

976

977

978

979

980

981

982

983

984

985

986

987

988

989

990

991

992

993

994

995

996

997

998

999

1000

1001

1002

1003

1004

1005

1006

1007

1008

1009

1010

1011

1012

1013

1014

1015

1016

1017

1018

1019

1020

1021

1022

1023

1024

1025

1026

1027

1028

1029

1030

1031

1032

1033

1034

1035

1036

1037

1038

1039

1040

1041

1042

1043

1044

1045

1046

1047

1048

1049

1050

1051

1052

1053

1054

#!/usr/bin/python3 

 

# Copyright (C) 2010  Internet Systems Consortium. 

# 

# Permission to use, copy, modify, and distribute this software for any 

# purpose with or without fee is hereby granted, provided that the above 

# copyright notice and this permission notice appear in all copies. 

# 

# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM 

# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL 

# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL 

# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT, 

# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING 

# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, 

# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION 

# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

 

 

import sys; sys.path.append ('@@PYTHONPATH@@') 

import isc 

import isc.cc 

import threading 

import struct 

import signal 

from isc.datasrc import DataSourceClient, ZoneFinder, ZoneJournalReader 

from socketserver import * 

import os 

from isc.config.ccsession import * 

from isc.cc import SessionError, SessionTimeout 

from isc.notify import notify_out 

import isc.util.process 

import socket 

import select 

import errno 

from optparse import OptionParser, OptionValueError 

from isc.util import socketserver_mixin 

import isc.server_common.tsig_keyring 

 

from isc.log_messages.xfrout_messages import * 

 

# Initialize the logging library under the process name and create the
# logger object used throughout this module.
isc.log.init("b10-xfrout")
logger = isc.log.Logger("xfrout")

# Pending system-wide debug level definitions, the ones we
# use here are hardcoded for now
DBG_PROCESS = logger.DBGLVL_TRACE_BASIC
DBG_COMMANDS = logger.DBGLVL_TRACE_DETAIL

DBG_XFROUT_TRACE = logger.DBGLVL_TRACE_BASIC

# These modules are imported after the logger is created so that a failure
# to import them can be reported through it.
try:
    from libutil_io_python import *
    from pydnspp import *
except ImportError as e:
    # C++ loadable module may not be installed; even so the xfrout process
    # must keep running, so we warn about it and move forward.
    logger.error(XFROUT_IMPORT, str(e))

 

from isc.acl.acl import ACCEPT, REJECT, DROP, LoaderError 

from isc.acl.dns import REQUEST_LOADER 

 

isc.util.process.rename() 

 

class XfroutConfigError(Exception):
    """Unified error for a failed xfrout configuration update.

    Not every syntax error can be caught at the module-CC layer, so the
    xfrout process has to validate (explicitly or implicitly) the
    configuration data it is given.  When that validation finds a problem,
    this exception is raised, either directly or by converting an exception
    that originated in another module.
    """

 

class XfroutSessionError(Exception):
    '''Raised when something unexpected happens during an xfrout session.'''

 

def init_paths():
    """Set the module-level path globals from the process environment.

    Assigns SPECFILE_PATH, AUTH_SPECFILE_PATH and UNIX_SOCKET_FILE:

    - If B10_FROM_BUILD is set, the spec file directories are located under
      that tree, and the socket file is placed under
      B10_FROM_SOURCE_LOCALSTATEDIR when that variable is set, or directly
      under B10_FROM_BUILD otherwise.
    - Otherwise the hardcoded installation paths are used, and the socket
      file can be overridden with BIND10_XFROUT_SOCKET_FILE.
    """
    global SPECFILE_PATH
    global AUTH_SPECFILE_PATH
    global UNIX_SOCKET_FILE
    if "B10_FROM_BUILD" in os.environ:
        SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/xfrout"
        AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"
        if "B10_FROM_SOURCE_LOCALSTATEDIR" in os.environ:
            UNIX_SOCKET_FILE = os.environ["B10_FROM_SOURCE_LOCALSTATEDIR"] + \
                "/auth_xfrout_conn"
        else:
            UNIX_SOCKET_FILE = os.environ["B10_FROM_BUILD"] + "/auth_xfrout_conn"
    else:
        PREFIX = "/home/jelte/opt/bind10"
        DATAROOTDIR = "${prefix}/share"
        # Expand the nested ${datarootdir} and ${prefix} placeholders.
        SPECFILE_PATH = "${datarootdir}/bind10-devel".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
        AUTH_SPECFILE_PATH = SPECFILE_PATH
        if "BIND10_XFROUT_SOCKET_FILE" in os.environ:
            UNIX_SOCKET_FILE = os.environ["BIND10_XFROUT_SOCKET_FILE"]
        else:
            UNIX_SOCKET_FILE = "@@LOCALSTATEDIR@@/bind10-devel/auth_xfrout_conn"

 

# Populate SPECFILE_PATH, AUTH_SPECFILE_PATH and UNIX_SOCKET_FILE when the
# module is loaded; the constants below depend on them.
init_paths()

# Full paths of the xfrout and auth spec files.
SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + os.sep + "auth.spec"
VERBOSE_MODE = False
XFROUT_DNS_HEADER_SIZE = 12     # protocol constant
XFROUT_MAX_MESSAGE_SIZE = 65535 # ditto

 

# borrowed from xfrin.py @ #1298.  We should eventually unify it. 

def format_zone_str(zone_name, zone_class):
    """Build the '<name>/<class>' text used to identify a zone.

       Parameters:
       zone_name (isc.dns.Name) name of the zone; rendered via to_text(True)
       zone_class (isc.dns.RRClass) class of the zone; rendered via str()
    """
    name_text = zone_name.to_text(True)
    class_text = str(zone_class)
    return name_text + '/' + class_text

 

# borrowed from xfrin.py @ #1298. 

def format_addrinfo(addrinfo):
    """Helper function to format the addrinfo as a string of the form
       <addr>:<port> (for IPv4) or [<addr>]:port (for IPv6). For unix domain
       sockets, and unknown address families, it returns a basic string
       conversion of the third element of the passed tuple.

       Parameters:
       addrinfo: a 3-tuple consisting of address family, socket type, and,
                 depending on the family, either a 2-tuple with the address
                 and port, or a filename

       Raises:
       TypeError if the tuple (or its address element) is too short to
       contain the expected members.
    """
    try:
        if addrinfo[0] == socket.AF_INET:
            return str(addrinfo[2][0]) + ":" + str(addrinfo[2][1])
        elif addrinfo[0] == socket.AF_INET6:
            return "[" + str(addrinfo[2][0]) + "]:" + str(addrinfo[2][1])
        else:
            return str(addrinfo[2])
    except IndexError:
        # A too-short tuple is reported as an argument type problem.
        raise TypeError("addrinfo argument to format_addrinfo() does not "
                        "appear to be consisting of (family, socktype, (addr, port))")

 

def get_rrset_len(rrset):
    """Return the number of bytes the RRset writes when rendered to wire.

    The RRset is asked to render itself into a fresh bytearray and the
    resulting buffer length is returned.
    """
    wire_buf = bytearray()
    rrset.to_wire(wire_buf)
    return len(wire_buf)

 

def get_soa_serial(soa_rdata):
    '''Return the serial of an SOA RDATA as a Serial object.

    The serial is taken as the third whitespace-separated field of the
    RDATA's text representation.
    '''
    rdata_fields = soa_rdata.to_text().split()
    serial_text = rdata_fields[2]
    return Serial(int(serial_text))

 

class XfroutSession(): 

    def __init__(self, sock_fd, request_data, server, tsig_key_ring, remote, 

                 default_acl, zone_config, client_class=DataSourceClient): 

        self._sock_fd = sock_fd 

        self._request_data = request_data 

        self._server = server 

        self._tsig_key_ring = tsig_key_ring 

        self._tsig_ctx = None 

        self._tsig_len = 0 

        self._remote = remote 

        self._request_type = None 

        self._request_typestr = None 

        self._acl = default_acl 

        self._zone_config = zone_config 

        self.ClientClass = client_class # parameterize this for testing 

        self._soa = None # will be set in _xfrout_setup or in tests 

        self._jnl_reader = None # will be set to a reader for IXFR 

        self._handle() 

 

    def create_tsig_ctx(self, tsig_record, tsig_key_ring): 

        return TSIGContext(tsig_record.get_name(), tsig_record.get_rdata().get_algorithm(), 

                           tsig_key_ring) 

 

    def _handle(self): 

        ''' Handle a xfrout query, send xfrout response(s). 

 

        This is separated from the constructor so that we can override 

        it from tests. 

 

        ''' 

        # Check the xfrout quota.  We do both increase/decrease in this 

        # method so it's clear we always release it once acuired. 

        quota_ok = self._server.increase_transfers_counter() 

        ex = None 

        try: 

            self.dns_xfrout_start(self._sock_fd, self._request_data, quota_ok) 

        except Exception as e: 

            # To avoid resource leak we need catch all possible exceptions 

            # We log it later to exclude the case where even logger raises 

            # an exception. 

            ex = e 

 

        # Release any critical resources 

        if quota_ok: 

            self._server.decrease_transfers_counter() 

        self._close_socket() 

 

        if ex is not None: 

            logger.error(XFROUT_HANDLE_QUERY_ERROR, ex) 

 

    def _close_socket(self): 

        '''Simply close the socket via the given FD. 

 

        This is a dedicated subroutine of handle() and is sepsarated from it 

        for the convenience of tests. 

 

        ''' 

        os.close(self._sock_fd) 

 

    def _check_request_tsig(self, msg, request_data): 

        ''' If request has a tsig record, perform tsig related checks ''' 

        tsig_record = msg.get_tsig_record() 

        if tsig_record is not None: 

            self._tsig_len = tsig_record.get_length() 

            self._tsig_ctx = self.create_tsig_ctx(tsig_record, 

                                                  self._tsig_key_ring) 

            tsig_error = self._tsig_ctx.verify(tsig_record, request_data) 

            if tsig_error != TSIGError.NOERROR: 

                return Rcode.NOTAUTH() 

 

        return Rcode.NOERROR() 

 

    def _parse_query_message(self, mdata):
        '''Parse the wire-format request and perform TSIG and ACL checks.

        Returns a pair of (rcode, message):
        - (FORMERR, None) if the data cannot be parsed as a message
        - (rcode, message) with the non-NOERROR rcode of the TSIG check
          if that check fails
        - (None, None) if the ACL result is DROP
        - (REFUSED, message) if the ACL result is REJECT
        - (NOERROR, message) otherwise

        As a side effect it sets self._request_type and
        self._request_typestr.

        Raises RuntimeError if the question section does not contain
        exactly one question or the query type is neither AXFR nor IXFR.
        '''
        #TODO, need to add parseHeader() in case the message header is invalid
        try:
            msg = Message(Message.PARSE)
            Message.from_wire(msg, mdata)
        except Exception as err: # Exception is too broad
            logger.error(XFROUT_PARSE_QUERY_ERROR, err)
            return Rcode.FORMERR(), None

        # TSIG related checks
        rcode = self._check_request_tsig(msg, mdata)
        if rcode != Rcode.NOERROR():
            return rcode, msg

        # Make sure the question is valid.  This should be ensured by
        # the auth server, but since it's far from xfrout itself, we check
        # it by ourselves.  A violation would be an internal bug, so we
        # raise and stop here rather than returning a FORMERR or SERVFAIL.
        if msg.get_rr_count(Message.SECTION_QUESTION) != 1:
            raise RuntimeError('Invalid number of question for XFR: ' +
                               str(msg.get_rr_count(Message.SECTION_QUESTION)))
        question = msg.get_question()[0]

        # Identify the request type
        self._request_type = question.get_type()
        if self._request_type == RRType.AXFR():
            self._request_typestr = 'AXFR'
        elif self._request_type == RRType.IXFR():
            self._request_typestr = 'IXFR'
        else:
            # Likewise, this should be impossible.
            raise RuntimeError('Unexpected XFR type: ' +
                               str(self._request_type))

        # ACL checks.  The ACL is evaluated against the third element of
        # the remote address information and the request's TSIG record.
        zone_name = question.get_name()
        zone_class = question.get_class()
        acl = self._get_transfer_acl(zone_name, zone_class)
        acl_result = acl.execute(
            isc.acl.dns.RequestContext(self._remote[2], msg.get_tsig_record()))
        if acl_result == DROP:
            logger.debug(DBG_XFROUT_TRACE, XFROUT_QUERY_DROPPED,
                         self._request_type, format_addrinfo(self._remote),
                         format_zone_str(zone_name, zone_class))
            # (None, None) tells the caller not to send any response.
            return None, None
        elif acl_result == REJECT:
            logger.debug(DBG_XFROUT_TRACE, XFROUT_QUERY_REJECTED,
                         self._request_type, format_addrinfo(self._remote),
                         format_zone_str(zone_name, zone_class))
            return Rcode.REFUSED(), msg

        # At this point rcode is the NOERROR result of the TSIG check.
        return rcode, msg

 

    def _get_transfer_acl(self, zone_name, zone_class): 

        '''Return the ACL that should be applied for a given zone. 

 

        The zone is identified by a tuple of name and RR class. 

        If a per zone configuration for the zone exists and contains 

        transfer_acl, that ACL will be used; otherwise, the default 

        ACL will be used. 

 

        ''' 

        # Internally zone names are managed in lower cased label characters, 

        # so we first need to convert the name. 

        zone_name_lower = Name(zone_name.to_text(), True) 

        config_key = (zone_class.to_text(), zone_name_lower.to_text()) 

        if config_key in self._zone_config and \ 

                'transfer_acl' in self._zone_config[config_key]: 

            return self._zone_config[config_key]['transfer_acl'] 

        return self._acl 

 

    def _send_data(self, sock_fd, data): 

        size = len(data) 

        total_count = 0 

        while total_count < size: 

            count = os.write(sock_fd, data[total_count:]) 

            total_count += count 

 

 

    def _send_message(self, sock_fd, msg, tsig_ctx=None): 

        render = MessageRenderer() 

        # As defined in RFC5936 section3.4, perform case-preserving name 

        # compression for AXFR message. 

        render.set_compress_mode(MessageRenderer.CASE_SENSITIVE) 

        render.set_length_limit(XFROUT_MAX_MESSAGE_SIZE) 

 

        # XXX Currently, python wrapper doesn't accept 'None' parameter in this case, 

        # we should remove the if statement and use a universal interface later. 

        if tsig_ctx is not None: 

            msg.to_wire(render, tsig_ctx) 

        else: 

            msg.to_wire(render) 

 

        header_len = struct.pack('H', socket.htons(render.get_length())) 

        self._send_data(sock_fd, header_len) 

        self._send_data(sock_fd, render.get_data()) 

 

 

    def _reply_query_with_error_rcode(self, msg, sock_fd, rcode_): 

        if not msg: 

            return # query message is invalid. send nothing back. 

 

        msg.make_response() 

        msg.set_rcode(rcode_) 

        self._send_message(sock_fd, msg, self._tsig_ctx) 

 

    def _get_zone_soa(self, zone_name): 

        '''Retrieve the SOA RR of the given zone. 

 

        It returns a pair of RCODE and the SOA (in the form of RRset). 

        On success RCODE is NOERROR and returned SOA is not None; 

        on failure RCODE indicates the appropriate code in the context of 

        xfr processing, and the returned SOA is None. 

 

        ''' 

        result, finder = self._datasrc_client.find_zone(zone_name) 

        if result != DataSourceClient.SUCCESS: 

            return (Rcode.NOTAUTH(), None) 

        result, soa_rrset, _ = finder.find(zone_name, RRType.SOA()) 

        if result != ZoneFinder.SUCCESS: 

            return (Rcode.SERVFAIL(), None) 

        # Especially for database-based zones, a working zone may be in 

        # a broken state where it has more than one SOA RR.  We proactively 

        # check the condition and abort the xfr attempt if we identify it. 

        if soa_rrset.get_rdata_count() != 1: 

            return (Rcode.SERVFAIL(), None) 

        return (Rcode.NOERROR(), soa_rrset) 

 

    def __axfr_setup(self, zone_name): 

        '''Setup a zone iterator for AXFR or AXFR-style IXFR. 

 

        ''' 

        try: 

            # Note that we enable 'separate_rrs'.  In xfr-out we need to 

            # preserve as many things as possible (even if it's half broken) 

            # stored in the zone. 

            self._iterator = self._datasrc_client.get_iterator(zone_name, 

                                                               True) 

        except isc.datasrc.Error: 

            # If the current name server does not have authority for the 

            # zone, xfrout can't serve for it, return rcode NOTAUTH. 

            # Note: this exception can happen for other reasons.  We should 

            # update get_iterator() API so that we can distinguish "no such 

            # zone" and other cases (#1373).  For now we consider all these 

            # cases as NOTAUTH. 

            return Rcode.NOTAUTH() 

 

        # If we are an authoritative name server for the zone, but fail 

        # to find the zone's SOA record in datasource, xfrout can't 

        # provide zone transfer for it. 

        self._soa = self._iterator.get_soa() 

        if self._soa is None or self._soa.get_rdata_count() != 1: 

            return Rcode.SERVFAIL() 

 

        return Rcode.NOERROR() 

 

    def __ixfr_setup(self, request_msg, zone_name, zone_class):
        '''Setup a zone journal reader for IXFR.

        If the underlying data source does not know the requested range
        of zone differences it automatically falls back to AXFR-style
        IXFR by setting up a zone iterator instead of a journal reader.

        Returns an RCODE: FORMERR if the request's authority section has
        no usable SOA (or one with multiple RRs), the RCODE of the local
        SOA lookup if that fails, NOTAUTH if the journal reader reports
        that the zone does not exist, the result of the AXFR setup on
        fallback, and NOERROR otherwise.

        '''
        # Check the authority section.  Look for a SOA record with
        # the same name and class as the question.
        remote_soa = None
        for auth_rrset in request_msg.get_section(Message.SECTION_AUTHORITY):
            # Ignore data whose owner name is not the zone apex, and
            # ignore non-SOA or different class of records.
            if auth_rrset.get_name() != zone_name or \
                    auth_rrset.get_type() != RRType.SOA() or \
                    auth_rrset.get_class() != zone_class:
                continue
            if auth_rrset.get_rdata_count() != 1:
                logger.info(XFROUT_IXFR_MULTIPLE_SOA,
                            format_addrinfo(self._remote))
                return Rcode.FORMERR()
            remote_soa = auth_rrset
        if remote_soa is None:
            logger.info(XFROUT_IXFR_NO_SOA, format_addrinfo(self._remote))
            return Rcode.FORMERR()

        # Retrieve the local SOA
        rcode, self._soa = self._get_zone_soa(zone_name)
        if rcode != Rcode.NOERROR():
            return rcode

        # RFC1995 says "If an IXFR query with the same or newer version
        # number than that of the server is received, it is replied to with
        # a single SOA record of the server's current version, just as
        # in AXFR".  The claim about AXFR is incorrect, but other than that,
        # we do as the RFC says.
        begin_serial = get_soa_serial(remote_soa.get_rdata()[0])
        end_serial = get_soa_serial(self._soa.get_rdata()[0])
        if begin_serial >= end_serial:
            # clear both iterator and jnl_reader to signal we won't do
            # iteration in response generation
            self._iterator = None
            self._jnl_reader = None
            logger.info(XFROUT_IXFR_UPTODATE, format_addrinfo(self._remote),
                        format_zone_str(zone_name, zone_class),
                        begin_serial, end_serial)
            return Rcode.NOERROR()

        # Set up the journal reader or fall back to AXFR-style IXFR
        try:
            code, self._jnl_reader = self._datasrc_client.get_journal_reader(
                zone_name, begin_serial.get_value(), end_serial.get_value())
        except isc.datasrc.NotImplemented as ex:
            # The underlying data source doesn't support journaling.
            # Fall back to AXFR-style IXFR.
            logger.info(XFROUT_IXFR_NO_JOURNAL_SUPPORT,
                        format_addrinfo(self._remote),
                        format_zone_str(zone_name, zone_class))
            return self.__axfr_setup(zone_name)
        if code == ZoneJournalReader.NO_SUCH_VERSION:
            # The requested range of differences is not available; fall
            # back to AXFR-style IXFR.
            logger.info(XFROUT_IXFR_NO_VERSION, format_addrinfo(self._remote),
                        format_zone_str(zone_name, zone_class),
                        begin_serial, end_serial)
            return self.__axfr_setup(zone_name)
        if code == ZoneJournalReader.NO_SUCH_ZONE:
            # this is quite unexpected as we know zone's SOA exists.
            # It might be a bug or the data source is somehow broken,
            # but it can still happen if someone has removed the zone
            # between these two operations.  We treat it as NOTAUTH.
            logger.warn(XFROUT_IXFR_NO_ZONE, format_addrinfo(self._remote),
                        format_zone_str(zone_name, zone_class))
            return Rcode.NOTAUTH()

        # Use the reader as the iterator to generate the response.
        self._iterator = self._jnl_reader

        return Rcode.NOERROR()

 

    def _xfrout_setup(self, request_msg, zone_name, zone_class): 

        '''Setup a context for xfr responses according to the request type. 

 

        This method identifies the most appropriate data source for the 

        request and set up a zone iterator or journal reader depending on 

        whether the request is AXFR or IXFR.  If it identifies any protocol 

        level error it returns an RCODE other than NOERROR. 

 

        ''' 

 

        # Identify the data source for the requested zone and see if it has 

        # SOA while initializing objects used for request processing later. 

        # We should eventually generalize this so that we can choose the 

        # appropriate data source from (possible) multiple candidates. 

        # We should eventually take into account the RR class here. 

        # For now, we hardcode a particular type (SQLite3-based), and only 

        # consider that one. 

        datasrc_config = '{ "database_file": "' + \ 

            self._server.get_db_file() + '"}' 

        self._datasrc_client = self.ClientClass('sqlite3', datasrc_config) 

 

        if self._request_type == RRType.AXFR(): 

            return self.__axfr_setup(zone_name) 

        else: 

            return self.__ixfr_setup(request_msg, zone_name, zone_class) 

 

    def dns_xfrout_start(self, sock_fd, msg_query, quota_ok=True): 

        rcode_, msg = self._parse_query_message(msg_query) 

        #TODO. create query message and parse header 

492        if rcode_ is None: # Dropped by ACL 

            return 

494        elif rcode_ == Rcode.NOTAUTH() or rcode_ == Rcode.REFUSED(): 

            return self._reply_query_with_error_rcode(msg, sock_fd, rcode_) 

        elif rcode_ != Rcode.NOERROR(): 

            return self._reply_query_with_error_rcode(msg, sock_fd, 

                                                      Rcode.FORMERR()) 

        elif not quota_ok: 

            logger.warn(XFROUT_QUERY_QUOTA_EXCCEEDED, self._request_typestr, 

                        format_addrinfo(self._remote), 

                        self._server._max_transfers_out) 

            return self._reply_query_with_error_rcode(msg, sock_fd, 

                                                      Rcode.REFUSED()) 

 

        question = msg.get_question()[0] 

        zone_name = question.get_name() 

        zone_class = question.get_class() 

        zone_str = format_zone_str(zone_name, zone_class) # for logging 

 

        try: 

            rcode_ = self._xfrout_setup(msg, zone_name, zone_class) 

        except Exception as ex: 

            logger.error(XFROUT_XFR_TRANSFER_CHECK_ERROR, self._request_typestr, 

                         format_addrinfo(self._remote), zone_str, ex) 

            rcode_ = Rcode.SERVFAIL() 

        if rcode_ != Rcode.NOERROR(): 

            logger.info(XFROUT_XFR_TRANSFER_FAILED, self._request_typestr, 

                        format_addrinfo(self._remote), zone_str, rcode_) 

            return self._reply_query_with_error_rcode(msg, sock_fd, rcode_) 

 

        try: 

            logger.info(XFROUT_XFR_TRANSFER_STARTED, self._request_typestr, 

                        format_addrinfo(self._remote), zone_str) 

            self._reply_xfrout_query(msg, sock_fd) 

        except Exception as err: 

            logger.error(XFROUT_XFR_TRANSFER_ERROR, self._request_typestr, 

                    format_addrinfo(self._remote), zone_str, err) 

        logger.info(XFROUT_XFR_TRANSFER_DONE, self._request_typestr, 

                    format_addrinfo(self._remote), zone_str) 

 

    def _clear_message(self, msg): 

        qid = msg.get_qid() 

        opcode = msg.get_opcode() 

        rcode = msg.get_rcode() 

 

        msg.clear(Message.RENDER) 

        msg.set_qid(qid) 

        msg.set_opcode(opcode) 

        msg.set_rcode(rcode) 

        msg.set_header_flag(Message.HEADERFLAG_AA) 

        msg.set_header_flag(Message.HEADERFLAG_QR) 

        return msg 

 

    def _send_message_with_last_soa(self, msg, sock_fd, rrset_soa, 

                                    message_upper_len): 

        '''Add the SOA record to the end of message. 

 

        If it would exceed the maximum allowable size of a message, a new 

        message will be created to send out the last SOA. 

 

        We assume a message with a single SOA can always fit the buffer 

        with or without TSIG.  In theory this could be wrong if TSIG is 

        stupidly large, but in practice this assumption should be reasonable. 

        ''' 

        if message_upper_len + get_rrset_len(rrset_soa) > \ 

                XFROUT_MAX_MESSAGE_SIZE: 

            self._send_message(sock_fd, msg, self._tsig_ctx) 

            msg = self._clear_message(msg) 

 

        msg.add_rrset(Message.SECTION_ANSWER, rrset_soa) 

        self._send_message(sock_fd, msg, self._tsig_ctx) 

 

    def _reply_xfrout_query(self, msg, sock_fd): 

        msg.make_response() 

        msg.set_header_flag(Message.HEADERFLAG_AA) 

        # Reserved space for the fixed header size, the size of the question 

        # section, and TSIG size (when included).  The size of the question 

        # section is the sum of the qname length and the size of the 

        # fixed-length fields (type and class, 2 bytes each). 

        message_upper_len = XFROUT_DNS_HEADER_SIZE + \ 

            msg.get_question()[0].get_name().get_length() + 4 + \ 

            self._tsig_len 

 

        # If the iterator is None, we are responding to IXFR with a single 

        # SOA RR. 

        if self._iterator is None: 

            self._send_message_with_last_soa(msg, sock_fd, self._soa, 

                                             message_upper_len) 

            return 

 

        # Add the beginning SOA 

        msg.add_rrset(Message.SECTION_ANSWER, self._soa) 

        message_upper_len += get_rrset_len(self._soa) 

 

        # Add the rest of the zone/diff contets 

        for rrset in self._iterator: 

            # Check if xfrout is shutdown 

589            if  self._server._shutdown_event.is_set(): 

                logger.info(XFROUT_STOPPING) 

                return 

 

            # For AXFR (or AXFR-style IXFR), in which case _jnl_reader is None, 

            # we should skip SOAs from the iterator. 

            if self._jnl_reader is None and rrset.get_type() == RRType.SOA(): 

                continue 

 

            # We calculate the maximum size of the RRset (i.e. the 

            # size without compression) and use that to see if we 

            # may have reached the limit 

            rrset_len = get_rrset_len(rrset) 

 

            if message_upper_len + rrset_len <= XFROUT_MAX_MESSAGE_SIZE: 

                msg.add_rrset(Message.SECTION_ANSWER, rrset) 

                message_upper_len += rrset_len 

                continue 

 

            # RR would not fit.  If there are other RRs in the buffer, send 

            # them now and leave this RR to the next message. 

            self._send_message(sock_fd, msg, self._tsig_ctx) 

 

            # Create a new message and reserve space for the carried-over 

            # RR (and TSIG space in case it's to be TSIG signed) 

            msg = self._clear_message(msg) 

            message_upper_len = XFROUT_DNS_HEADER_SIZE + rrset_len + \ 

                self._tsig_len 

 

            # If this RR overflows the buffer all by itself, fail.  In theory 

            # some RRs might fit in a TCP message when compressed even if they 

            # do not fit when uncompressed, but surely we don't want to send 

            # such monstrosities to an unsuspecting slave. 

            if message_upper_len > XFROUT_MAX_MESSAGE_SIZE: 

                raise XfroutSessionError('RR too large for zone transfer (' + 

                                         str(rrset_len) + ' bytes)') 

 

            # Add the RRset to the new message 

            msg.add_rrset(Message.SECTION_ANSWER, rrset) 

 

        # Add and send the trailing SOA 

        self._send_message_with_last_soa(msg, sock_fd, self._soa, 

                                         message_upper_len) 

 

class UnixSockServer(socketserver_mixin.NoPollMixIn,
                     ThreadingUnixStreamServer):
    '''The unix domain socket server which accepts xfr queries sent from
    the auth server.'''

    def __init__(self, sock_file, handle_class, shutdown_event, config_data,
                 cc):
        '''Create the server listening on sock_file and apply config_data.

        sock_file: path of the unix domain socket; a stale file at that
            path is removed first (the process exits if it is in use).
        handle_class: request handler class handed to
            ThreadingUnixStreamServer.
        shutdown_event: event whose is_set() is polled by handle_request()
            to detect shutdown.
        config_data: initial configuration, applied through
            update_config_data().
        cc: config session used to look up default and remote config values.
        '''
        self._remove_unused_sock_file(sock_file)
        self._sock_file = sock_file
        socketserver_mixin.NoPollMixIn.__init__(self)
        ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
        self._shutdown_event = shutdown_event
        # shutdown() writes to _write_sock; handle_request() selects on
        # _read_sock so that blocked threads wake up.
        self._write_sock, self._read_sock = socket.socketpair()
        self._common_init()
        self._cc = cc
        self.update_config_data(config_data)

    def _common_init(self):
        '''Initialization shared with the mock server class used for tests'''
        self._lock = threading.Lock()
        self._transfers_counter = 0
        self._zone_config = {}
        self._acl = None # this will be initialized in update_config_data()

    def _receive_query_message(self, sock):
        '''Receive one length-prefixed request message from sock.

        The message is preceded by its length as a 2-byte unsigned integer
        in network byte order.  Returns the message body as bytes, or None
        if the peer closes the connection before the length or the body
        has been received completely.
        '''
        def recv_exact(length):
            # recv() may return fewer bytes than requested, so keep reading
            # until 'length' bytes have arrived or the peer closes.
            chunks = []
            remaining = length
            while remaining > 0:
                data = sock.recv(remaining)
                if not data:
                    return None
                chunks.append(data)
                remaining -= len(data)
            return b''.join(chunks)

        # receive data length
        len_data = recv_exact(2)
        if len_data is None:
            return None
        msg_len = struct.unpack('!H', len_data)[0]

        # receive data
        return recv_exact(msg_len)

    def handle_request(self):
        ''' Enable server handle a request until shutdown or auth is closed.'''
        try:
            request, client_address = self.get_request()
        except socket.error:
            logger.error(XFROUT_FETCH_REQUEST_ERROR)
            return

        # Check self._shutdown_event to ensure the real shutdown comes.
        # Linux could trigger a spurious readable event on the _read_sock
        # due to a bug, so we need perform a double check.
        while not self._shutdown_event.is_set(): # Check if xfrout is shutdown
            try:
                (rlist, wlist, xlist) = select.select([self._read_sock,
                                                       request], [], [])
            except select.error as e:
                if e.args[0] == errno.EINTR:
                    # Interrupted by a signal: just retry.
                    continue
                logger.error(XFROUT_SOCKET_SELECT_ERROR, str(e))
                break

            # self._shutdown_event will be set by now, if it is not a false
            # alarm
            if self._read_sock in rlist:
                continue

            # NOTE(review): process_request() returns None whether or not it
            # started a session, so a failed receive does not end this loop
            # by itself -- confirm that is intended.
            try:
                self.process_request(request)
            except Exception as pre:
                logger.error(XFROUT_PROCESS_REQUEST_ERROR, str(pre))
                break

    def _handle_request_noblock(self):
        """Override the function _handle_request_noblock(), it creates a new
        thread to handle requests for each auth"""
        td = threading.Thread(target=self.handle_request)
        td.daemon = True
        td.start()

    def process_request(self, request):
        """Receive socket fd and query message from auth, then
        start a new thread to process the request.

        Returns None in all cases.  Nothing is started if no descriptor or
        no request data could be received.
        """
        sock_fd = recv_fd(request.fileno())
        if sock_fd < 0:
            # This may happen when one xfrout process try to connect to
            # xfrout unix socket server, to check whether there is another
            # xfrout running.
            if sock_fd == FD_SYSTEM_ERROR:
                logger.error(XFROUT_RECEIVE_FILE_DESCRIPTOR_ERROR)
            return

        # receive request msg
        request_data = self._receive_query_message(request)
        if not request_data:
            # No session is created for this descriptor, so nobody else
            # would close it.  Closing is best effort: a failure here must
            # not terminate the connection loop in handle_request().
            try:
                os.close(sock_fd)
            except OSError:
                pass
            return

        t = threading.Thread(target=self.finish_request,
                             args=(sock_fd, request_data))
        if self.daemon_threads:
            t.daemon = True
        t.start()

    def _guess_remote(self, sock_fd):
        """Guess remote address and port of the socket.

        The sock_fd must be a file descriptor of a socket.
        This method returns a 3-tuple consisting of address family,
        socket type, and a 2-tuple with the address (string) and port (int).

        """
        # This uses a trick. If the socket is IPv4 in reality and we pretend
        # it to be IPv6, it returns IPv4 address anyway. This doesn't seem
        # to care about the SOCK_STREAM parameter at all (which it really is,
        # except for testing)
        if socket.has_ipv6:
            pretend_family = socket.AF_INET6
        else:
            # To make it work even on hosts without IPv6 support
            # (Any idea how to simulate this in test?)
            pretend_family = socket.AF_INET
        # fromfd() duplicates the descriptor, so closing this temporary
        # socket object leaves sock_fd itself open for the session.
        sock = socket.fromfd(sock_fd, pretend_family, socket.SOCK_STREAM)
        try:
            peer = sock.getpeername()
        finally:
            sock.close()

        # Identify the correct socket family.  Due to the above "trick",
        # we cannot simply use sock.family.
        family = socket.AF_INET6
        try:
            socket.inet_pton(socket.AF_INET6, peer[0])
        except socket.error:
            family = socket.AF_INET
        return (family, socket.SOCK_STREAM, peer)

    def finish_request(self, sock_fd, request_data):
        '''Finish one request by instantiating RequestHandlerClass.

        This is an entry point of a separate thread spawned in
        UnixSockServer.process_request().

        This method creates a XfroutSession object.
        '''
        # Take a consistent snapshot of the ACL configuration; it can be
        # replaced concurrently by update_config_data().
        with self._lock:
            acl = self._acl
            zone_config = self._zone_config
        self.RequestHandlerClass(sock_fd, request_data, self,
                                 isc.server_common.tsig_keyring.get_keyring(),
                                 self._guess_remote(sock_fd), acl, zone_config)

    def _remove_unused_sock_file(self, sock_file):
        '''Try to remove the socket file. If the file is being used
        by one running xfrout process, exit from python.
        If it's not a socket file or nobody is listening,
        it will be removed. If it can't be removed, exit from python. '''
        if self._sock_file_in_use(sock_file):
            logger.error(XFROUT_UNIX_SOCKET_FILE_IN_USE, sock_file)
            sys.exit(0)
        else:
            if not os.path.exists(sock_file):
                return

            try:
                os.unlink(sock_file)
            except OSError as err:
                logger.error(XFROUT_REMOVE_OLD_UNIX_SOCKET_FILE_ERROR,
                             sock_file, str(err))
                sys.exit(0)

    def _sock_file_in_use(self, sock_file):
        '''Check whether the socket file 'sock_file' exists and
        is being used by one running xfrout process. If it is,
        return True, or else return False. '''
        probe = None
        try:
            probe = socket.socket(socket.AF_UNIX)
            probe.connect(sock_file)
        except socket.error:
            return False
        else:
            return True
        finally:
            # The probe connection is only needed for the check itself.
            if probe is not None:
                probe.close()

    def shutdown(self):
        '''Wake up the connection threads, stop serving and remove the
        socket file.'''
        self._write_sock.send(b"shutdown") #terminate the xfrout session thread
        super().shutdown() # call the shutdown() of class socketserver_mixin.NoPollMixIn
        try:
            os.unlink(self._sock_file)
        except Exception as e:
            logger.error(XFROUT_REMOVE_UNIX_SOCKET_FILE_ERROR,
                         self._sock_file, str(e))

    def update_config_data(self, new_config):
        '''Apply the new config setting of xfrout module.

        new_config: mapping of configuration items.  'transfer_acl' and
            'zone_config' keep their current values when absent;
            'transfers_out' is read with get(), so it becomes None when
            absent.

        Raises XfroutConfigError if an ACL cannot be parsed or a zone is
        configured twice; in that case none of the new values is applied.
        '''
        # The 'with' block releases the lock on every exit path, including
        # when an exception propagates.
        with self._lock:
            logger.info(XFROUT_NEW_CONFIG)
            new_acl = self._acl
            if 'transfer_acl' in new_config:
                try:
                    new_acl = REQUEST_LOADER.load(new_config['transfer_acl'])
                except LoaderError as e:
                    raise XfroutConfigError('Failed to parse transfer_acl: ' +
                                            str(e))

            new_zone_config = self._zone_config
            zconfig_data = new_config.get('zone_config')
            if zconfig_data is not None:
                new_zone_config = self.__create_zone_config(zconfig_data)

            # Everything parsed successfully; commit the new values.
            self._acl = new_acl
            self._zone_config = new_zone_config
            self._max_transfers_out = new_config.get('transfers_out')
        logger.info(XFROUT_NEW_CONFIG_DONE)

    def __create_zone_config(self, zone_config_list):
        '''Build the per-zone configuration dictionary.

        Returns a dict keyed by (class text, origin text) whose values are
        dicts that hold a parsed 'transfer_acl' when one is given.

        Raises XfroutConfigError on a duplicate zone entry or an ACL that
        cannot be parsed.
        '''
        new_config = {}
        for zconf in zone_config_list:
            # convert the class, origin (name) pair.  First build pydnspp
            # object to reject invalid input.
            zclass_str = zconf.get('class')
            if zclass_str is None:
                #zclass_str = 'IN' # temporary
                zclass_str = self._cc.get_default_value('zone_config/class')
            zclass = RRClass(zclass_str)
            zorigin = Name(zconf['origin'], True)
            config_key = (zclass.to_text(), zorigin.to_text())

            # reject duplicate config
            if config_key in new_config:
                raise XfroutConfigError('Duplicate zone_config for ' +
                                        str(zorigin) + '/' + str(zclass))

            # create a new config entry, build any given (and known) config
            new_config[config_key] = {}
            if 'transfer_acl' in zconf:
                try:
                    new_config[config_key]['transfer_acl'] = \
                        REQUEST_LOADER.load(zconf['transfer_acl'])
                except LoaderError as e:
                    raise XfroutConfigError('Failed to parse transfer_acl ' +
                                            'for ' + zorigin.to_text() + '/' +
                                            zclass_str + ': ' + str(e))
        return new_config

    def get_db_file(self):
        '''Return the database file configured for the "Auth" module.

        If that value is the default and B10_FROM_BUILD is set in the
        environment, bind10_zones.sqlite3 under that directory is returned
        instead.
        '''
        db_file, is_default = self._cc.get_remote_config_value("Auth",
                                                               "database_file")
        # this too should be unnecessary, but currently the
        # 'from build' override isn't stored in the config
        # (and we don't have indirect python access to datasources yet)
        if is_default and "B10_FROM_BUILD" in os.environ:
            db_file = os.environ["B10_FROM_BUILD"] + os.sep + \
                "bind10_zones.sqlite3"
        return db_file

    def increase_transfers_counter(self):
        '''Return False, if counter + 1 > max_transfers_out, or else
        increment the counter and return True
        '''
        with self._lock:
            if self._transfers_counter < self._max_transfers_out:
                self._transfers_counter += 1
                return True
            return False

    def decrease_transfers_counter(self):
        '''Decrement the counter of ongoing transfers.'''
        with self._lock:
            self._transfers_counter -= 1

 

class XfroutServer:
    '''The xfrout process object.

    It owns the config session, the unix socket server that accepts xfr
    queries, and the notifier.
    '''

    def __init__(self):
        '''Start the config session, the xfr query listener and the
        notifier.'''
        self._unix_socket_server = None
        self._listen_sock_file = UNIX_SOCKET_FILE
        self._shutdown_event = threading.Event()
        self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION,
                                              self.config_handler,
                                              self.command_handler)
        self._config_data = self._cc.get_full_config()
        self._cc.start()
        self._cc.add_remote_config(AUTH_SPECFILE_LOCATION)
        isc.server_common.tsig_keyring.init_keyring(self._cc)
        self._start_xfr_query_listener()
        self._start_notifier()

    def _start_xfr_query_listener(self):
        '''Start a new thread to accept xfr query. '''
        self._unix_socket_server = UnixSockServer(self._listen_sock_file,
                                                  XfroutSession,
                                                  self._shutdown_event,
                                                  self._config_data,
                                                  self._cc)
        listener = threading.Thread(target=self._unix_socket_server.serve_forever)
        listener.start()

    def _start_notifier(self):
        '''Create the notifier on the configured database file and start
        its dispatcher.'''
        datasrc = self._unix_socket_server.get_db_file()
        self._notifier = notify_out.NotifyOut(datasrc)
        self._notifier.dispatcher()

    def send_notify(self, zone_name, zone_class):
        '''Forward a notify request to the notifier and return its result.'''
        return self._notifier.send_notify(zone_name, zone_class)

    def config_handler(self, new_config):
        '''Update config data. TODO. Do error check

        Known keys of new_config are merged into the stored configuration,
        which is then passed to the unix socket server.  Returns a success
        answer, or an error answer (code 1) if a key is unknown or the
        socket server fails to apply the configuration.
        '''
        answer = create_answer(0)
        for key in new_config:
            if key not in self._config_data:
                answer = create_answer(1, "Unknown config data: " + str(key))
                continue
            self._config_data[key] = new_config[key]

        if self._unix_socket_server:
            try:
                self._unix_socket_server.update_config_data(self._config_data)
            except Exception as e:
                answer = create_answer(1,
                                       "Failed to handle new configuration: " +
                                       str(e))

        return answer

    def shutdown(self):
        ''' shutdown the xfrout process. The thread which is doing zone transfer-out should be
        terminated.
        '''

        global xfrout_server
        xfrout_server = None #Avoid shutdown is called twice
        self._cc.send_stopping()
        self._shutdown_event.set()
        self._notifier.shutdown()
        if self._unix_socket_server:
            self._unix_socket_server.shutdown()
        self._wait_for_threads()

    def _wait_for_threads(self):
        # Wait for all threads to terminate. this is a call that is only used
        # in shutdown(), but it has its own method, so we can test shutdown
        # without involving thread operations (the test would override this
        # method)
        this_thread = threading.current_thread()
        for th in threading.enumerate():
            # A thread cannot join itself.
            if th is this_thread:
                continue
            th.join()

    def command_handler(self, cmd, args):
        '''Handle a command received on the config session.

        "shutdown" stops the server; notify_out.ZONE_NEW_DATA_READY_CMD
        sends a notify for args['zone_name'] (with args['zone_class']
        defaulting to IN).  Returns an answer with code 0 on success and
        code 1 for an unknown zone, missing parameters or an unknown
        command.
        '''
        if cmd == "shutdown":
            logger.info(XFROUT_RECEIVED_SHUTDOWN_COMMAND)
            self.shutdown()
            answer = create_answer(0)

        elif cmd == notify_out.ZONE_NEW_DATA_READY_CMD:
            # A command sent without any arguments is reported as a bad
            # parameter instead of failing on args.get().
            zone_name = args.get('zone_name') if args else None
            zone_class = args.get('zone_class') if args else None
            if not zone_class:
                zone_class = str(RRClass.IN())
            if zone_name:
                logger.info(XFROUT_NOTIFY_COMMAND, zone_name, zone_class)
                if self.send_notify(zone_name, zone_class):
                    answer = create_answer(0)
                else:
                    zonestr = notify_out.format_zone_str(Name(zone_name),
                                                         zone_class)
                    answer = create_answer(1, "Unknown zone: " + zonestr)
            else:
                answer = create_answer(1, "Bad command parameter:" + str(args))

        else:
            answer = create_answer(1, "Unknown command:" + str(cmd))

        return answer

    def run(self):
        '''Get and process all commands sent from cfgmgr or other modules. '''
        logger.debug(DBG_PROCESS, XFROUT_STARTED)
        while not self._shutdown_event.is_set():
            self._cc.check_command(False)

 

 

# The XfroutServer instance driven by the main script.  It is None until
# the server has been created, and XfroutServer.shutdown() resets it to None.
xfrout_server = None

def signal_handler(signal, frame):
    '''Shut the running server down and exit with status 0.

    Does nothing when no server instance is currently stored in the
    module-level xfrout_server variable.
    '''
    server = xfrout_server
    if not server:
        return
    server.shutdown()
    sys.exit(0)

 

def set_signal_handler():
    '''Install signal_handler as the handler for SIGTERM and SIGINT.'''
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, signal_handler)

 

def set_cmd_options(parser):
    '''Register the command line options of xfrout on the given parser.

    Currently this is only -v/--verbose, a flag stored as 'verbose'.
    '''
    verbose_settings = {
        'dest': "verbose",
        'action': "store_true",
        'help': "display more about what is going on",
    }
    parser.add_option("-v", "--verbose", **verbose_settings)

 

# Script entry point: parse the options, install the signal handlers and run
# the server until it is shut down.
if '__main__' == __name__:
    try:
        parser = OptionParser()
        set_cmd_options(parser)
        (options, args) = parser.parse_args()
        VERBOSE_MODE = options.verbose

        set_signal_handler()
        xfrout_server = XfroutServer()
        xfrout_server.run()
    except KeyboardInterrupt:
        # Use the same lower-case logging method as the rest of the file.
        logger.info(XFROUT_STOPPED_BY_KEYBOARD)
    except SessionError as e:
        logger.error(XFROUT_CC_SESSION_ERROR, str(e))
    except ModuleCCSessionError as e:
        logger.error(XFROUT_MODULECC_SESSION_ERROR, str(e))
    except XfroutConfigError as e:
        logger.error(XFROUT_CONFIG_ERROR, str(e))
    except SessionTimeout:
        logger.error(XFROUT_CC_SESSION_TIMEOUT_ERROR)

    # XfroutServer.shutdown() resets xfrout_server to None, so this only
    # runs when the server has not been shut down yet.
    if xfrout_server:
        xfrout_server.shutdown()