- CitrineOS core extracted (CSMS OCPP 2.0.1)
- OpenOCPP extracted (firmware OCPP 1.6J/2.0.1)
- ShapeShifter library installed (pip install -e)
- ShapeShifter specification extracted
- EVerest extracted

TODO updated with progress
70 lines
2.1 KiB
Python
70 lines
2.1 KiB
Python
import re
|
|
from base64 import b64encode
|
|
from datetime import datetime, timezone
|
|
from unittest.mock import patch
|
|
|
|
import dns.resolver
|
|
import pytest
|
|
from nacl.bindings import crypto_sign, crypto_sign_keypair
|
|
|
|
from shapeshifter_uftp import TestMessage as UFTPTestMessage
|
|
from shapeshifter_uftp.exceptions import InvalidSignatureException, SchemaException
|
|
from shapeshifter_uftp.transport import get_key, seal_message, unseal_message
|
|
|
|
# One throwaway signing keypair, generated at import time and shared by every
# test in this module.
public, private = crypto_sign_keypair()

# Base64-encoded forms of both keys; these are what the tests below hand to
# seal_message / unseal_message.
public_base64, private_base64 = b64encode(public), b64encode(private)
|
|
|
|
|
|
def test_seal_unseal_message():
    """Seal a test message, unseal it again, and expect an equal message back.

    The message is sealed with the module's base64 private key and unsealed
    with the matching base64 public key.
    """
    fields = dict(
        version="3.1.0",
        sender_domain="dso.dev",
        recipient_domain="cro.dev",
        time_stamp=datetime.now(timezone.utc).isoformat(),
        message_id="1234",
        conversation_id="1234",
    )
    original = UFTPTestMessage(**fields)
    original.version = "3.1.0"

    round_tripped = unseal_message(
        seal_message(original, private_base64),
        public_base64,
    )
    assert original == round_tripped
|
|
|
|
|
|
def test_tampered_message():
    """Check that unsealing rejects a sealed message whose first byte was altered.

    A valid message is sealed with the module's private key, its first byte is
    changed, and ``unseal_message`` is then expected to raise either
    ``InvalidSignatureException`` or ``SchemaException``.
    """
    msg = UFTPTestMessage(
        version="3.1.0",
        sender_domain="dso.dev",
        recipient_domain="cro.dev",
        time_stamp=datetime.now(timezone.utc).isoformat(),
        message_id="1234",
        conversation_id="1234",
    )
    msg.version = "3.1.0"
    sealed = seal_message(msg, private_base64)

    # Wrap around at 256: bytes([256]) raises ValueError, so a plain "+ 1"
    # made this test error out whenever the first sealed byte was 0xFF.
    # The modulo still guarantees the byte differs from the original.
    tampered_first_byte = (sealed[0] + 1) % 256
    sealed = bytes([tampered_first_byte]) + sealed[1:]

    with pytest.raises((InvalidSignatureException, SchemaException)):
        unseal_message(sealed, public_base64)
|
|
|
|
|
|
def test_invalid_message():
    """Check that a validly signed but unexpected XML payload is rejected.

    An XML document containing only a ``<Hello />`` element is signed directly
    with the module's raw private key, so the signature itself is genuine.
    ``unseal_message`` is expected to raise ``SchemaException`` for it.
    """
    msg = '<?xml version="1.0" encoding="UTF-8"?><Hello />'.encode()
    sealed = crypto_sign(msg, private)
    with pytest.raises(SchemaException):
        # The return value is irrelevant here; only the raised exception matters.
        unseal_message(sealed, public_base64)
|
|
|
|
|
|
def test_seal_invalid_type():
    """Check that ``seal_message`` raises ``TypeError`` when given a plain string."""
    msg = "Hello"
    with pytest.raises(TypeError):
        # The return value is irrelevant here; only the raised exception matters.
        seal_message(msg, private_base64)
|
|
|
|
|
|
def patched_resolve(*args, **kwargs):
    """Forward a resolve call to ``dns.resolver.resolve_at`` with server 1.1.1.1.

    All positional and keyword arguments are passed through unchanged after
    the server address, and the result of ``resolve_at`` is returned as is.
    """
    nameserver = "1.1.1.1"
    return dns.resolver.resolve_at(nameserver, *args, **kwargs)
|
|
|
|
@patch.object(dns.resolver, 'resolve', new=patched_resolve)
def test_get_key():
    """Fetch the "dso" key for enexis.dev and check it starts with 44 base64 characters.

    ``dns.resolver.resolve`` is replaced by ``patched_resolve`` while this
    test runs.
    """
    key_pattern = r'[0-9A-Za-z+/=]{44}'
    fetched_key = get_key("enexis.dev", "dso")
    # NOTE(review): re.match anchors only at the start, so any characters
    # after the first 44 are not checked -- confirm whether that is intended.
    assert re.match(key_pattern, fetched_key)
|