Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

from msgq import SubscriptionManager, MsgQ 

 

import unittest 

import os 

import socket 

import signal 

import sys 

import time 

import isc.cc 

 

# 

# Currently only the subscription part and some sending is implemented... 

# I'd have to mock out a socket, which, while not impossible, is not trivial. 

# 

 

class TestSubscriptionManager(unittest.TestCase): 

    def setUp(self): 

        self.sm = SubscriptionManager() 

 

    def test_subscription_add_delete_manager(self): 

        self.sm.subscribe("a", "*", 'sock1') 

        self.assertEqual(self.sm.find_sub("a", "*"), [ 'sock1' ]) 

 

    def test_subscription_add_delete_other(self): 

        self.sm.subscribe("a", "*", 'sock1') 

        self.sm.unsubscribe("a", "*", 'sock2') 

        self.assertEqual(self.sm.find_sub("a", "*"), [ 'sock1' ]) 

 

    def test_subscription_add_several_sockets(self): 

        socks = [ 's1', 's2', 's3', 's4', 's5' ] 

        for s in socks: 

            self.sm.subscribe("a", "*", s) 

        self.assertEqual(self.sm.find_sub("a", "*"), socks) 

 

    def test_unsubscribe(self): 

        socks = [ 's1', 's2', 's3', 's4', 's5' ] 

        for s in socks: 

            self.sm.subscribe("a", "*", s) 

        self.sm.unsubscribe("a", "*", 's3') 

        self.assertEqual(self.sm.find_sub("a", "*"), [ 's1', 's2', 's4', 's5' ]) 

 

    def test_unsubscribe_all(self): 

        self.sm.subscribe('g1', 'i1', 's1') 

        self.sm.subscribe('g1', 'i1', 's2') 

        self.sm.subscribe('g1', 'i2', 's1') 

        self.sm.subscribe('g1', 'i2', 's2') 

        self.sm.subscribe('g2', 'i1', 's1') 

        self.sm.subscribe('g2', 'i1', 's2') 

        self.sm.subscribe('g2', 'i2', 's1') 

        self.sm.subscribe('g2', 'i2', 's2') 

        self.sm.unsubscribe_all('s1') 

        self.assertEqual(self.sm.find_sub("g1", "i1"), [ 's2' ]) 

        self.assertEqual(self.sm.find_sub("g1", "i2"), [ 's2' ]) 

        self.assertEqual(self.sm.find_sub("g2", "i1"), [ 's2' ]) 

        self.assertEqual(self.sm.find_sub("g2", "i2"), [ 's2' ]) 

 

    def test_find(self): 

        self.sm.subscribe('g1', 'i1', 's1') 

        self.sm.subscribe('g1', '*', 's2') 

        self.assertEqual(set(self.sm.find("g1", "i1")), set([ 's1', 's2' ])) 

 

    def test_find_sub(self): 

        self.sm.subscribe('g1', 'i1', 's1') 

        self.sm.subscribe('g1', '*', 's2') 

        self.assertEqual(self.sm.find_sub("g1", "i1"), [ 's1' ]) 

 

    def test_open_socket_parameter(self): 

        self.assertFalse(os.path.exists("./my_socket_file")) 

        msgq = MsgQ("./my_socket_file"); 

        msgq.setup() 

        self.assertTrue(os.path.exists("./my_socket_file")) 

        msgq.shutdown(); 

        self.assertFalse(os.path.exists("./my_socket_file")) 

 

    def test_open_socket_environment_variable(self): 

        self.assertFalse(os.path.exists("my_socket_file")) 

        os.environ["BIND10_MSGQ_SOCKET_FILE"] = "./my_socket_file" 

        msgq = MsgQ(); 

        msgq.setup() 

        self.assertTrue(os.path.exists("./my_socket_file")) 

        msgq.shutdown(); 

        self.assertFalse(os.path.exists("./my_socket_file")) 

 

    def test_open_socket_default(self): 

        env_var = None 

        orig_socket_file = None 

88        if "BIND10_MSGQ_SOCKET_FILE" in os.environ: 

            env_var = os.environ["BIND10_MSGQ_SOCKET_FILE"] 

            del os.environ["BIND10_MSGQ_SOCKET_FILE"] 

        # temporarily replace the class "default" not to be disrupted by 

        # any running BIND 10 instance. 

