# Copyright (C) 2011 Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import os, signal, socket, unittest
from socket import AF_INET, AF_INET6, SOCK_STREAM, SOCK_DGRAM, IPPROTO_UDP, \
IPPROTO_TCP
from isc.util.cio.socketsession import *
# Directory in which the test's UNIX domain socket file is created.
# The build system is expected to export TESTDATAOBJDIR; when it is unset
# (or empty) fall back to the current directory so that importing this module
# does not fail with a TypeError on the concatenation below, and so that an
# empty value does not silently place the socket file in '/'.
TESTDATA_OBJDIR = os.getenv("TESTDATAOBJDIR") or os.curdir

# Path of the UNIX domain socket shared by the forwarder and the receiver.
TEST_UNIX_FILE = TESTDATA_OBJDIR + '/ssessiontest.unix'

# Payload pushed along with forwarded sockets, and also sent over the passed
# socket to check that it is usable.
TEST_DATA = b'BIND10 test'

# Local port the test's UDP/TCP sockets are bound to.
TEST_PORT = 53535
class TestForwarder(unittest.TestCase):
    '''Tests for the SocketSessionForwarder wrapper.

    In general, this is a straightforward port of the C++ counterpart.
    In some cases test cases are simplified or have Python specific cases.

    '''

    def setUp(self):
        '''Create the forwarder under test and remove any stale socket file.'''
        self.forwarder = SocketSessionForwarder(TEST_UNIX_FILE)
        if os.path.exists(TEST_UNIX_FILE):
            os.unlink(TEST_UNIX_FILE)
        # Largest payload the tests push successfully; test_badpush checks
        # that one more byte (65536) is rejected.
        self.large_text = b'a' * 65535

    def tearDown(self):
        '''Close sockets opened by the helpers and remove the socket file.'''
        # These attributes only exist if start_listen()/accept_forwarder()
        # were used by the test.  Closing an already closed socket is
        # harmless, so no extra bookkeeping is needed.
        for attr in ('accept_sock', 'listen_sock'):
            leftover = getattr(self, attr, None)
            if leftover is not None:
                leftover.close()
        if os.path.exists(TEST_UNIX_FILE):
            os.unlink(TEST_UNIX_FILE)

    def start_listen(self):
        '''Open a listening UNIX domain stream socket on TEST_UNIX_FILE.

        The socket is stored in self.listen_sock.

        '''
        self.listen_sock = socket.socket(socket.AF_UNIX, SOCK_STREAM, 0)
        self.listen_sock.bind(TEST_UNIX_FILE)
        self.listen_sock.listen(10)

    def accept_forwarder(self):
        '''Accept a pending connection on self.listen_sock and return it.

        The listening socket is switched to non-blocking mode so that
        accept() raises instead of hanging when no connection is pending;
        the accepted socket is returned in blocking mode.

        '''
        self.listen_sock.setblocking(False)
        s, _ = self.listen_sock.accept()
        s.setblocking(True)
        return s

    def test_init(self):
        # check bad arguments.  valid cases will be covered in other tests.
        self.assertRaises(TypeError, SocketSessionForwarder, 1)
        self.assertRaises(TypeError, SocketSessionForwarder,
                          'test.unix', 'test.unix')

    def test_badpush(self):
        # bad numbers of parameters
        self.assertRaises(TypeError, self.forwarder.push, 1)
        self.assertRaises(TypeError, self.forwarder.push, 0, AF_INET,
                          SOCK_DGRAM, IPPROTO_UDP, ('127.0.0.1', 53),
                          ('192.0.2.1', 5300), TEST_DATA, 0)
        # contain a bad type of parameter
        self.assertRaises(TypeError, self.forwarder.push, 0, 'AF_INET',
                          SOCK_DGRAM, IPPROTO_UDP, ('127.0.0.1', 53),
                          ('192.0.2.1', 5300), TEST_DATA)
        # bad local address
        self.assertRaises(TypeError, self.forwarder.push, 0, AF_INET,
                          SOCK_DGRAM, IPPROTO_UDP, ('127.0.0..1', 53),
                          ('192.0.2.1', 5300), TEST_DATA)
        self.assertRaises(TypeError, self.forwarder.push, 0, AF_INET,
                          SOCK_DGRAM, IPPROTO_UDP, '127.0.0.1',
                          ('192.0.2.1', 5300), TEST_DATA)
        # bad remote address
        self.assertRaises(TypeError, self.forwarder.push, 0, AF_INET6,
                          SOCK_DGRAM, IPPROTO_UDP, ('2001:db8::1', 53),
                          ('2001:db8:::3', 5300), TEST_DATA)

        # push before connect
        self.assertRaises(TypeError, self.forwarder.push, 0, AF_INET,
                          SOCK_DGRAM, IPPROTO_UDP, ('192.0.2.1', 53),
                          ('192.0.2.2', 53), TEST_DATA)

        # Now connect the forwarder for the rest of tests
        self.start_listen()
        self.forwarder.connect_to_receiver()

        # Inconsistent address family
        self.assertRaises(TypeError, self.forwarder.push, 1, AF_INET,
                          SOCK_DGRAM, IPPROTO_UDP, ('2001:db8::1', 53, 0, 1),
                          ('192.0.2.2', 53), TEST_DATA)
        self.assertRaises(TypeError, self.forwarder.push, 1, AF_INET6,
                          SOCK_DGRAM, IPPROTO_UDP, ('2001:db8::1', 53, 0, 1),
                          ('192.0.2.2', 53), TEST_DATA)

        # Empty data: we reject them at least for now
        self.assertRaises(TypeError, self.forwarder.push, 1, AF_INET,
                          SOCK_DGRAM, IPPROTO_UDP, ('192.0.2.1', 53),
                          ('192.0.2.2', 53), b'')

        # Too big data: we reject them at least for now
        self.assertRaises(TypeError, self.forwarder.push, 1, AF_INET,
                          SOCK_DGRAM, IPPROTO_UDP, ('192.0.2.1', 53),
                          ('192.0.2.2', 53), b'd' * 65536)

        # Close the receptor before push.  It will result in SIGPIPE (should
        # be ignored) and EPIPE, which will be converted to
        # SocketSessionError.
        self.listen_sock.close()
        self.assertRaises(SocketSessionError, self.forwarder.push, 1, AF_INET,
                          SOCK_DGRAM, IPPROTO_UDP, ('192.0.2.1', 53),
                          ('192.0.2.2', 53), TEST_DATA)

    def create_socket(self, family, type, protocol, addr, do_listen):
        '''Create a socket bound to addr and return it.

        SO_REUSEADDR is set before binding.  If do_listen is true and the
        protocol is TCP, the socket is also put into the listening state.
        The caller is responsible for closing the returned socket.

        '''
        s = socket.socket(family, type, protocol)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(addr)
        if do_listen and protocol == IPPROTO_TCP:
            s.listen(1)
        return s

    def check_push_and_pop(self, family, type, protocol, local, remote,
                           data, new_connection):
        '''Push one socket session through the forwarder and verify the pop.

        A socket of the given family/type/protocol is bound to local (for
        TCP, a client connection to it is also established and the accepted
        socket is the one that is forwarded).  The session is pushed with
        the given local/remote addresses and data, popped via a
        SocketSessionReceiver on self.accept_sock, and every popped field is
        compared with what was pushed.  Finally some data is sent over the
        passed socket to confirm it is usable.

        If new_connection is true, the receiver side is set up first
        (start_listen(), connect_to_receiver(), accept_forwarder());
        otherwise self.accept_sock from a previous call is reused.

        Every socket opened here is closed before returning, so that a
        following call can bind to the same local address again.

        '''
        sock = self.create_socket(family, type, protocol, local, True)
        opened = [sock]
        try:
            fwd_fd = sock.fileno()
            if protocol == IPPROTO_TCP:
                client_addr = ('::1', 0, 0, 0) if family == AF_INET6 \
                    else ('127.0.0.1', 0)
                client_sock = self.create_socket(family, type, protocol,
                                                 client_addr, False)
                opened.append(client_sock)
                client_sock.setblocking(False)
                try:
                    client_sock.connect(local)
                except socket.error:
                    # Deliberately ignored: a non-blocking connect() can
                    # raise before the connection is completed (presumably
                    # EINPROGRESS).  The accept() below picks it up.
                    pass
                server_sock, _ = sock.accept()
                opened.append(server_sock)
                fwd_fd = server_sock.fileno()

            # If a new connection is required, start the "server", have the
            # internal forwarder connect to it, and then internally accept
            # it.
            if new_connection:
                self.start_listen()
                self.forwarder.connect_to_receiver()
                self.accept_sock = self.accept_forwarder()

            # Then push one socket session via the forwarder.
            self.forwarder.push(fwd_fd, family, type, protocol, local, remote,
                                data)

            # Pop the socket session we just pushed from a local receiver,
            # and check the content.
            receiver = SocketSessionReceiver(self.accept_sock)
            # Schedule SIGALRM so the test does not block forever in pop().
            # Always cancel it, even when pop() raises, so that a pending
            # alarm cannot fire later during an unrelated test.
            signal.alarm(1)
            try:
                sock_session = receiver.pop()
            finally:
                signal.alarm(0)
            passed_sock = sock_session[0]
            opened.append(passed_sock)
            self.assertNotEqual(fwd_fd, passed_sock.fileno())
            self.assertEqual(family, passed_sock.family)
            self.assertEqual(type, passed_sock.type)
            self.assertEqual(protocol, passed_sock.proto)
            self.assertEqual(local, sock_session[1])
            self.assertEqual(remote, sock_session[2])
            self.assertEqual(data, sock_session[3])

            # Check if the passed FD is usable by sending some data from it.
            passed_sock.setblocking(True)
            if protocol == IPPROTO_UDP:
                self.assertEqual(len(TEST_DATA),
                                 passed_sock.sendto(TEST_DATA, local))
                sock.settimeout(10)
                self.assertEqual(TEST_DATA,
                                 sock.recvfrom(len(TEST_DATA))[0])
            else:
                # Close the original accepted socket first: the passed
                # socket must keep working on its own.
                server_sock.close()
                self.assertEqual(len(TEST_DATA), passed_sock.send(TEST_DATA))
                client_sock.setblocking(True)
                client_sock.settimeout(10)
                self.assertEqual(TEST_DATA, client_sock.recv(len(TEST_DATA)))
        finally:
            # Release the sockets deterministically rather than relying on
            # garbage collection; closing one twice is harmless.
            for s in opened:
                s.close()

    def test_push_and_pop(self):
        # This is a straightforward port of C++ pushAndPop test.
        local6 = ('::1', TEST_PORT, 0, 0)
        remote6 = ('2001:db8::1', 5300, 0, 0)
        self.check_push_and_pop(AF_INET6, SOCK_DGRAM, IPPROTO_UDP,
                                local6, remote6, TEST_DATA, True)
        self.check_push_and_pop(AF_INET6, SOCK_STREAM, IPPROTO_TCP,
                                local6, remote6, TEST_DATA, False)

        local4 = ('127.0.0.1', TEST_PORT)
        remote4 = ('192.0.2.2', 5300)
        self.check_push_and_pop(AF_INET, SOCK_DGRAM, IPPROTO_UDP,
                                local4, remote4, TEST_DATA, False)
        self.check_push_and_pop(AF_INET, SOCK_STREAM, IPPROTO_TCP,
                                local4, remote4, TEST_DATA, False)

        self.check_push_and_pop(AF_INET6, SOCK_DGRAM, IPPROTO_UDP,
                                local6, remote6, self.large_text, False)
        self.check_push_and_pop(AF_INET6, SOCK_STREAM, IPPROTO_TCP,
                                local6, remote6, self.large_text, False)
        self.check_push_and_pop(AF_INET, SOCK_DGRAM, IPPROTO_UDP,
                                local4, remote4, self.large_text, False)
        self.check_push_and_pop(AF_INET, SOCK_STREAM, IPPROTO_TCP,
                                local4, remote4, self.large_text, False)

        # Python specific: check for an IPv6 scoped address with non 0
        # scope (zone) ID
        scope6 = ('fe80::1', TEST_PORT, 0, 1)
        self.check_push_and_pop(AF_INET6, SOCK_DGRAM, IPPROTO_UDP,
                                local6, scope6, TEST_DATA, False)

    def test_push_too_fast(self):
        # A straightforward port of C++ pushTooFast test.
        def multi_push(forwarder, addr, data):
            # Push repeatedly without anyone popping on the other side.
            for _ in range(0, 10):
                forwarder.push(1, AF_INET, SOCK_DGRAM, IPPROTO_UDP, addr,
                               addr, data)
        self.start_listen()
        self.forwarder.connect_to_receiver()
        self.assertRaises(SocketSessionError, multi_push, self.forwarder,
                          ('192.0.2.1', 53), self.large_text)

    def test_bad_pop(self):
        # This is a subset of C++ badPop test.  We only check pop() raises
        # SocketSessionError when it internally fails to get the FD.
        # Other cases would require passing a valid FD from the test,
        # which would make the test too complicated.  As a wrapper checking
        # one common failure case should be reasonably sufficient.
        self.start_listen()
        s = socket.socket(socket.AF_UNIX, SOCK_STREAM, 0)
        accept_sock = None
        try:
            s.setblocking(False)
            s.connect(TEST_UNIX_FILE)
            accept_sock = self.accept_forwarder()
            receiver = SocketSessionReceiver(accept_sock)
            # Close the peer before pop() so there is nothing to receive.
            s.close()
            self.assertRaises(SocketSessionError, receiver.pop)
        finally:
            # Make sure neither end outlives the test, even on failure.
            s.close()
            if accept_sock is not None:
                accept_sock.close()
class TestReceiver(unittest.TestCase):
    '''Construction failure checks for SocketSessionReceiver.

    Only a couple of failure cases on construction are checked here.  Valid
    cases are covered in TestForwarder.

    '''

    def test_bad_init(self):
        class FakeSocket:
            # Pretends to be the standard socket class, but its fileno() is
            # bogus.
            def fileno(self):
                return None

        # Neither a plain integer nor an object with a bogus fileno() is
        # acceptable as the receiver's socket.
        for bad_socket in (1, FakeSocket()):
            with self.assertRaises(TypeError):
                SocketSessionReceiver(bad_socket)
# Run all test cases in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()