| 
 # Copyright (C) 2011  Internet Systems Consortium.  
#  
# Permission to use, copy, modify, and distribute this software for any  
# purpose with or without fee is hereby granted, provided that the above  
# copyright notice and this permission notice appear in all copies.  
#  
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM  
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL  
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL  
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,  
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING  
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,  
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION  
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.  
  
import os, signal, socket, unittest  
from socket import AF_INET, AF_INET6, SOCK_STREAM, SOCK_DGRAM, IPPROTO_UDP, \  
    IPPROTO_TCP  
from isc.util.cio.socketsession import *  
  
TESTDATA_OBJDIR = os.getenv("TESTDATAOBJDIR")  
TEST_UNIX_FILE = TESTDATA_OBJDIR + '/ssessiontest.unix'  
TEST_DATA = b'BIND10 test'  
TEST_PORT = 53535  
  
class TestForwarder(unittest.TestCase):  
    '''In general, this is a straightforward port of the C++ counterpart.  
  
    In some cases test cases are simplified or have Python specific cases.  
  
    '''  
  
    def setUp(self):  
        self.forwarder = SocketSessionForwarder(TEST_UNIX_FILE)  
36        if os.path.exists(TEST_UNIX_FILE):  
            os.unlink(TEST_UNIX_FILE)  
        self.large_text = b'a' * 65535  
  
    def tearDown(self):  
        if os.path.exists(TEST_UNIX_FILE):  
            os.unlink(TEST_UNIX_FILE)  
  
    def start_listen(self):  
        self.listen_sock = socket.socket(socket.AF_UNIX, SOCK_STREAM, 0)  
        self.listen_sock.bind(TEST_UNIX_FILE)  
        self.listen_sock.listen(10)  
  
    def accept_forwarder(self):  
        self.listen_sock.setblocking(False)  
        s, _ = self.listen_sock.accept()  
        s.setblocking(True)  
        return s  
  
    def test_init(self):  
        # check bad arguments.  valid cases will covered in other tests.  
        self.assertRaises(TypeError, SocketSessionForwarder, 1)  
        self.assertRaises(TypeError, SocketSessionForwarder,  
                          'test.unix', 'test.unix')  
  
    def test_badpush(self):  
        # bad numbers of parameters  
        self.assertRaises(TypeError, self.forwarder.push, 1)  
        self.assertRaises(TypeError, self.forwarder.push, 0, AF_INET,  
                          SOCK_DGRAM, IPPROTO_UDP, ('127.0.0.1', 53),  
                          ('192.0.2.1', 5300), TEST_DATA, 0)  
        # contain a bad type of parameter  
        self.assertRaises(TypeError, self.forwarder.push, 0, 'AF_INET',  
                          SOCK_DGRAM, IPPROTO_UDP, ('127.0.0.1', 53),  
                          ('192.0.2.1', 5300), TEST_DATA)  
        # bad local address  
        self.assertRaises(TypeError, self.forwarder.push, 0, AF_INET,  
                          SOCK_DGRAM, IPPROTO_UDP, ('127.0.0..1', 53),  
                            ('192.0.2.1', 5300), TEST_DATA)  
        self.assertRaises(TypeError, self.forwarder.push, 0, AF_INET,  
                          SOCK_DGRAM, IPPROTO_UDP, '127.0.0.1',  
                            ('192.0.2.1', 5300), TEST_DATA)  
        # bad remote address  
        self.assertRaises(TypeError, self.forwarder.push, 0, AF_INET6,  
                          SOCK_DGRAM, IPPROTO_UDP, ('2001:db8::1', 53),  
                            ('2001:db8:::3', 5300), TEST_DATA)  
  
        # push before connect  
        self.assertRaises(TypeError, self.forwarder.push, 0, AF_INET,  
                          SOCK_DGRAM, IPPROTO_UDP, ('192.0.2.1', 53),  
                          ('192.0.2.2', 53), TEST_DATA)  
  
        # Now connect the forwarder for the rest of tests  
        self.start_listen()  
        self.forwarder.connect_to_receiver()  
  
        # Inconsistent address family  
        self.assertRaises(TypeError, self.forwarder.push, 1, AF_INET,  
                          SOCK_DGRAM, IPPROTO_UDP, ('2001:db8::1', 53, 0, 1),  
                          ('192.0.2.2', 53), TEST_DATA)  
        self.assertRaises(TypeError, self.forwarder.push, 1, AF_INET6,  
                          SOCK_DGRAM, IPPROTO_UDP, ('2001:db8::1', 53, 0, 1),  
                          ('192.0.2.2', 53), TEST_DATA)  
  
        # Empty data: we reject them at least for now  
        self.assertRaises(TypeError, self.forwarder.push, 1, AF_INET,  
                          SOCK_DGRAM, IPPROTO_UDP, ('192.0.2.1', 53),  
                          ('192.0.2.2', 53), b'')  
  
        # Too big data: we reject them at least for now  
        self.assertRaises(TypeError, self.forwarder.push, 1, AF_INET,  
                          SOCK_DGRAM, IPPROTO_UDP, ('192.0.2.1', 53),  
                          ('192.0.2.2', 53), b'd' * 65536)  
  
        # Close the receptor before push.  It will result in SIGPIPE (should be  
        # ignored) and EPIPE, which will be converted to SocketSessionError.  
        self.listen_sock.close()  
        self.assertRaises(SocketSessionError, self.forwarder.push, 1, AF_INET,  
                          SOCK_DGRAM, IPPROTO_UDP, ('192.0.2.1', 53),  
                          ('192.0.2.2', 53), TEST_DATA)  
  
    def create_socket(self, family, type, protocol, addr, do_listen):  
        s = socket.socket(family, type, protocol)  
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  
        s.bind(addr)  
        if do_listen and protocol == IPPROTO_TCP:  
            s.listen(1)  
        return s  
  
    def check_push_and_pop(self, family, type, protocol, local, remote,  
                           data, new_connection):  
        sock = self.create_socket(family, type, protocol, local, True)  
        fwd_fd = sock.fileno()  
        if protocol == IPPROTO_TCP:  
            client_addr = ('::1', 0, 0, 0) if family == AF_INET6 \  
                else ('127.0.0.1', 0)  
            client_sock = self.create_socket(family, type, protocol,  
                                             client_addr, False)  
            client_sock.setblocking(False)  
            try:  
                client_sock.connect(local)  
            except socket.error:  
                pass  
            server_sock, _ = sock.accept()  
            fwd_fd = server_sock.fileno()  
  
        # If a new connection is required, start the "server", have the  
        # internal forwarder connect to it, and then internally accept it.  
        if new_connection:  
            self.start_listen()  
            self.forwarder.connect_to_receiver()  
            self.accept_sock = self.accept_forwarder()  
  
        # Then push one socket session via the forwarder.  
        self.forwarder.push(fwd_fd, family, type, protocol, local, remote,  
                            data)  
  
        # Pop the socket session we just pushed from a local receiver, and  
        # check the content.  
        receiver = SocketSessionReceiver(self.accept_sock)  
        signal.alarm(1)  
        sock_session = receiver.pop()  
        signal.alarm(0)  
        passed_sock = sock_session[0]  
        self.assertNotEqual(fwd_fd, passed_sock.fileno())  
        self.assertEqual(family, passed_sock.family)  
        self.assertEqual(type, passed_sock.type)  
        self.assertEqual(protocol, passed_sock.proto)  
        self.assertEqual(local, sock_session[1])  
        self.assertEqual(remote, sock_session[2])  
        self.assertEqual(data, sock_session[3])  
  
        # Check if the passed FD is usable by sending some data from it.  
        passed_sock.setblocking(True)  
        if protocol == IPPROTO_UDP:  
            self.assertEqual(len(TEST_DATA), passed_sock.sendto(TEST_DATA,  
                                                                local))  
            sock.settimeout(10)  
            self.assertEqual(TEST_DATA, sock.recvfrom(len(TEST_DATA))[0])  
        else:  
            server_sock.close()  
            self.assertEqual(len(TEST_DATA), passed_sock.send(TEST_DATA))  
            client_sock.setblocking(True)  
            client_sock.settimeout(10)  
            self.assertEqual(TEST_DATA, client_sock.recv(len(TEST_DATA)))  
  
    def test_push_and_pop(self):  
        # This is a straightforward port of C++ pushAndPop test.  
        local6 = ('::1', TEST_PORT, 0, 0)  
        remote6 = ('2001:db8::1', 5300, 0, 0)  
        self.check_push_and_pop(AF_INET6, SOCK_DGRAM, IPPROTO_UDP,  
                                local6, remote6, TEST_DATA, True)  
        self.check_push_and_pop(AF_INET6, SOCK_STREAM, IPPROTO_TCP,  
                                local6, remote6, TEST_DATA, False)  
  
        local4 = ('127.0.0.1', TEST_PORT)  
        remote4 = ('192.0.2.2', 5300)  
        self.check_push_and_pop(AF_INET, SOCK_DGRAM, IPPROTO_UDP,  
                                local4, remote4, TEST_DATA, False)  
        self.check_push_and_pop(AF_INET, SOCK_STREAM, IPPROTO_TCP,  
                                local4, remote4, TEST_DATA, False)  
  
        self.check_push_and_pop(AF_INET6, SOCK_DGRAM, IPPROTO_UDP,  
                                local6, remote6, self.large_text, False)  
        self.check_push_and_pop(AF_INET6, SOCK_STREAM, IPPROTO_TCP,  
                                local6, remote6, self.large_text, False)  
        self.check_push_and_pop(AF_INET, SOCK_DGRAM, IPPROTO_UDP,  
                                local4, remote4, self.large_text, False)  
        self.check_push_and_pop(AF_INET, SOCK_STREAM, IPPROTO_TCP,  
                                local4, remote4, self.large_text, False)  
  
        # Python specific: check for an IPv6 scoped address with non 0  
        # scope (zone) ID  
        scope6 = ('fe80::1', TEST_PORT, 0, 1)  
        self.check_push_and_pop(AF_INET6, SOCK_DGRAM, IPPROTO_UDP,  
                                local6, scope6, TEST_DATA, False)  
  
    def test_push_too_fast(self):  
        # A straightforward port of C++ pushTooFast test.  
        def multi_push(forwarder, addr, data):  
