Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

# Copyright (C) 2011  Internet Systems Consortium. 

# 

# Permission to use, copy, modify, and distribute this software for any 

# purpose with or without fee is hereby granted, provided that the above 

# copyright notice and this permission notice appear in all copies. 

# 

# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM 

# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL 

# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL 

# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT, 

# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING 

# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, 

# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION 

# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

 

'''Tests for the DDNS module''' 

 

import unittest 

import isc 

import ddns 

import isc.config 

import select 

import errno 

import isc.util.cio.socketsession 

import socket 

import os.path 

 

class FakeSocket: 

    """ 

    A fake socket. It only provides a file number, peer name and accept method. 

    """ 

    def __init__(self, fileno): 

        self.__fileno = fileno 

    def fileno(self): 

        return self.__fileno 

    def getpeername(self): 

        return "fake_unix_socket" 

    def accept(self): 

        return FakeSocket(self.__fileno + 1) 

 

class FakeSessionReceiver: 

    """ 

    A fake socket session receiver, for our tests. 

    """ 

    def __init__(self, socket): 

        self._socket = socket 

    def socket(self): 

        """ 

        This method is not present in the real receiver, but we use it to 

        inspect the socket passed to the constructor. 

        """ 

        return self._socket 

 

class MyCCSession(isc.config.ConfigData): 

    '''Fake session with minimal interface compliance''' 

    def __init__(self): 

        module_spec = isc.config.module_spec_from_file( 

            ddns.SPECFILE_LOCATION) 

        isc.config.ConfigData.__init__(self, module_spec) 

        self._started = False 

        self._stopped = False 

 

    def start(self): 

        '''Called by DDNSServer initialization, but not used in tests''' 

        self._started = True 

 

    def send_stopping(self): 

        '''Called by shutdown code''' 

        self._stopped = True 

 

    def get_socket(self): 

        """ 

        Used to get the file number for select. 

        """ 

        return FakeSocket(1) 

 

class MyDDNSServer(): 

    '''Fake DDNS server used to test the main() function''' 

    def __init__(self): 

        self.reset() 

 

    def run(self): 

        ''' 

        Fake the run() method of the DDNS server. This will set 

        self._run_called to True. 

        If self._exception is not None, this is raised as an exception 

        ''' 

        self.run_called = True 

        if self._exception is not None: 

            self.exception_raised = True 

            raise self._exception 

 

    def set_exception(self, exception): 

        '''Set an exception to be raised when run() is called''' 

        self._exception = exception 

 

    def reset(self): 

        '''(Re)set to initial values''' 

        self.run_called = False 

        self.exception_raised = False 

        self._exception = None 

 

