| 
 # Copyright (C) 2011  Internet Systems Consortium.  
#  
# Permission to use, copy, modify, and distribute this software for any  
# purpose with or without fee is hereby granted, provided that the above  
# copyright notice and this permission notice appear in all copies.  
#  
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM  
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL  
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL  
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,  
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING  
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,  
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION  
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.  
  
import base64, sys, time, unittest  
from pydnspp import *  
from testutil import *  
from pyunittests_util import fix_current_time  
  
# bit-wise constant flags to configure DNS header flags for test  
# messages.  
QR_FLAG = 0x1  
AA_FLAG = 0x2  
RD_FLAG = 0x4  
  
COMMON_EXPECTED_MAC = b"\x22\x70\x26\xad\x29\x7b\xee\xe7\x21\xce\x6c\x6f\xff\x1e\x9e\xf3"  
DUMMY_DATA = b"\xdd" * 100  
  
class TSIGContextTest(unittest.TestCase):  
    tsig_key = TSIGKey('www.example.com:SFuWd/q99SzF8Yzd1QbB9g==')  
  
    def setUp(self):  
        # make sure we don't use faked time unless explicitly do so in tests  
        fix_current_time(None)  
        self.qid = 0x2d65  
        self.test_name = Name("www.example.com")  
        self.tsig_ctx = TSIGContext(self.tsig_key)  
        self.tsig_verify_ctx = TSIGContext(self.tsig_key)  
        self.keyring = TSIGKeyRing()  
        self.message = Message(Message.RENDER)  
        self.renderer = MessageRenderer()  
        self.test_class = RRClass.IN()  
        self.test_ttl = RRTTL(86400)  
        self.secret = base64.b64decode(b"SFuWd/q99SzF8Yzd1QbB9g==")  
        self.tsig_ctx = TSIGContext(TSIGKey(self.test_name,  
                                            TSIGKey.HMACMD5_NAME,  
                                            self.secret))  
        self.badkey_name = Name("badkey.example.com")  
        self.dummy_record = TSIGRecord(self.badkey_name,  
                                       TSIG("hmac-md5.sig-alg.reg.int. " + \  
                                                "1302890362 300 0 11621 " + \  
                                                "0 0"))  
  
    def tearDown(self):  
        # reset any faked current time setting (it would affect other tests)  
        fix_current_time(None)  
  
    # Note: intentionally use camelCase so that we can easily copy-paste  
    # corresponding C++ tests.  
    def createMessageAndSign(self, id, qname, ctx, message_flags=RD_FLAG,  
                             qtype=RRType.A(), answer_data=None,  
                             answer_type=None, add_question=True,  
                             rcode=Rcode.NOERROR()):  
        self.message.clear(Message.RENDER)  
        self.message.set_qid(id)  
        self.message.set_opcode(Opcode.QUERY())  
        self.message.set_rcode(rcode)  
        if (message_flags & QR_FLAG) != 0:  
            self.message.set_header_flag(Message.HEADERFLAG_QR)  
        if (message_flags & AA_FLAG) != 0:  
            self.message.set_header_flag(Message.HEADERFLAG_AA)  
        if (message_flags & RD_FLAG) != 0:  
            self.message.set_header_flag(Message.HEADERFLAG_RD)  
        if add_question:  
            self.message.add_question(Question(qname, self.test_class, qtype))  
        if answer_data is not None:  
            if answer_type is None:  
                answer_type = qtype  
            answer_rrset = RRset(qname, self.test_class, answer_type,  
                                 self.test_ttl)  
            answer_rrset.add_rdata(Rdata(answer_type, self.test_class,  
                                         answer_data))  
            self.message.add_rrset(Message.SECTION_ANSWER, answer_rrset)  
        self.renderer.clear()  
        self.message.to_wire(self.renderer)  
  
        if ctx.get_state() == TSIGContext.STATE_INIT:  
            expected_new_state = TSIGContext.STATE_SENT_REQUEST  
        else:  
            expected_new_state = TSIGContext.STATE_SENT_RESPONSE  
        tsig = ctx.sign(id, self.renderer.get_data())  
  
        return tsig  
  
    # Note: intentionally use camelCase so that we can easily copy-paste  
    # corresponding C++ tests.  
    def createMessageFromFile(self, file):  
        self.message.clear(Message.PARSE)  
        self.received_data = read_wire_data(file)  
        self.message.from_wire(self.received_data)  
  
    # Note: intentionally use camelCase so that we can easily copy-paste  
    # corresponding C++ tests.  
    def commonSignChecks(self, tsig, expected_qid, expected_timesigned,  
                         expected_mac, expected_error=0,  
                         expected_otherdata=None,  
                         expected_algorithm=TSIGKey.HMACMD5_NAME):  
        tsig_rdata = tsig.get_rdata()  
        self.assertEqual(expected_algorithm, tsig_rdata.get_algorithm())  
        self.assertEqual(expected_timesigned, tsig_rdata.get_timesigned())  
        self.assertEqual(300, tsig_rdata.get_fudge())  
        self.assertEqual(expected_mac, tsig_rdata.get_mac())  
        self.assertEqual(expected_qid, tsig_rdata.get_original_id())  
        self.assertEqual(expected_error, tsig_rdata.get_error())  
        self.assertEqual(expected_otherdata, tsig_rdata.get_other_data())  
  
    def test_initial_state(self):  
        # Until signing or verifying, the state should be INIT  
        self.assertEqual(TSIGContext.STATE_INIT, self.tsig_ctx.get_state())  
  
        # And there should be no error code.  
        self.assertEqual(TSIGError(Rcode.NOERROR()), self.tsig_ctx.get_error())  
  
    # Note: intentionally use camelCase so that we can easily copy-paste  
    # corresponding C++ tests.  
    def commonVerifyChecks(self, ctx, record, data, expected_error,  
                           expected_new_state=\  
                               TSIGContext.STATE_VERIFIED_RESPONSE):  
        self.assertEqual(expected_error, ctx.verify(record, data))  
        self.assertEqual(expected_error, ctx.get_error())  
        self.assertEqual(expected_new_state, ctx.get_state())  
  
    def test_from_keyring(self):  
        # Construct a TSIG context with an empty key ring.  Key shouldn't be  
        # found, and the BAD_KEY error should be recorded.  
        ctx = TSIGContext(self.test_name, TSIGKey.HMACMD5_NAME, self.keyring)  
        self.assertEqual(TSIGContext.STATE_INIT, ctx.get_state())  
        self.assertEqual(TSIGError.BAD_KEY, ctx.get_error())  
        # check get_error() doesn't cause ref leak.  Note: we can't  
        # realiably do this check for get_state(), as it returns an integer  
        # object, which could have many references  
        self.assertEqual(1, sys.getrefcount(ctx.get_error()))  
  
        # Add a matching key (we don't use the secret so leave it empty), and  
        # construct it again.  This time it should be constructed with a valid  
        # key.  
        self.keyring.add(TSIGKey(self.test_name, TSIGKey.HMACMD5_NAME, b""))  
        ctx = TSIGContext(self.test_name, TSIGKey.HMACMD5_NAME, self.keyring)  
        self.assertEqual(TSIGContext.STATE_INIT, ctx.get_state())  
        self.assertEqual(TSIGError.NOERROR, ctx.get_error())  
  
        # Similar to the first case except that the key ring isn't empty but  
        # it doesn't contain a matching key.  
        ctx = TSIGContext(self.test_name, TSIGKey.HMACSHA1_NAME, self.keyring)  
        self.assertEqual(TSIGContext.STATE_INIT, ctx.get_state())  
        self.assertEqual(TSIGError.BAD_KEY, ctx.get_error())  
  
        ctx = TSIGContext(Name("different-key.example"),  
                          TSIGKey.HMACMD5_NAME, self.keyring)  
        self.assertEqual(TSIGContext.STATE_INIT, ctx.get_state())  
        self.assertEqual(TSIGError.BAD_KEY, ctx.get_error())  
  
        # "Unknown" algorithm name will result in BADKEY, too.  
        ctx = TSIGContext(self.test_name, Name("unknown.algorithm"),  
                          self.keyring)  
        self.assertEqual(TSIGContext.STATE_INIT, ctx.get_state())  
        self.assertEqual(TSIGError.BAD_KEY, ctx.get_error())  
  
    def test_sign(self):  
        fix_current_time(0x4da8877a)  
        tsig = self.createMessageAndSign(self.qid, self.test_name,  
                                         self.tsig_ctx)  
        self.commonSignChecks(tsig, self.qid, 0x4da8877a, COMMON_EXPECTED_MAC)  
  
    # Same test as sign, but specifying the key name with upper-case (i.e.  
    # non canonical) characters.  The digest must be the same.  It should  
    # actually be ensured at the level of TSIGKey, but we confirm that at  
    # this level, too.  
    def test_sign_using_uppercase_keyname(self):  
        fix_current_time(0x4da8877a)  
        cap_ctx = TSIGContext(TSIGKey(Name("WWW.EXAMPLE.COM"),  
                                      TSIGKey.HMACMD5_NAME, self.secret))  
        tsig = self.createMessageAndSign(self.qid, self.test_name, cap_ctx)  
        self.commonSignChecks(tsig, self.qid, 0x4da8877a, COMMON_EXPECTED_MAC)  
  
    # Same as the previous test, but for the algorithm name.  
    def test_sign_using_uppercase_algorithm_name(self):  
        fix_current_time(0x4da8877a)  
        cap_ctx = TSIGContext(TSIGKey(self.test_name,  
                                      Name("HMAC-md5.SIG-alg.REG.int"),  
                                      self.secret))  
        tsig = self.createMessageAndSign(self.qid, self.test_name, cap_ctx)  
        self.commonSignChecks(tsig, self.qid, 0x4da8877a, COMMON_EXPECTED_MAC)  
  
    # Sign the message using the actual time, and check the accuracy of it.  
    # We cannot reasonably predict the expected MAC, so don't bother to  
    # check it.  
    def test_sign_at_actual_time(self):  
        now = int(time.time())  
        tsig = self.createMessageAndSign(self.qid, self.test_name,  
                                         self.tsig_ctx)  
        tsig_rdata = tsig.get_rdata()  
  
        # Check the resulted time signed is in the range of [now, now + 5]  
        self.assertTrue(now <= tsig_rdata.get_timesigned())  
        self.assertTrue(now + 5 >= tsig_rdata.get_timesigned())  
  
    def test_bad_data(self):  
        self.assertRaises(TypeError, self.tsig_ctx.sign, None, 10)  
  
    def test_verify_bad_data(self):  
        # the data must at least hold the DNS message header and the specified  
        # TSIG.  
        bad_len = 12 + self.dummy_record.get_length() - 1  
        self.assertRaises(InvalidParameter, self.tsig_ctx.verify,  
                          self.dummy_record, DUMMY_DATA[:bad_len])  
  
    def test_sign_using_hmacsha1(self):  
        fix_current_time(0x4dae7d5f)  
  
        secret = base64.b64decode(b"MA+QDhXbyqUak+qnMFyTyEirzng=")  
        sha1_ctx = TSIGContext(TSIGKey(self.test_name, TSIGKey.HMACSHA1_NAME,  
                                       secret))  
        qid = 0x0967  
        expected_mac = b"\x41\x53\x40\xc7\xda\xf8\x24\xed\x68\x4e\xe5\x86" + \  
            b"\xf7\xb5\xa6\x7a\x2f\xeb\xc0\xd3"  
        tsig = self.createMessageAndSign(qid, self.test_name, sha1_ctx)  
        self.commonSignChecks(tsig, qid, 0x4dae7d5f, expected_mac,  
                              0, None, TSIGKey.HMACSHA1_NAME)  
  
    def test_verify_then_sign_response(self):  
        fix_current_time(0x4da8877a)  
  
        self.createMessageFromFile("message_toWire2.wire")  
        self.commonVerifyChecks(self.tsig_verify_ctx,  
                                self.message.get_tsig_record(),  
                                self.received_data, TSIGError.NOERROR,  
                                TSIGContext.STATE_RECEIVED_REQUEST)  
  
        tsig = self.createMessageAndSign(self.qid, self.test_name,  
                                         self.tsig_verify_ctx,  
                                         QR_FLAG|AA_FLAG|RD_FLAG,  
                                         RRType.A(), "192.0.2.1")  
  
        expected_mac = b"\x8f\xcd\xa6\x6a\x7c\xd1\xa3\xb9\x94\x8e\xb1\x86" + \  
            b"\x9d\x38\x4a\x9f"  
        self.commonSignChecks(tsig, self.qid, 0x4da8877a, expected_mac)  
  
    def test_verify_uppercase_names(self):  
        fix_current_time(0x4da8877a)  
  
        self.createMessageFromFile("tsig_verify9.wire")  
        self.commonVerifyChecks(self.tsig_verify_ctx,  
                                self.message.get_tsig_record(),  
                                self.received_data, TSIGError.NOERROR,  
                                TSIGContext.STATE_RECEIVED_REQUEST)  
  
    def test_verify_forward_message(self):  
        fix_current_time(0x4da8877a)  
  
        self.createMessageFromFile("tsig_verify6.wire")  
        self.commonVerifyChecks(self.tsig_verify_ctx,  
                                self.message.get_tsig_record(),  
                                self.received_data, TSIGError.NOERROR,  
                                TSIGContext.STATE_RECEIVED_REQUEST)  
  
    def test_sign_continuation(self):  
        fix_current_time(0x4da8e951)  
  
        axfr_qid = 0x3410  
        zone_name = Name("example.com")  
  
        tsig = self.createMessageAndSign(axfr_qid, zone_name, self.tsig_ctx,  
                                         0, RRType.AXFR())  
  
        received_data = read_wire_data("tsig_verify1.wire")  
        self.commonVerifyChecks(self.tsig_verify_ctx, tsig, received_data,  
                                TSIGError.NOERROR,  
                                TSIGContext.STATE_RECEIVED_REQUEST)  
  
        tsig = self.createMessageAndSign(axfr_qid, zone_name,  
                                         self.tsig_verify_ctx,  
                                         AA_FLAG|QR_FLAG, RRType.AXFR(),  
                                         "ns.example.com. root.example.com." +\  
                                         " 2011041503 7200 3600 2592000 1200",  
                                         RRType.SOA())  
  
        received_data = read_wire_data("tsig_verify2.wire")  
        self.commonVerifyChecks(self.tsig_ctx, tsig, received_data,  
                                TSIGError.NOERROR)  
  
        expected_mac = b"\x10\x24\x58\xf7\xf6\x2d\xdd\x7d\x63\x8d\x74" +\  
            b"\x60\x34\x13\x09\x68"  
        tsig = self.createMessageAndSign(axfr_qid, zone_name,  
                                         self.tsig_verify_ctx,  
                                         AA_FLAG|QR_FLAG, RRType.AXFR(),  
                                         "ns.example.com.", RRType.NS(),  
                                         False)  
        self.commonSignChecks(tsig, axfr_qid, 0x4da8e951, expected_mac)  
  
        received_data = read_wire_data("tsig_verify3.wire")  
        self.commonVerifyChecks(self.tsig_ctx, tsig, received_data,  
                                TSIGError.NOERROR)  
  
    def test_badtime_response(self):  
        fix_current_time(0x4da8b9d6)  
  
        test_qid = 0x7fc4  
        tsig = self.createMessageAndSign(test_qid, self.test_name,  
                                         self.tsig_ctx, 0, RRType.SOA())  
  
        # "advance the clock" and try validating, which should fail due to  
        # BADTIME  
        fix_current_time(0x4da8be86)  
        self.commonVerifyChecks(self.tsig_verify_ctx, tsig, DUMMY_DATA,  
                                TSIGError.BAD_TIME,  
                                TSIGContext.STATE_RECEIVED_REQUEST)  
  
        # make and sign a response in the context of TSIG error.  
        tsig = self.createMessageAndSign(test_qid, self.test_name,  
                                         self.tsig_verify_ctx,  
                                         QR_FLAG, RRType.SOA(), None, None,  
                                         True, Rcode.NOTAUTH())  
  
        expected_otherdata = b"\x00\x00\x4d\xa8\xbe\x86"  
        expected_mac = b"\xd4\xb0\x43\xf6\xf4\x44\x95\xec\x8a\x01\x26" +\  
            b"\x0e\x39\x15\x9d\x76"  
  
        self.commonSignChecks(tsig, self.message.get_qid(), 0x4da8b9d6,  
                              expected_mac,  
                              18,     # error: BADTIME  
                              expected_otherdata)  
  
    def test_badtime_response2(self):  
        fix_current_time(0x4da8b9d6)  
  
        tsig = self.createMessageAndSign(self.qid, self.test_name,  
                                         self.tsig_ctx, 0, RRType.SOA())  
  
        # "rewind the clock" and try validating, which should fail due to  
        # BADTIME  
        fix_current_time(0x4da8b9d6 - 600)  
        self.commonVerifyChecks(self.tsig_verify_ctx, tsig, DUMMY_DATA,  
                           TSIGError.BAD_TIME,  
                                TSIGContext.STATE_RECEIVED_REQUEST)  
  
    # Test various boundary conditions.  We intentionally use the magic  
    # number of 300 instead of the constant variable for testing.  
    # In the okay cases, signature is not correct, but it's sufficient to  
    # check the error code isn't BADTIME for the purpose of this test.  
    def test_badtime_boundaries(self):  
        fix_current_time(0x4da8b9d6)  
  
        tsig = self.createMessageAndSign(self.qid, self.test_name,  
                                         self.tsig_ctx, 0, RRType.SOA())  
  
        fix_current_time(0x4da8b9d6 + 301)  
        self.assertEqual(TSIGError.BAD_TIME,  
                         self.tsig_verify_ctx.verify(tsig, DUMMY_DATA))  
  
        fix_current_time(0x4da8b9d6 + 300)  
        self.assertNotEqual(TSIGError.BAD_TIME,  
                            self.tsig_verify_ctx.verify(tsig, DUMMY_DATA))  
  
        fix_current_time(0x4da8b9d6 - 301)  
        self.assertEqual(TSIGError.BAD_TIME,  
                         self.tsig_verify_ctx.verify(tsig, DUMMY_DATA))  
  
        fix_current_time(0x4da8b9d6 - 300)  
        self.assertNotEqual(TSIGError.BAD_TIME,  
                            self.tsig_verify_ctx.verify(tsig, DUMMY_DATA))  
  
    def test_badtime_overflow(self):  
        fix_current_time(200)  
        tsig = self.createMessageAndSign(self.qid, self.test_name,  
                                         self.tsig_ctx, 0, RRType.SOA())  
  
        # This should be in the okay range, but since "200 - fudge" overflows  
        # and we compare them as 64-bit unsigned integers, it results in a  
        # false positive (we intentionally accept that).  
        fix_current_time(100)  
        self.assertEqual(TSIGError.BAD_TIME,  
                         self.tsig_verify_ctx.verify(tsig, DUMMY_DATA))  
  
    def test_badsig_response(self):  
        fix_current_time(0x4da8877a)  
  
        # Try to sign a simple message with bogus secret.  It should fail  
        # with BADSIG.  
        self.createMessageFromFile("message_toWire2.wire")  
        bad_ctx = TSIGContext(TSIGKey(self.test_name, TSIGKey.HMACMD5_NAME,  
                                      DUMMY_DATA))  
        self.commonVerifyChecks(bad_ctx, self.message.get_tsig_record(),  
                                self.received_data, TSIGError.BAD_SIG,  
                                TSIGContext.STATE_RECEIVED_REQUEST)  
  
        # Sign the same message (which doesn't matter for this test) with the  
        # context of "checked state".  
        tsig = self.createMessageAndSign(self.qid, self.test_name, bad_ctx)  
        self.commonSignChecks(tsig, self.message.get_qid(), 0x4da8877a, None,  
                              16)   # 16: BADSIG  
  
    def test_badkey_response(self):  
        # A similar test as badsigResponse but for BADKEY  
        fix_current_time(0x4da8877a)  
        tsig_ctx = TSIGContext(self.badkey_name, TSIGKey.HMACMD5_NAME,  
                               self.keyring)  
        self.commonVerifyChecks(tsig_ctx, self.dummy_record, DUMMY_DATA,  
                                TSIGError.BAD_KEY,  
                                TSIGContext.STATE_RECEIVED_REQUEST)  
  
        sig = self.createMessageAndSign(self.qid, self.test_name, tsig_ctx)  
        self.assertEqual(self.badkey_name, sig.get_name())  
        self.commonSignChecks(sig, self.qid, 0x4da8877a, None, 17) # 17: BADKEY  
  
    def test_badkey_for_response(self):  
        # "BADKEY" case for a response to a signed message  
        self.createMessageAndSign(self.qid, self.test_name, self.tsig_ctx)  
        self.commonVerifyChecks(self.tsig_ctx, self.dummy_record, DUMMY_DATA,  
                                TSIGError.BAD_KEY,  
                                TSIGContext.STATE_SENT_REQUEST)  
  
        # A similar case with a different algorithm  
        dummy_record = TSIGRecord(self.test_name,  
                                  TSIG("hmac-sha1. 1302890362 300 0 "  
                                       "11621 0 0"))  
        self.commonVerifyChecks(self.tsig_ctx, dummy_record, DUMMY_DATA,  
                                TSIGError.BAD_KEY,  
                                TSIGContext.STATE_SENT_REQUEST)  
  
    # According to RFC2845 4.6, if TSIG verification fails the client  
    # should discard that message and wait for another signed response.  
    # This test emulates that situation.  
    def test_badsig_then_validate(self):  
        fix_current_time(0x4da8877a)  
  
        self.createMessageAndSign(self.qid, self.test_name, self.tsig_ctx)  
        self.createMessageFromFile("tsig_verify4.wire")  
  
        self.commonVerifyChecks(self.tsig_ctx, self.message.get_tsig_record(),  
                                self.received_data, TSIGError.BAD_SIG,  
                                TSIGContext.STATE_SENT_REQUEST)  
  
        self.createMessageFromFile("tsig_verify5.wire")  
        self.commonVerifyChecks(self.tsig_ctx, self.message.get_tsig_record(),  
                                self.received_data, TSIGError.NOERROR,  
                                TSIGContext.STATE_VERIFIED_RESPONSE)  
  
    # Similar to the previous test, but the first response doesn't contain  
    # TSIG.  
    def test_nosig_then_validate(self):  
        fix_current_time(0x4da8877a)  
        self.createMessageAndSign(self.qid, self.test_name, self.tsig_ctx)  
  
        self.commonVerifyChecks(self.tsig_ctx, None, DUMMY_DATA,  
                           TSIGError.FORMERR, TSIGContext.STATE_SENT_REQUEST)  
  
        self.createMessageFromFile("tsig_verify5.wire")  
        self.commonVerifyChecks(self.tsig_ctx, self.message.get_tsig_record(),  
                                self.received_data, TSIGError.NOERROR,  
                                TSIGContext.STATE_VERIFIED_RESPONSE)  
  
    # Similar to the previous test, but the first response results in BADTIME.  
    def test_badtime_then_validate(self):  
        fix_current_time(0x4da8877a)  
        tsig = self.createMessageAndSign(self.qid, self.test_name,  
                                         self.tsig_ctx)  
  
        # "advance the clock" and try validating, which should fail due to  
        # BADTIME  
        fix_current_time(0x4da8877a + 600)  
        self.commonVerifyChecks(self.tsig_ctx, tsig, DUMMY_DATA,  
                           TSIGError.BAD_TIME, TSIGContext.STATE_SENT_REQUEST)  
  
        # revert the clock again.  
        fix_current_time(0x4da8877a)  
        self.createMessageFromFile("tsig_verify5.wire")  
        self.commonVerifyChecks(self.tsig_ctx, self.message.get_tsig_record(),  
                                self.received_data, TSIGError.NOERROR,  
                                TSIGContext.STATE_VERIFIED_RESPONSE)  
  
    # We don't allow empty MAC unless the TSIG error is BADSIG or BADKEY.  
    def test_empty_mac(self):  
        fix_current_time(0x4da8877a)  
  
        self.createMessageFromFile("tsig_verify7.wire")  
  
        self.commonVerifyChecks(self.tsig_verify_ctx,  
                                self.message.get_tsig_record(),  
                                self.received_data,  
                                TSIGError.BAD_SIG,  
                                TSIGContext.STATE_RECEIVED_REQUEST)  
  
        # If the empty MAC comes with a BADKEY error, the error is passed  
        # transparently.  
        self.createMessageFromFile("tsig_verify8.wire")  
        self.commonVerifyChecks(self.tsig_verify_ctx,  
                                self.message.get_tsig_record(),  
                                self.received_data,  
                                TSIGError.BAD_KEY,  
                                TSIGContext.STATE_RECEIVED_REQUEST)  
  
    # Once the context is used for sending a signed response, it shouldn't  
    # be used for further verification.  
    def test_verify_after_sendresponse(self):  
        fix_current_time(0x4da8877a)  
  
        self.createMessageFromFile("message_toWire2.wire")  
        self.tsig_verify_ctx.verify(self.message.get_tsig_record(),  
                                    self.received_data)  
        self.assertEqual(TSIGContext.STATE_RECEIVED_REQUEST,  
                         self.tsig_verify_ctx.get_state())  
        self.createMessageAndSign(self.qid, self.test_name,  
                                  self.tsig_verify_ctx,  
                                  QR_FLAG|AA_FLAG|RD_FLAG, RRType.A(),  
                                  "192.0.2.1")  
        self.assertEqual(TSIGContext.STATE_SENT_RESPONSE,  
                         self.tsig_verify_ctx.get_state())  
  
        # Now trying further verification.  
        self.createMessageFromFile("message_toWire2.wire")  
        self.assertRaises(TSIGContextError, self.tsig_verify_ctx.verify,  
                          self.message.get_tsig_record(), self.received_data)  
  
    # Likewise, once the context verifies a response, it shouldn't for  
    # signing any more.  
    def test_sign_after_verified(self):  
        fix_current_time(0x4da8877a)  
  
        self.createMessageAndSign(self.qid, self.test_name, self.tsig_ctx)  
        self.createMessageFromFile("tsig_verify5.wire")  
        self.tsig_ctx.verify(self.message.get_tsig_record(),  
                             self.received_data)  
        self.assertEqual(TSIGContext.STATE_VERIFIED_RESPONSE,  
                         self.tsig_ctx.get_state())  
  
        # Now trying further signing.  
        self.assertRaises(TSIGContextError, self.createMessageAndSign,  
                          self.qid, self.test_name, self.tsig_ctx)  
  
    # Too short MAC should be rejected.  
    # Note: when we implement RFC4635-based checks, the error code will  
    # (probably) be FORMERR.  
    def test_too_short_mac(self):  
        fix_current_time(0x4da8877a)  
        self.createMessageFromFile("tsig_verify10.wire")  
        self.commonVerifyChecks(self.tsig_verify_ctx,  
                                self.message.get_tsig_record(),  
                                self.received_data, TSIGError.BAD_SIG,  
                                TSIGContext.STATE_RECEIVED_REQUEST)  
  
exitif __name__ == '__main__':  
    unittest.main()  
                
             |