Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

# Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC") 

# 

# Permission to use, copy, modify, and distribute this software for any 

# purpose with or without fee is hereby granted, provided that the above 

# copyright notice and this permission notice appear in all copies. 

# 

# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM 

# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL 

# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL 

# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT, 

# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING 

# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, 

# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION 

# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

 

""" 

Tests for the bind10.sockcreator module. 

""" 

 

import unittest 

import struct 

import socket 

from isc.net.addr import IPAddr 

import isc.log 

from libutil_io_python import send_fd 

from isc.bind10.sockcreator import Parser, CreatorError, WrappedSocket 

 

class FakeCreator: 

    """ 

    Class emulating the socket to the socket creator. It can be given expected 

    data to receive (and check) and responses to give to the Parser class 

    during testing. 

    """ 

 

    class InvalidPlan(Exception): 

        """ 

        Raised when someone wants to recv when sending is planned or vice 

        versa. 

        """ 

        pass 

 

    class InvalidData(Exception): 

        """ 

        Raises when the data passed to sendall are not the same as expected. 

        """ 

        pass 

 

    def __init__(self, plan): 

        """ 

        Create the object. The plan variable contains list of expected actions, 

        in form: 

 

        [('r', 'Data to return from recv'), ('s', 'Data expected on sendall'), 

             , ('d', 'File descriptor number to return from read_sock'), ('e', 

             None), ...] 

 

        It modifies the array as it goes. 

        """ 

        self.__plan = plan 

 

    def __get_plan(self, expected): 

63        if len(self.__plan) == 0: 

            raise InvalidPlan('Nothing more planned') 

        (kind, data) = self.__plan[0] 

        if kind == 'e': 

            self.__plan.pop(0) 

            raise socket.error('False socket error') 

69        if kind != expected: 

            raise InvalidPlan('Planned ' + kind + ', but ' + expected + 

                'requested') 

        return data 

 

    def recv(self, maxsize): 

        """ 

        Emulate recv. Returs maxsize bytes from the current recv plan. If 

        there are data left from previous recv call, it is used first. 

 

        If no recv is planned, raises InvalidPlan. 

        """ 

        data = self.__get_plan('r') 

        result, rest = data[:maxsize], data[maxsize:] 

        if len(rest) > 0: 

            self.__plan[0] = ('r', rest) 

        else: 

            self.__plan.pop(0) 

        return result 

 

    def read_fd(self): 

        """ 

        Emulate the reading of file descriptor. Returns one from a plan. 

 

        It raises InvalidPlan if no socket is planned now. 

        """ 

        fd = self.__get_plan('f') 

        self.__plan.pop(0) 

        return fd 

 

    def sendall(self, data): 

        """ 

        Checks that the data passed are correct according to plan. It raises 

        InvalidData if the data differs or InvalidPlan when sendall is not 

        expected. 

        """ 

        planned = self.__get_plan('s') 

        dlen = len(data) 

        prefix, rest = planned[:dlen], planned[dlen:] 

108        if prefix != data: 

            raise InvalidData('Expected "' + str(prefix)+ '", got "' + 

                str(data) + '"') 

111        if len(rest) > 0: 

            self.__plan[0] = ('s', rest) 

        else: 

            self.__plan.pop(0) 

 

    def all_used(self): 

        """ 

        Returns if the whole plan was consumed. 

        """ 

        return len(self.__plan) == 0 

 

