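"""Tests for the shapeshifter_uftp transport helpers.

Covers seal_message / unseal_message (round trip, tampering, empty payload,
wrong input type) and get_key (DNS-based key lookup).
"""

import re
from base64 import b64encode
from datetime import datetime, timezone
from unittest.mock import patch

import dns.resolver
import pytest
from nacl.bindings import crypto_sign, crypto_sign_keypair

from shapeshifter_uftp import TestMessage as UFTPTestMessage
from shapeshifter_uftp.exceptions import InvalidSignatureException, SchemaException
from shapeshifter_uftp.transport import get_key, seal_message, unseal_message

# Throwaway signing key pair, regenerated every time this module is imported.
# The tests hand the keys to the transport helpers in base64-encoded form.
public, private = crypto_sign_keypair()
public_base64 = b64encode(public)
private_base64 = b64encode(private)


def _make_test_message():
    """Build a UFTP TestMessage with fixed routing fields and a current UTC timestamp."""
    msg = UFTPTestMessage(
        version="3.1.0",
        sender_domain="dso.dev",
        recipient_domain="cro.dev",
        time_stamp=datetime.now(timezone.utc).isoformat(),
        message_id="1234",
        conversation_id="1234",
    )
    # Kept from the original tests: the version is assigned again after
    # construction.
    msg.version = "3.1.0"
    return msg


def test_seal_unseal_message():
    """Sealing with the private key and unsealing with the public key round-trips."""
    msg = _make_test_message()
    sealed = seal_message(msg, private_base64)
    unsealed = unseal_message(sealed, public_base64)
    assert msg == unsealed


def test_tampered_message():
    """A sealed message with one altered byte must be rejected on unseal."""
    sealed = seal_message(_make_test_message(), private_base64)
    # Flip the lowest bit of the first byte. XOR always yields a different
    # value that stays within 0..255; the previous `sealed[0] + 1` raised
    # ValueError from bytes() whenever the first byte happened to be 0xFF,
    # which made the test fail intermittently for reasons unrelated to
    # signature checking.
    tampered = bytes([sealed[0] ^ 0x01]) + sealed[1:]
    # NOTE(review): only the first byte is altered. If the sealed blob is
    # signature-first (as crypto_sign's combined output is), the payload
    # bytes are never tampered with here; a second case that alters a byte
    # in the payload would cover that path. Confirm the sealed format first.
    with pytest.raises((InvalidSignatureException, SchemaException)):
        unseal_message(tampered, public_base64)


def test_invalid_message():
    """A validly signed but empty payload must fail with SchemaException."""
    sealed = crypto_sign(b"", private)
    with pytest.raises(SchemaException):
        unseal_message(sealed, public_base64)


def test_seal_invalid_type():
    """seal_message must raise TypeError for a plain string instead of a message object."""
    with pytest.raises(TypeError):
        seal_message("Hello", private_base64)


def patched_resolve(*args, **kwargs):
    """Stand-in for dns.resolver.resolve that sends the query to 1.1.1.1."""
    return dns.resolver.resolve_at("1.1.1.1", *args, **kwargs)


# NOTE(review): this test performs a live DNS query for enexis.dev, so it
# needs network access and depends on a record outside this repository.
@patch.object(dns.resolver, 'resolve', new=patched_resolve)
def test_get_key():
    """get_key returns a value that starts with 44 base64-alphabet characters."""
    key = get_key("enexis.dev", "dso")
    # 44 base64 characters is the encoded length of a 32-byte value.
    # NOTE(review): re.match anchors only at the start, so a longer value
    # would also pass; re.fullmatch would pin the length to exactly 44.
    # Confirm get_key's return format before tightening.
    assert re.match(r'[0-9A-Za-z+/=]{44}', key)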