# Copyright (C) 2011 Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import unittest
import socket
from pydnspp import *
from isc.acl.acl import LoaderError, Error, ACCEPT, REJECT, DROP
from isc.acl.dns import *
def get_sockaddr(address, port):
    '''Return the socket address tuple getaddrinfo yields for address/port.

    The lookup is restricted to numeric hosts (AI_NUMERICHOST) over UDP
    datagram sockets, and the sockaddr of the first result is returned.
    '''
    candidates = socket.getaddrinfo(address, port, 0, socket.SOCK_DGRAM,
                                    socket.IPPROTO_UDP,
                                    socket.AI_NUMERICHOST)
    # Each getaddrinfo entry is (family, type, proto, canonname, sockaddr);
    # only the trailing sockaddr is of interest here.
    _family, _socktype, _proto, _canonname, sockaddr = candidates[0]
    return sockaddr
def get_acl(prefix):
    '''Load a one-rule ACL, given as JSON text, for the given IP prefix.

    The single rule accepts requests coming from 'prefix'; anything else
    falls through to the loader's default action (reject).
    '''
    rule_text = '[{"action": "ACCEPT", "from": "' + prefix + '"}]'
    return REQUEST_LOADER.load(rule_text)
def get_acl_json(prefix):
    '''Load the same one-rule ACL as get_acl, but hand the loader a Python
    object (a list holding one dict) rather than JSON text.'''
    return REQUEST_LOADER.load([{"action": "ACCEPT", "from": prefix}])
# The following two are similar to the previous two, but use a TSIG key name
# instead of IP prefix.
def get_tsig_acl(key):
    '''Load a one-rule ACL, given as JSON text, that accepts requests
    matching the TSIG key name 'key'.'''
    rule_text = '[{"action": "ACCEPT", "key": "' + key + '"}]'
    return REQUEST_LOADER.load(rule_text)
def get_tsig_acl_json(key):
    '''Load a one-rule ACL accepting the TSIG key name 'key', passing the
    loader a Python object (a list holding one dict) rather than JSON text.'''
    return REQUEST_LOADER.load([{"action": "ACCEPT", "key": key}])
# Commonly used TSIG RDATA. For the purpose of ACL checks only the key name
# matters; the other parameters are simply borrowed from some other tests and
# could be anything for the purpose of the tests here.
# Shared TSIG RDATA used when building faked TSIG records for the tests.
TSIG_RDATA = TSIG(
    "hmac-md5.sig-alg.reg.int. 1302890362 "
    "300 16 2tra2tra2tra2tra2tra2g== "
    "11621 0 0"
)
def get_context(address, key_name=None):
    '''Build a RequestContext for the given IP address.

    The port is fixed at 53000 for simplicity; it does not matter for these
    tests (as of the initial implementation).

    When key_name is given, a (faked) TSIG record carrying that key name is
    created from the shared TSIG_RDATA and attached to the context.  Only
    the key name matters for the purpose of ACL checks.  When key_name is
    None the context is built without a TSIG record.
    '''
    tsig_record = (None if key_name is None
                   else TSIGRecord(Name(key_name), TSIG_RDATA))
    remote = get_sockaddr(address, 53000)
    return RequestContext(remote, tsig_record)
# Commonly used RequestContext objects: one IPv4 and one IPv6 client, both
# created without a TSIG key.
CONTEXT4, CONTEXT6 = [get_context(addr)
                      for addr in ('192.0.2.1', '2001:db8::1')]