class ParserTests(unittest.TestCase): 

    """ 

    Testcases for the Parser class. 

 

    A lot of these test could be done by 

    `with self.assertRaises(CreatorError) as cm`. But some versions of python 

    take the scope wrong and don't work, so we use the primitive way of 

    try-except. 

    """ 

    def __terminate(self): 

        creator = FakeCreator([('s', b'T'), ('r', b'')]) 

        parser = Parser(creator) 

        self.assertEqual(None, parser.terminate()) 

        self.assertTrue(creator.all_used()) 

        return parser 

 

    def test_terminate(self): 

        """ 

        Test if the command to terminate is correct and it waits for reading the 

        EOF. 

        """ 

        self.__terminate() 

 

    def __terminate_raises(self, parser): 

        """ 

        Check that terminate() raises a fatal exception. 

        """ 

        try: 

            parser.terminate() 

            self.fail("Not raised") 

        except CreatorError as ce: 

            self.assertTrue(ce.fatal) 

            self.assertEqual(None, ce.errno) 

 

    def test_terminate_error1(self): 

        """ 

        Test it reports an exception when there's error terminating the creator. 

        This one raises an error when receiving the EOF. 

        """ 

        creator = FakeCreator([('s', b'T'), ('e', None)]) 

        parser = Parser(creator) 

        self.__terminate_raises(parser) 

 

    def test_terminate_error2(self): 

        """ 

        Test it reports an exception when there's error terminating the creator. 

        This one raises an error when sending data. 

        """ 

        creator = FakeCreator([('e', None)]) 

        parser = Parser(creator) 

        self.__terminate_raises(parser) 

 

    def test_terminate_error3(self): 

        """ 

        Test it reports an exception when there's error terminating the creator. 

        This one sends data when it should have terminated. 

        """ 

        creator = FakeCreator([('s', b'T'), ('r', b'Extra data')]) 

        parser = Parser(creator) 

        self.__terminate_raises(parser) 

 

    def test_terminate_twice(self): 

        """ 

        Test we can't terminate twice. 

        """ 

        parser = self.__terminate() 

        self.__terminate_raises(parser) 

 

    def test_crash(self): 

        """ 

        Tests that the parser correctly raises exception when it crashes 

        unexpectedly. 

        """ 

        creator = FakeCreator([('s', b'SU4\0\0\0\0\0\0'), ('r', b'')]) 

        parser = Parser(creator) 

        try: 

            parser.get_socket(IPAddr('0.0.0.0'), 0, 'UDP') 

            self.fail("Not raised") 

        except CreatorError as ce: 

            self.assertTrue(creator.all_used()) 

            # Is the exception correct? 

            self.assertTrue(ce.fatal) 

            self.assertEqual(None, ce.errno) 

 

    def test_error(self): 

        """ 

        Tests that the parser correctly raises non-fatal exception when 

        the socket can not be created. 

        """ 

        # We split the int to see if it can cope with data coming in 

        # different packets 

        intpart = struct.pack('@i', 42) 

        creator = FakeCreator([('s', b'SU4\0\0\0\0\0\0'), ('r', b'ES' + 

            intpart[:1]), ('r', intpart[1:])]) 

        parser = Parser(creator) 

        try: 

            parser.get_socket(IPAddr('0.0.0.0'), 0, 'UDP') 

            self.fail("Not raised") 

        except CreatorError as ce: 

            self.assertTrue(creator.all_used()) 

            # Is the exception correct? 

            self.assertFalse(ce.fatal) 

            self.assertEqual(42, ce.errno) 

 

    def __error(self, plan): 

        creator = FakeCreator(plan) 

        parser = Parser(creator) 

        try: 

            parser.get_socket(IPAddr('0.0.0.0'), 0, socket.SOCK_DGRAM) 

            self.fail("Not raised") 

        except CreatorError as ce: 

            self.assertTrue(creator.all_used()) 

            self.assertTrue(ce.fatal) 

 

    def test_error_send(self): 

        self.__error([('e', None)]) 

 

    def test_error_recv(self): 

        self.__error([('s', b'SU4\0\0\0\0\0\0'), ('e', None)]) 

 

    def test_error_read_fd(self): 

        self.__error([('s', b'SU4\0\0\0\0\0\0'), ('r', b'S'), ('e', None)]) 

 

    def __create(self, addr, socktype, encoded): 

        creator = FakeCreator([('s', b'S' + encoded), ('r', b'S'), ('f', 42)]) 

        parser = Parser(creator) 

        self.assertEqual(42, parser.get_socket(IPAddr(addr), 42, socktype)) 

 

    def test_create1(self): 

        self.__create('192.0.2.0', 'UDP', b'U4\0\x2A\xC0\0\x02\0') 

 

    def test_create2(self): 

        self.__create('2001:db8::', socket.SOCK_STREAM, 

            b'T6\0\x2A\x20\x01\x0d\xb8\0\0\0\0\0\0\0\0\0\0\0\0') 

 

    def test_create_terminated(self): 

        """ 

        Test we can't request sockets after it was terminated. 

        """ 

        parser = self.__terminate() 

        try: 

            parser.get_socket(IPAddr('0.0.0.0'), 0, 'UDP') 

            self.fail("Not raised") 

        except CreatorError as ce: 

            self.assertTrue(ce.fatal) 

            self.assertEqual(None, ce.errno) 

 

    def test_invalid_socktype(self): 

        """ 

        Test invalid socket type is rejected 

        """ 

        self.assertRaises(ValueError, Parser(FakeCreator([])).get_socket, 

                          IPAddr('0.0.0.0'), 42, 'RAW') 

 

    def test_invalid_family(self): 

        """ 

        Test it rejects invalid address family. 

        """ 

        # Note: this produces a bad logger output, since this address 

        # can not be converted to string, so the original message with 

        # placeholders is output. This should not happen in practice, so 

        # it is harmless. 

        addr = IPAddr('0.0.0.0') 

        addr.family = 42 

        self.assertRaises(ValueError, Parser(FakeCreator([])).get_socket, 

                          addr, 42, socket.SOCK_DGRAM) 

 

class WrapTests(unittest.TestCase): 

    """ 

    Tests for the wrap_socket function. 

    """ 

    def test_wrap(self): 

        # We construct two pairs of socket. The receiving side of one pair will 

        # be wrapped. Then we send one of the other pair through this pair and 

        # check the received one can be used as a socket 

 

        # The transport socket 

        (t1, t2) = socket.socketpair() 

        # The payload socket 

        (p1, p2) = socket.socketpair() 

 

        t2 = WrappedSocket(t2) 

 

        # Transfer the descriptor 

        send_fd(t1.fileno(), p1.fileno()) 

        p1 = socket.fromfd(t2.read_fd(), socket.AF_UNIX, socket.SOCK_STREAM) 

 

        # Now, pass some data trough the socket 

        p1.send(b'A') 

        data = p2.recv(1) 

        self.assertEqual(b'A', data) 

 

        # Test the wrapping didn't hurt the socket's usual methods 

        t1.send(b'B') 

        data = t2.recv(1) 

        self.assertEqual(b'B', data) 

        t2.send(b'C') 

        data = t1.recv(1) 

        self.assertEqual(b'C', data) 

 

exitif __name__ == '__main__': 

    isc.log.init("bind10") # FIXME Should this be needed? 

    isc.log.resetUnitTestRootLogger() 

    unittest.main()