94        if "BIND10_TEST_SOCKET_FILE" in os.environ: 

            MsgQ.SOCKET_FILE = os.environ["BIND10_TEST_SOCKET_FILE"] 

        socket_file = MsgQ.SOCKET_FILE 

        self.assertFalse(os.path.exists(socket_file)) 

        msgq = MsgQ(); 

        try: 

            msgq.setup() 

            self.assertTrue(os.path.exists(socket_file)) 

            msgq.shutdown(); 

            self.assertFalse(os.path.exists(socket_file)) 

        except socket.error: 

            # ok, the install path doesn't exist at all, 

            # so we can't check any further 

            pass 

107        if env_var is not None: 

            os.environ["BIND10_MSGQ_SOCKET_FILE"] = env_var 

109        if orig_socket_file is not None: 

            MsgQ.SOCKET_FILE = orig_socket_file 

 

    def test_open_socket_bad(self): 

        msgq = MsgQ("/does/not/exist") 

        self.assertRaises(socket.error, msgq.setup) 

 

class SendNonblock(unittest.TestCase): 

    """ 

    Tests that the whole thing will not get blocked if someone does not read. 

    """ 

 

    def terminate_check(self, task, timeout=30): 

        """ 

        Runs task in separate process (task is a function) and checks 

        it terminates sooner than timeout. 

        """ 

        task_pid = os.fork() 

128        if task_pid == 0: 

            # Kill the forked process after timeout by SIGALRM 

            signal.alarm(timeout) 

            # Run the task 

            # If an exception happens or we run out of time, we terminate 

            # with non-zero 

            task() 

            # If we got here, then everything worked well and in time 

            # In that case, we terminate successfully 

            os._exit(0) # needs exit code 

        else: 

            (pid, status) = os.waitpid(task_pid, 0) 

            self.assertEqual(0, status, 

                "The task did not complete successfully in time") 

 

    def infinite_sender(self, sender): 

        """ 

        Sends data until an exception happens. socket.error is caught, 

        as it means the socket got closed. Sender is called to actually 

        send the data. 

        """ 

        msgq = MsgQ() 

        # We do only partial setup, so we don't create the listening socket 

        msgq.setup_poller() 

        (read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) 

        msgq.register_socket(write) 

        # Keep sending while it is not closed by the msgq 

        try: 

            while True: 

                sender(msgq, write) 

        except socket.error: 

            pass 

 

    def test_infinite_sendmsg(self): 

        """ 

        Tries sending messages (and not reading them) until it either times 

        out (in blocking call, wrong) or closes it (correct). 

        """ 

        data = "data" 

        for i in range(1, 10): 

            data += data 

exit   exit        self.terminate_check(lambda: self.infinite_sender( 

            lambda msgq, socket: msgq.sendmsg(socket, {}, {"message" : data}))) 

 

    def test_infinite_sendprepared(self): 

        """ 

        Tries sending data (and not reading them) until it either times 

        out (in blocking call, wrong) or closes it (correct). 

        """ 

        data = b"data" 

        for i in range(1, 10): 

            data += data 

exit   exit        self.terminate_check(lambda: self.infinite_sender( 

            lambda msgq, socket: msgq.send_prepared_msg(socket, data))) 

 

    def send_many(self, data): 

        """ 

        Tries that sending a command many times and getting an answer works. 

        """ 

        msgq = MsgQ() 

        # msgq.run needs to compare with the listen_socket, so we provide 

        # a replacement 

        class DummySocket: 

            def fileno(): 

                return -1 

        msgq.listen_socket = DummySocket 

        (queue, out) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) 

        def run(): 

            length = len(data) 

            queue_pid = os.fork() 

            if queue_pid == 0: 

                signal.alarm(120) 

                msgq.setup_poller() 

                msgq.register_socket(queue) 

                msgq.run() 

            else: 

                try: 

                    def killall(signum, frame): 

                        os.kill(queue_pid, signal.SIGTERM) 

                        os._exit(1) 

                    signal.signal(signal.SIGALRM, killall) 

                    msg = msgq.preparemsg({"type" : "ping"}, data) 

                    now = time.clock() 

                    while time.clock() - now < 0.2: 

                        out.sendall(msg) 

                        # Check the answer 

                        (routing, received) = msgq.read_packet(out.fileno(), 

                            out) 

                        self.assertEqual({"type" : "pong"}, 

                            isc.cc.message.from_wire(routing)) 

                        self.assertEqual(data, received) 

                finally: 

                    os.kill(queue_pid, signal.SIGTERM) 

        self.terminate_check(run) 

 

    def test_small_sends(self): 

        """ 

        Tests sending small data many times. 

        """ 

        self.send_many(b"data") 

 

    def test_large_sends(self): 

        """ 

        Tests sending large data many times. 

        """ 

        data = b"data" 

        for i in range(1, 20): 

            data = data + data 

        self.send_many(data) 

 

exitif __name__ == '__main__': 

    unittest.main()