class RequestContextTest(unittest.TestCase):
    '''Checks RequestContext construction, observed via its string form.'''

    def test_construct(self):
        '''Construct contexts from valid and invalid arguments.'''
        prefix = '<isc.acl.dns.RequestContext object, '
        v4_plain = prefix + 'remote_addr=[192.0.2.1]:53001>'
        v6_plain = prefix + 'remote_addr=[2001:db8::1234]:53006>'
        key_suffix = ', key=key.example.com.>'
        v4_sockaddr = ('192.0.2.1', 53001)
        v6_sockaddr = ('2001:db8::1234', 53006, 0, 0)

        # Contexts built from bare IPv4/IPv6 socket addresses; the object
        # is checked by converting it to a string.
        self.assertEqual(v4_plain, str(RequestContext(v4_sockaddr)))
        self.assertEqual(v6_plain, str(RequestContext(v6_sockaddr)))

        # Contexts built from a socket address plus a TSIG record (IPv4,
        # then the same with IPv6 just in case).
        tsig_record = TSIGRecord(Name("key.example.com"), TSIG_RDATA)
        self.assertEqual(
            prefix + 'remote_addr=[192.0.2.1]:53001' + key_suffix,
            str(RequestContext(v4_sockaddr, tsig_record)))
        self.assertEqual(
            prefix + 'remote_addr=[2001:db8::1234]:53006' + key_suffix,
            str(RequestContext(v6_sockaddr, tsig_record)))

        # Unusual case: the port number overflows.  The constructor allows
        # this (it should be rare anyway, since the socket address normally
        # comes from the Python socket module); 65536 is shown as port 0.
        self.assertEqual(prefix + 'remote_addr=[192.0.2.1]:0>',
                         str(RequestContext(('192.0.2.1', 65536))))

        # Same checks using socket.getaddrinfo(), to make sure the sockaddr
        # representation of the Python socket module is accepted.
        self.assertEqual(
            v4_plain, str(RequestContext(get_sockaddr('192.0.2.1', 53001))))
        self.assertEqual(
            v6_plain,
            str(RequestContext(get_sockaddr('2001:db8::1234', 53006))))

        #
        # Invalid parameters.  In the expected usage these should not occur
        # because the sockaddr comes from the Python socket module, but
        # validation should still be performed correctly.
        #
        # not a tuple
        with self.assertRaises(TypeError):
            RequestContext(1)
        # invalid number of parameters
        with self.assertRaises(TypeError):
            RequestContext(('192.0.2.1', 53), 0, 1)
        # type error for TSIG
        with self.assertRaises(TypeError):
            RequestContext(('192.0.2.1', 53), tsig=1)
        # tuple is not in the form of sockaddr
        for bad_sockaddr in ((0, 53),
                             ('192.0.2.1', 'http'),
                             ('::', 0, 'flow', 0)):
            self.assertRaises(TypeError, RequestContext, bad_sockaddr)
        # invalid address
        for bad_address in ('example.com', '192.0.2.1.1', '2001:db8:::1'):
            self.assertRaises(Error, RequestContext, (bad_address, 5300))