exit            for i in range(0, 10):  
                forwarder.push(1, AF_INET, SOCK_DGRAM, IPPROTO_UDP, addr,  
                               addr, data)  
        self.start_listen()  
        self.forwarder.connect_to_receiver()  
        self.assertRaises(SocketSessionError, multi_push, self.forwarder,  
                          ('192.0.2.1', 53), self.large_text)  
  
    def test_bad_pop(self):  
        # This is a subset of C++ badPop test.  We only check pop() raises  
        # SocketSessionError when it internally fails to get the FD.  
        # Other cases would require passing a valid FD from the test,  
        # which would make the test too complicated.  As a wrapper checking  
        # one common failure case should be reasonably sufficient.  
  
        self.start_listen()  
        s = socket.socket(socket.AF_UNIX, SOCK_STREAM, 0)  
        s.setblocking(False)  
        s.connect(TEST_UNIX_FILE)  
        accept_sock = self.accept_forwarder()  
        receiver = SocketSessionReceiver(accept_sock)  
        s.close()  
        self.assertRaises(SocketSessionError, receiver.pop)  
  
class TestReceiver(unittest.TestCase):  
    # We only check a couple of failure cases on construction.  Valid cases  
    # are covered in TestForwarder.  
  
    def test_bad_init(self):  
        class FakeSocket:  
            # pretending to be th standard socket class, but its fileno() is  
            # bogus.  
            def fileno(self):  
                return None  
        self.assertRaises(TypeError, SocketSessionReceiver, 1)  
        self.assertRaises(TypeError, SocketSessionReceiver, FakeSocket())  
  
exitif __name__ == '__main__':  
    unittest.main()  
                
             |