class TestDDNSServer(unittest.TestCase): 

    def setUp(self): 

        cc_session = MyCCSession() 

        self.assertFalse(cc_session._started) 

        self.ddns_server = ddns.DDNSServer(cc_session) 

        self.__cc_session = cc_session 

        self.assertTrue(cc_session._started) 

        self.__select_expected = None 

        self.__select_answer = None 

        self.__select_exception = None 

        self.__hook_called = False 

        self.ddns_server._listen_socket = FakeSocket(2) 

        ddns.select.select = self.__select 

 

    def tearDown(self): 

        ddns.select.select = select.select 

        ddns.isc.util.cio.socketsession.SocketSessionReceiver = \ 

            isc.util.cio.socketsession.SocketSessionReceiver 

 

    def test_listen(self): 

        ''' 

        Test the old socket file is removed (if any) and a new socket 

        is created when the ddns server is created. 

        ''' 

        # Make sure the socket does not exist now 

        ddns.clear_socket() 

        # Hook the call for clearing the socket 

        orig_clear = ddns.clear_socket 

        ddns.clear_socket = self.__hook 

        # Create the server 

        ddnss = ddns.DDNSServer(MyCCSession()) 

        ddns.clear_socket = orig_clear 

        # The socket is created 

        self.assertTrue(os.path.exists(ddns.SOCKET_FILE)) 

        self.assertTrue(isinstance(ddnss._listen_socket, socket.socket)) 

        # And deletion of the socket was requested 

        self.assertIsNone(self.__hook_called) 

        # Now make sure the clear_socket really works 

        ddns.clear_socket() 

        self.assertFalse(os.path.exists(ddns.SOCKET_FILE)) 

 

    def test_config_handler(self): 

        # Config handler does not do anything yet, but should at least 

        # return 'ok' for now. 

        new_config = {} 

        answer = self.ddns_server.config_handler(new_config) 

        self.assertEqual((0, None), isc.config.parse_answer(answer)) 

 

    def test_shutdown_command(self): 

        '''Test whether the shutdown command works''' 

        self.assertFalse(self.ddns_server._shutdown) 

        answer = self.ddns_server.command_handler('shutdown', None) 

        self.assertEqual((0, None), isc.config.parse_answer(answer)) 

        self.assertTrue(self.ddns_server._shutdown) 

 

    def test_command_handler(self): 

        '''Test some commands.''' 

        # this command should not exist 

        answer = self.ddns_server.command_handler('bad_command', None) 

        self.assertEqual((1, 'Unknown command: bad_command'), 

                         isc.config.parse_answer(answer)) 

 

    def test_signal_handler(self): 

        '''Test whether signal_handler calls shutdown()''' 

        signal_handler = ddns.create_signal_handler(self.ddns_server) 

        self.assertFalse(self.ddns_server._shutdown) 

        signal_handler(None, None) 

        self.assertTrue(self.ddns_server._shutdown) 

 

    def __select(self, reads, writes, exceptions, timeout=None): 

        """ 

        A fake select. It checks it was called with the correct parameters and 

        returns a preset answer. 

 

        If there's an exception stored in __select_exception, it is raised 

        instead and the exception is cleared. 

        """ 

        self.assertEqual(self.__select_expected, (reads, writes, exceptions, 

                                                  timeout)) 

        if self.__select_exception is not None: 

            (self.__select_exception, exception) = (None, 

                                                    self.__select_exception) 

            raise exception 

        answer = self.__select_answer 

        self.__select_answer = None 

        self.ddns_server._shutdown = True 

        return answer 

 

    def __hook(self, param=None): 

        """ 

        A hook that can be installed to any nullary or unary function and see 

        if it was really called. 

        """ 

        self.__hook_called = param 

 

    def test_accept_called(self): 

        """ 

        Test we call the accept function when a new connection comes. 

        """ 

        self.ddns_server.accept = self.__hook 

        self.__select_expected = ([1, 2], [], [], None) 

        self.__select_answer = ([2], [], []) 

        self.__hook_called = "Not called" 

        self.ddns_server.run() 

        self.assertTrue(self.ddns_server._shutdown) 

        # The answer got used 

        self.assertIsNone(self.__select_answer) 

        # Reset, when called without parameter 

        self.assertIsNone(self.__hook_called) 

 

    def test_check_command_called(self): 

        """ 

        Test the check_command is called when there's something on the 

        socket. 

        """ 

        self.__cc_session.check_command = self.__hook 

        self.__select_expected = ([1, 2], [], [], None) 

        self.__select_answer = ([1], [], []) 

        self.ddns_server.run() 

        self.assertTrue(self.ddns_server._shutdown) 

        # The answer got used 

        self.assertIsNone(self.__select_answer) 

        # And the check_command was called with true parameter (eg. 

        # non-blocking) 

        self.assertTrue(self.__hook_called) 

 

    def test_accept(self): 

        """ 

        Test that we can accept a new connection. 

        """ 

        # There's nothing before the accept 

        ddns.isc.util.cio.socketsession.SocketSessionReceiver = \ 

            FakeSessionReceiver 

        self.assertEqual({}, self.ddns_server._socksession_receivers) 

        self.ddns_server.accept() 

        # Now the new socket session receiver is stored in the dict 

        # The 3 comes from _listen_socket.accept() - _listen_socket has 

        # fileno 2 and accept returns socket with fileno increased by one. 

        self.assertEqual([3], 

                         list(self.ddns_server._socksession_receivers.keys())) 

        (socket, receiver) = self.ddns_server._socksession_receivers[3] 

        self.assertTrue(isinstance(socket, FakeSocket)) 

        self.assertEqual(3, socket.fileno()) 

        self.assertTrue(isinstance(receiver, FakeSessionReceiver)) 

        self.assertEqual(socket, receiver.socket()) 

 

    def test_accept_fail(self): 

        """ 

        Test we don't crash if an accept fails and that we don't modify the 

        internals. 

        """ 

        # Make the accept fail 

        def accept_failure(): 

            raise socket.error(errno.ECONNABORTED) 

        orig = self.ddns_server._listen_socket.accept 

        self.ddns_server._listen_socket.accept = accept_failure 

        self.assertEqual({}, self.ddns_server._socksession_receivers) 

        # Doesn't raise the exception 

        self.ddns_server.accept() 

        # And nothing is stored 

        self.assertEqual({}, self.ddns_server._socksession_receivers) 

        # Now make the socket receiver fail 

        self.ddns_server._listen_socket.accept = orig 

        def receiver_failure(sock): 

            raise isc.util.cio.socketsession.SocketSessionError('Test error') 

        ddns.isc.util.cio.socketsession.SocketSessionReceiver = \ 

            receiver_failure 

        # Doesn't raise the exception 

        self.ddns_server.accept() 

        # And nothing is stored 

        self.assertEqual({}, self.ddns_server._socksession_receivers) 

        # Check we don't catch everything, so raise just an exception 

        def unexpected_failure(sock): 

            raise Exception('Test error') 

        ddns.isc.util.cio.socketsession.SocketSessionReceiver = \ 

            unexpected_failure 

        # This one gets through 

        self.assertRaises(Exception, self.ddns_server.accept) 

        # Nothing is stored as well 

        self.assertEqual({}, self.ddns_server._socksession_receivers) 

 

    def test_session_called(self): 

        """ 

        Test the run calls handle_session when there's something on the 

        socket. 

        """ 

        socket = FakeSocket(3) 

        self.ddns_server._socksession_receivers = \ 

            {3: (socket, FakeSessionReceiver(socket))} 

        self.ddns_server.handle_session = self.__hook 

        self.__select_expected = ([1, 2, 3], [], [], None) 

        self.__select_answer = ([3], [], []) 

        self.ddns_server.run() 

        self.assertTrue(self.ddns_server._shutdown) 

        self.assertTrue(self.__cc_session._stopped) 

        self.assertIsNone(self.__select_answer) 

        self.assertEqual(3, self.__hook_called) 

 

    def test_handle_session_ok(self): 

        """ 

        Test the handle_session pops the receiver and calls handle_request 

        when everything is OK. 

        """ 

        socket = FakeSocket(3) 

        receiver = FakeSessionReceiver(socket) 

        # It doesn't really matter what data we use here, it is only passed 

        # through the code 

        param = (FakeSocket(4), ('127.0.0.1', 1234), ('127.0.0.1', 1235), 

                 'Some data') 

        def pop(): 

            return param 

        # Prepare data into the receiver 

        receiver.pop = pop 

        self.ddns_server._socksession_receivers = {3: (socket, receiver)} 

        self.ddns_server.handle_request = self.__hook 

        # Call it 

        self.ddns_server.handle_session(3) 

        # The popped data are passed into the handle_request 

        self.assertEqual(param, self.__hook_called) 

        # The receivers are kept the same 

        self.assertEqual({3: (socket, receiver)}, 

                         self.ddns_server._socksession_receivers) 

 

    def test_handle_session_fail(self): 

        """ 

        Test the handle_session removes (and closes) the socket and receiver 

        when the receiver complains. 

        """ 

        socket = FakeSocket(3) 

        receiver = FakeSessionReceiver(socket) 

        def pop(): 

            raise isc.util.cio.socketsession.SocketSessionError('Test error') 

        receiver.pop = pop 

        socket.close = self.__hook 

        self.__hook_called = False 

        self.ddns_server._socksession_receivers = {3: (socket, receiver)} 

        self.ddns_server.handle_session(3) 

        # The "dead" receiver is removed 

        self.assertEqual({}, self.ddns_server._socksession_receivers) 

        # Close is called with no parameter, so the default None 

        self.assertIsNone(self.__hook_called) 

 

    def test_select_exception_ignored(self): 

        """ 

        Test that the EINTR is ignored in select. 

        """ 

        # Prepare the EINTR exception 

        self.__select_exception = select.error(errno.EINTR) 

        # We reuse the test here, as it should act the same. The exception 

        # should just get ignored. 

        self.test_check_command_called() 

 

    def test_select_exception_fatal(self): 

        """ 

        Test that other exceptions are fatal to the run. 

        """ 

        # Prepare a different exception 

        self.__select_exception = select.error(errno.EBADF) 

        self.__select_expected = ([1, 2], [], [], None) 

        self.assertRaises(select.error, self.ddns_server.run) 

 

