Files
Eric F d398a6ced2 Add extracted tools: CitrineOS, OpenOCPP, ShapeShifter
- CitrineOS core extracted (CSMS OCPP 2.0.1)
- OpenOCPP extracted (firmware OCPP 1.6J/2.0.1)
- ShapeShifter library installed (pip install -e)
- ShapeShifter specification extracted
- EVerest extracted

TODO updated with progress
2026-06-08 00:38:27 -04:00

352 lines
12 KiB
Python

import itertools
from base64 import b64encode
from concurrent.futures import Future
from nacl.bindings import crypto_sign_keypair
from shapeshifter_uftp import (
ShapeshifterAgrService,
ShapeshifterCroService,
ShapeshifterDsoService,
)
from shapeshifter_uftp.service.base_service import snake_case
AGR_DOMAIN = "agr.dev"
CRO_DOMAIN = "cro.dev"
DSO_DOMAIN = "dso.dev"
AGR_TEST_PORT = 9001
CRO_TEST_PORT = 9002
DSO_TEST_PORT = 9003
AGR_PUBLIC_KEY, AGR_PRIVATE_KEY = [b64encode(key).decode() for key in crypto_sign_keypair()]
CRO_PUBLIC_KEY, CRO_PRIVATE_KEY = [b64encode(key).decode() for key in crypto_sign_keypair()]
DSO_PUBLIC_KEY, DSO_PRIVATE_KEY = [b64encode(key).decode() for key in crypto_sign_keypair()]
def endpoint_lookup_function(domain, role):
if domain == "agr.dev":
return f"http://localhost:{AGR_TEST_PORT}/shapeshifter/api/v3/message"
elif domain == "cro.dev":
return f"http://localhost:{CRO_TEST_PORT}/shapeshifter/api/v3/message"
elif domain == "dso.dev":
return f"http://localhost:{DSO_TEST_PORT}/shapeshifter/api/v3/message"
def key_lookup_function(domain, role):
if domain == "agr.dev":
return AGR_PUBLIC_KEY
elif domain == "cro.dev":
return CRO_PUBLIC_KEY
elif domain == "dso.dev":
return DSO_PUBLIC_KEY
class DummyAgrService(ShapeshifterAgrService):
def __init__(self, oauth_lookup_function=None):
super().__init__(
sender_domain=AGR_DOMAIN,
signing_key=AGR_PRIVATE_KEY,
key_lookup_function=key_lookup_function,
endpoint_lookup_function=endpoint_lookup_function,
oauth_lookup_function=oauth_lookup_function,
port=AGR_TEST_PORT
)
self.request_futures = {
f"{stage}_{name}": Future()
for stage, name in itertools.product(
["process"],
[
name
for name in [
snake_case(message.__name__)
for message in self.acceptable_messages
]
],
)
}
def reset_futures(self, name):
self.request_futures[f"pre_process_{name}"] = Future()
self.request_futures[f"process_{name}"] = Future()
def process_flex_request(self, message):
self.request_futures["process_flex_request"].set_result(message)
def process_flex_order(self, message):
self.request_futures["process_flex_order"].set_result(message)
def process_flex_reservation_update(self, message):
self.request_futures["process_flex_reservation_update"].set_result(message)
def process_flex_settlement(self, message):
self.request_futures["process_flex_settlement"].set_result(message)
def process_flex_offer_revocation_response(self, message):
self.request_futures["process_flex_offer_revocation_response"].set_result(message)
def process_agr_portfolio_query_response(self, message):
self.request_futures["process_agr_portfolio_query_response"].set_result(message)
def process_agr_portfolio_update_response(self, message):
self.request_futures["process_agr_portfolio_update_response"].set_result(message)
def process_d_prognosis_response(self, message):
self.request_futures["process_d_prognosis_response"].set_result(message)
def process_flex_offer_response(self, message):
self.request_futures["process_flex_offer_response"].set_result(message)
def process_metering_response(self, message):
self.request_futures["process_metering_response"].set_result(message)
def process_test_message(self, message, sender_role):
self.request_futures["process_test_message"].set_result(message)
super().process_test_message(message, sender_role)
def process_test_message_response(self, message):
self.request_futures["process_test_message_response"].set_result(message)
super().process_test_message_response(message)
class DummyCroService(ShapeshifterCroService):
def __init__(self):
super().__init__(
sender_domain=CRO_DOMAIN,
signing_key=CRO_PRIVATE_KEY,
key_lookup_function=key_lookup_function,
endpoint_lookup_function=endpoint_lookup_function,
port=CRO_TEST_PORT
)
self.request_futures = {
f"{stage}_{name}": Future()
for stage, name in itertools.product(
["pre_process", "process"],
[
name
for name in [
snake_case(message.__name__)
for message in self.acceptable_messages
]
],
)
}
self.response_futures = {
name: Future()
for name in [
f"pre_process_{snake_case(message.__name__)}"
for message in self.acceptable_messages
]
}
def reset_futures(self, name):
self.request_futures[f"pre_process_{name}"] = Future()
self.request_futures[f"process_{name}"] = Future()
self.response_futures[f"pre_process_{name}"] = Future()
def process_agr_portfolio_query(self, message):
self.request_futures["process_agr_portfolio_query"].set_result(message)
def process_agr_portfolio_update(self, message):
self.request_futures["process_agr_portfolio_update"].set_result(message)
def process_dso_portfolio_query(self, message):
self.request_futures["process_dso_portfolio_query"].set_result(message)
def process_dso_portfolio_update(self, message):
self.request_futures["process_dso_portfolio_update"].set_result(message)
def process_test_message(self, message, sender_role):
self.request_futures["process_test_message"].set_result(message)
super().process_test_message(message, sender_role)
def process_test_message_response(self, message):
self.request_futures["process_test_message_response"].set_result(message)
super().process_test_message_response(message)
class DummyDsoService(ShapeshifterDsoService):
def __init__(self):
super().__init__(
sender_domain=DSO_DOMAIN,
signing_key=DSO_PRIVATE_KEY,
key_lookup_function=key_lookup_function,
endpoint_lookup_function=endpoint_lookup_function,
port=DSO_TEST_PORT
)
self.request_futures = {
f"{stage}_{name}": Future()
for stage, name in itertools.product(
["pre_process", "process"],
[
name
for name in [
snake_case(message.__name__)
for message in self.acceptable_messages
]
],
)
}
self.response_futures = {
name: Future()
for name in [
f"pre_process_{snake_case(message.__name__)}"
for message in self.acceptable_messages
]
}
def reset_futures(self, name):
self.request_futures[f"pre_process_{name}"] = Future()
self.request_futures[f"process_{name}"] = Future()
self.response_futures[f"pre_process_{name}"] = Future()
def process_flex_offer(self, message):
self.request_futures["process_flex_offer"].set_result(message)
def process_flex_order_response(self, message):
self.request_futures["process_flex_order_response"].set_result(message)
def process_d_prognosis(self, message):
self.request_futures["process_d_prognosis"].set_result(message)
def process_flex_offer_revocation(self, message):
self.request_futures["process_flex_offer_revocation"].set_result(message)
def process_flex_settlement_response(self, message):
self.request_futures["process_flex_settlement_response"].set_result(message)
def process_dso_portfolio_update_response(self, message):
self.request_futures["process_dso_portfolio_update_response"].set_result(message)
def process_dso_portfolio_query_response(self, message):
self.request_futures["process_dso_portfolio_query_response"].set_result(message)
def process_flex_request_response(self, message):
self.request_futures["process_flex_request_response"].set_result(message)
def process_flex_reservation_update_response(self, message):
self.request_futures["process_flex_reservation_update_response"].set_result(message)
def process_metering(self, message):
self.request_futures["process_metering"].set_result(message)
def process_test_message(self, message, sender_role):
self.request_futures["process_test_message"].set_result(message)
super().process_test_message(message, sender_role)
def process_test_message_response(self, message):
if self.request_futures["process_test_message_response"].done() is False:
self.request_futures["process_test_message_response"].set_result(message)
super().process_test_message_response(message)
class DefaultResponseAgrService(ShapeshifterAgrService):
def __init__(self):
super().__init__(
sender_domain=AGR_DOMAIN,
signing_key=AGR_PRIVATE_KEY,
key_lookup_function=key_lookup_function,
endpoint_lookup_function=endpoint_lookup_function,
port=AGR_TEST_PORT
)
def process_flex_request(self, message):
pass
def process_flex_order(self, message):
pass
def process_flex_reservation_update(self, message):
pass
def process_flex_settlement(self, message):
pass
def process_flex_offer_revocation_response(self, message):
pass
def process_agr_portfolio_query_response(self, message):
pass
def process_agr_portfolio_update_response(self, message):
pass
def process_d_prognosis_response(self, message):
pass
def process_flex_offer_response(self, message):
pass
def process_metering_response(self, message):
pass
class DefaultResponseCroService(ShapeshifterCroService):
def __init__(self):
super().__init__(
sender_domain=CRO_DOMAIN,
signing_key=CRO_PRIVATE_KEY,
key_lookup_function=key_lookup_function,
endpoint_lookup_function=endpoint_lookup_function,
port=CRO_TEST_PORT
)
def process_agr_portfolio_query(self, message):
pass
def process_agr_portfolio_update(self, message):
pass
def process_dso_portfolio_query(self, message):
pass
def process_dso_portfolio_update(self, message):
pass
class DefaultResponseDsoService(ShapeshifterDsoService):
def __init__(self):
super().__init__(
sender_domain=DSO_DOMAIN,
signing_key=DSO_PRIVATE_KEY,
key_lookup_function=key_lookup_function,
endpoint_lookup_function=endpoint_lookup_function,
port=DSO_TEST_PORT
)
def process_flex_offer(self, message):
pass
def process_flex_order_response(self, message):
pass
def process_d_prognosis(self, message):
pass
def process_flex_offer_revocation(self, message):
pass
def process_flex_settlement_response(self, message):
pass
def process_dso_portfolio_update_response(self, message):
pass
def process_dso_portfolio_query_response(self, message):
pass
def process_flex_request_response(self, message):
pass
def process_flex_reservation_update_response(self, message):
pass
def process_metering(self, message):
pass