class RequestACLTest(unittest.TestCase):
    '''Tests for loading request ACLs through REQUEST_LOADER and executing
    them against RequestContext objects.

    Most cases are checked in pairs: first with the ACL given as JSON text,
    then with the equivalent Python object.
    '''

    def test_direct_construct(self):
        '''Constructing RequestACL directly is expected to raise Error.'''
        self.assertRaises(Error, RequestACL)

    def test_request_loader(self):
        '''Check the argument types and count accepted by load().'''
        # these shouldn't raise an exception
        REQUEST_LOADER.load('[{"action": "DROP"}]')
        REQUEST_LOADER.load([{"action": "DROP"}])
        REQUEST_LOADER.load('[{"action": "DROP", "from": "192.0.2.1"}]')
        REQUEST_LOADER.load([{"action": "DROP", "from": "192.0.2.1"}])

        # Invalid types (note that arguments like '1' or '[]' are of a valid
        # 'type', but are a syntax error at a higher level).  So we need to
        # use something that is neither JSON nor a string.
        self.assertRaises(TypeError, REQUEST_LOADER.load, b'')

        # Incorrect number of arguments
        self.assertRaises(TypeError, REQUEST_LOADER.load,
                          '[{"action": "DROP"}]', 0)

    def test_bad_acl_syntax(self):
        '''Syntactically invalid ACL descriptions must raise LoaderError.'''
        # the following are derived from loader_test.cc
        self.assertRaises(LoaderError, REQUEST_LOADER.load, '{}')
        self.assertRaises(LoaderError, REQUEST_LOADER.load, {})
        self.assertRaises(LoaderError, REQUEST_LOADER.load, '42')
        self.assertRaises(LoaderError, REQUEST_LOADER.load, 42)
        self.assertRaises(LoaderError, REQUEST_LOADER.load, 'true')
        self.assertRaises(LoaderError, REQUEST_LOADER.load, True)
        self.assertRaises(LoaderError, REQUEST_LOADER.load, 'null')
        self.assertRaises(LoaderError, REQUEST_LOADER.load, None)
        self.assertRaises(LoaderError, REQUEST_LOADER.load, '"hello"')
        self.assertRaises(LoaderError, REQUEST_LOADER.load, "hello")
        self.assertRaises(LoaderError, REQUEST_LOADER.load, '[42]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load, [42])
        self.assertRaises(LoaderError, REQUEST_LOADER.load, '["hello"]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load, ["hello"])
        self.assertRaises(LoaderError, REQUEST_LOADER.load, '[[]]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load, [[]])
        self.assertRaises(LoaderError, REQUEST_LOADER.load, '[true]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load, [True])
        self.assertRaises(LoaderError, REQUEST_LOADER.load, '[null]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load, [None])
        self.assertRaises(LoaderError, REQUEST_LOADER.load, '[{}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load, [{}])

        # the following are derived from dns_test.cc
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "ACCEPT", "bad": "192.0.2.1"}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "ACCEPT", "bad": "192.0.2.1"}])
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "ACCEPT", "from": 4}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "ACCEPT", "from": 4}])
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "ACCEPT", "from": []}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "ACCEPT", "from": []}])
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "ACCEPT", "key": 1}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "ACCEPT", "key": 1}])
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "ACCEPT", "key": {}}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "ACCEPT", "key": {}}])
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "ACCEPT", "from": "bad"}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "ACCEPT", "from": "bad"}])
        # Invalid key name, in JSON text form and then in Python object
        # form, like every other pair in this test.
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "ACCEPT", "key": "bad..name"}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "ACCEPT", "key": "bad..name"}])
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "ACCEPT", "from": null}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "ACCEPT", "from": None}])

    def test_bad_acl_ipsyntax(self):
        '''Malformed IP prefixes in "from" must raise LoaderError.'''
        # this test is derived from ip_check_unittest.cc
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "DROP", "from": "192.0.2.43/-1"}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "DROP", "from": "192.0.2.43/-1"}])
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "DROP", "from": "192.0.2.43//1"}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "DROP", "from": "192.0.2.43//1"}])
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "DROP", "from": "192.0.2.43/1/"}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "DROP", "from": "192.0.2.43/1/"}])
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "DROP", "from": "/192.0.2.43/1"}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "DROP", "from": "/192.0.2.43/1"}])
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "DROP", "from": "2001:db8::/xxxx"}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "DROP", "from": "2001:db8::/xxxx"}])
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "DROP", "from": "2001:db8::/32/s"}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "DROP", "from": "2001:db8::/32/s"}])
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "DROP", "from": "1/"}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "DROP", "from": "1/"}])
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "DROP", "from": "/1"}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "DROP", "from": "/1"}])
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "DROP", "from": "192.0.2.0/33"}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "DROP", "from": "192.0.2.0/33"}])
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          '[{"action": "DROP", "from": "::1/129"}]')
        self.assertRaises(LoaderError, REQUEST_LOADER.load,
                          [{"action": "DROP", "from": "::1/129"}])

    def test_execute(self):
        '''Execute loaded ACLs against contexts and check the action.'''
        # tests derived from dns_test.cc.  We don't directly expose checks
        # in the python wrapper, so we test it via execute().
        self.assertEqual(ACCEPT, get_acl('192.0.2.1').execute(CONTEXT4))
        self.assertEqual(ACCEPT, get_acl_json('192.0.2.1').execute(CONTEXT4))
        self.assertEqual(REJECT, get_acl('192.0.2.53').execute(CONTEXT4))
        self.assertEqual(REJECT, get_acl_json('192.0.2.53').execute(CONTEXT4))
        self.assertEqual(ACCEPT, get_acl('192.0.2.0/24').execute(CONTEXT4))
        self.assertEqual(ACCEPT,
                         get_acl_json('192.0.2.0/24').execute(CONTEXT4))
        self.assertEqual(REJECT, get_acl('192.0.1.0/24').execute(CONTEXT4))
        self.assertEqual(REJECT,
                         get_acl_json('192.0.1.0/24').execute(CONTEXT4))

        self.assertEqual(ACCEPT, get_acl('2001:db8::1').execute(CONTEXT6))
        self.assertEqual(ACCEPT,
                         get_acl_json('2001:db8::1').execute(CONTEXT6))
        self.assertEqual(REJECT, get_acl('2001:db8::53').execute(CONTEXT6))
        self.assertEqual(REJECT,
                         get_acl_json('2001:db8::53').execute(CONTEXT6))
        self.assertEqual(ACCEPT, get_acl('2001:db8::/64').execute(CONTEXT6))
        self.assertEqual(ACCEPT,
                         get_acl_json('2001:db8::/64').execute(CONTEXT6))
        self.assertEqual(REJECT, get_acl('2001:db8:1::/64').execute(CONTEXT6))
        self.assertEqual(REJECT,
                         get_acl_json('2001:db8:1::/64').execute(CONTEXT6))
        # An IPv4 rule is expected not to match an IPv6 client.
        self.assertEqual(REJECT, get_acl('32.1.13.184').execute(CONTEXT6))
        self.assertEqual(REJECT,
                         get_acl_json('32.1.13.184').execute(CONTEXT6))

        # TSIG checks, derived from dns_test.cc
        self.assertEqual(
            ACCEPT,
            get_tsig_acl('key.example.com').execute(
                get_context('192.0.2.1', 'key.example.com')))
        self.assertEqual(
            REJECT,
            get_tsig_acl_json('key.example.com').execute(
                get_context('192.0.2.1', 'badkey.example.com')))
        self.assertEqual(
            ACCEPT,
            get_tsig_acl('key.example.com').execute(
                get_context('2001:db8::1', 'key.example.com')))
        self.assertEqual(
            REJECT,
            get_tsig_acl_json('key.example.com').execute(
                get_context('2001:db8::1', 'badkey.example.com')))
        # Contexts without any TSIG key never match a key rule.
        self.assertEqual(
            REJECT, get_tsig_acl('key.example.com').execute(CONTEXT4))
        self.assertEqual(
            REJECT, get_tsig_acl_json('key.example.com').execute(CONTEXT4))
        self.assertEqual(
            REJECT, get_tsig_acl('key.example.com').execute(CONTEXT6))
        self.assertEqual(
            REJECT, get_tsig_acl_json('key.example.com').execute(CONTEXT6))

        # A bit more complicated example, derived from resolver_config_unittest
        # NOTE(review): the JSON text below ends with a trailing comma before
        # the closing bracket; presumably the loader's parser tolerates it.
        # Left unchanged here -- confirm before tightening.
        acl = REQUEST_LOADER.load('[ {"action": "ACCEPT", ' +
                                  ' "from": "192.0.2.1"},' +
                                  ' {"action": "REJECT",' +
                                  ' "from": "192.0.2.0/24"},' +
                                  ' {"action": "DROP",' +
                                  ' "from": "2001:db8::1"},' +
                                  ']')
        self.assertEqual(ACCEPT, acl.execute(CONTEXT4))
        self.assertEqual(REJECT, acl.execute(get_context('192.0.2.2')))
        self.assertEqual(DROP, acl.execute(get_context('2001:db8::1')))
        self.assertEqual(REJECT, acl.execute(get_context('2001:db8::2')))

        # same test using the JSON representation
        acl = REQUEST_LOADER.load([{"action": "ACCEPT", "from": "192.0.2.1"},
                                   {"action": "REJECT",
                                    "from": "192.0.2.0/24"},
                                   {"action": "DROP", "from": "2001:db8::1"}])
        self.assertEqual(ACCEPT, acl.execute(CONTEXT4))
        self.assertEqual(REJECT, acl.execute(get_context('192.0.2.2')))
        self.assertEqual(DROP, acl.execute(get_context('2001:db8::1')))
        self.assertEqual(REJECT, acl.execute(get_context('2001:db8::2')))

    def test_bad_execute(self):
        '''execute() must reject a wrong argument count or type.'''
        acl = get_acl('192.0.2.1')
        # missing parameter
        self.assertRaises(TypeError, acl.execute)
        # too many parameters
        self.assertRaises(TypeError, acl.execute, get_context('192.0.2.2'), 0)
        # type mismatch
        self.assertRaises(TypeError, acl.execute, 'bad parameter')
class RequestLoaderTest(unittest.TestCase):
    '''Tests for the RequestLoader type itself.

    Note: loading ACLs is tested in other test cases.
    '''

    def test_construct(self):
        '''Direct construction is not allowed, at least for now.'''
        with self.assertRaises(Error):
            RequestLoader()
# Run the test suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()