class TestMain(unittest.TestCase): 

    def setUp(self): 

        self._server = MyDDNSServer() 

        self.__orig_clear = ddns.clear_socket 

        ddns.clear_socket = self.__clear_socket 

        self.__clear_called = False 

 

    def tearDown(self): 

        ddns.clear_socket = self.__orig_clear 

 

    def test_main(self): 

        self.assertFalse(self._server.run_called) 

        ddns.main(self._server) 

        self.assertTrue(self._server.run_called) 

        self.assertTrue(self.__clear_called) 

 

    def __clear_socket(self): 

        self.__clear_called = True 

 

    def check_exception(self, ex): 

        '''Common test sequence to see if the given exception is caused. 

        ''' 

        # Should technically not be necessary, but reset server to be sure 

        self._server.reset() 

        self.assertFalse(self._server.exception_raised) 

        self._server.set_exception(ex) 

        ddns.main(self._server) 

        self.assertTrue(self._server.exception_raised) 

 

    def test_exceptions(self): 

        ''' 

        Test whether exceptions are caught in main() 

        These exceptions should not bubble up. 

        ''' 

        self._server.set_exception(KeyboardInterrupt()) 

        self.assertFalse(self._server.exception_raised) 

        ddns.main(self._server) 

        self.assertTrue(self._server.exception_raised) 

 

        self.check_exception(isc.cc.SessionError("error")) 

        self.check_exception(isc.config.ModuleCCSessionError("error")) 

        self.check_exception(ddns.DDNSConfigError("error")) 

        self.check_exception(isc.cc.SessionTimeout("error")) 

        self.check_exception(Exception("error")) 

 

        # Add one that is not a subclass of Exception, and hence not 

        # caught. Misuse BaseException for that. 

        self._server.reset() 

        self.assertFalse(self._server.exception_raised) 

        self._server.set_exception(BaseException("error")) 

        self.assertRaises(BaseException, ddns.main, self._server) 

        self.assertTrue(self._server.exception_raised) 

 

exitif __name__== "__main__": 

    isc.log.resetUnitTestRootLogger() 

    unittest.main()