- CitrineOS core extracted (CSMS OCPP 2.0.1) - OpenOCPP extracted (firmware OCPP 1.6J/2.0.1) - ShapeShifter library installed (pip install -e) - ShapeShifter specification extracted - EVerest extracted TODO updated with progress
253 lines
9.8 KiB
Python
253 lines
9.8 KiB
Python
import sched
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from queue import Queue
|
|
from threading import Event, Thread
|
|
from uuid import uuid4
|
|
|
|
import requests
|
|
|
|
from .. import transport
|
|
from ..exceptions import ClientTransportException
|
|
from ..logging import logger
|
|
from ..oauth import OAuthClient, PassthroughOAuthClient
|
|
from ..uftp import (
|
|
PayloadMessage,
|
|
SignedMessage,
|
|
TestMessage,
|
|
TestMessageResponse,
|
|
UsefRole,
|
|
)
|
|
|
|
|
|
class ShapeshifterClient:
    """
    Base class for all Shapeshifter clients.

    A client wraps outgoing messages in a signed envelope and POSTs them
    to the recipient's HTTP endpoint. Messages can be sent directly via
    the ``send_*`` methods, or queued with ``_queue_message`` so that a
    pool of background worker threads delivers them and retries failed
    deliveries with exponential backoff.

    ``sender_role`` and ``recipient_role`` are only declared here, not
    assigned; they are read by ``_send_message`` and the worker, so they
    have to be provided elsewhere (presumably by subclasses -- confirm).
    """

    # USEF roles of this client and of the party it talks to.
    sender_role: UsefRole
    recipient_role: UsefRole

    # Number of background threads that drain the outgoing queue.
    num_outgoing_workers = 10

    # Maximum number of delivery attempts (first try included) for a
    # queued message before it is given up on.
    num_delivery_attempts = 10

    # Timeout in seconds passed to ``requests.post``.
    request_timeout = 30

    # A failed attempt number ``n`` is retried after
    # ``exponential_retry_factor * exponential_retry_base ** n`` seconds.
    exponential_retry_factor = 1.0
    exponential_retry_base = 2.0

    def __init__(
        self,
        sender_domain: str,
        signing_key: str,
        recipient_domain: str,
        recipient_endpoint: str | None = None,
        recipient_signing_key: str | None = None,
        oauth_client: OAuthClient | None = None,
        version: str = "3.1.0",
    ):
        """
        Shapeshifter client class that allows you to initiate messages to a different party.

        :param str sender_domain: your sender domain
        :param str signing_key: your private signing key
        :param str recipient_domain: the domain of the recipient
        :param str recipient_endpoint: the full http endpoint URL of the recipient.
        :param str recipient_signing_key: the public signing key of the recipient.
        :param OAuthClient oauth_client: Optional OAuth client instance for using oauth to
                                         authenticate outgoing messages. When omitted, a
                                         PassthroughOAuthClient is used.
        :param str version: Version number for the Shapeshifter protocol (3.0.0 or 3.1.0)
        :raises ValueError: if both recipient_domain and recipient_endpoint are None, or if
                            version is not one of the supported versions.

        NOTE(review): earlier documentation stated that an omitted
        recipient_endpoint / recipient_signing_key is looked up using DNS.
        No such lookup happens in this class: recipient_endpoint is passed
        to the HTTP request as-is and recipient_signing_key is only stored.
        Confirm where (or whether) the lookup is performed.
        """
        if recipient_domain is None and recipient_endpoint is None:
            raise ValueError(
                "One of recipient_domain or recipient_endpoint must be provided."
            )

        if version not in ("3.0.0", "3.1.0"):
            raise ValueError(f"'version' should be one of '3.0.0' or '3.1.0', not '{version}'")

        self.version = version
        self.sender_domain = sender_domain
        self.signing_key = signing_key
        self.recipient_domain = recipient_domain
        self.recipient_endpoint = recipient_endpoint
        self.recipient_signing_key = recipient_signing_key

        # The outgoing queue and scheduler are used when queueing
        # messages for delivery later. This allows the Shapeshifter
        # UFTP client to handle message retries on an exponential
        # time schedule, and delivers the result in the provided
        # callback function. Worker and scheduler threads are created
        # lazily, the first time they are needed.
        self.outgoing_queue = Queue()
        self.outgoing_workers = None
        self.scheduler = sched.scheduler(time.monotonic, time.sleep)
        self.scheduler_event = Event()
        self.scheduler_thread = None

        # Fall back to a pass-through client when no OAuth client is given.
        self.oauth_client = oauth_client or PassthroughOAuthClient()

    # ------------------------------------------------------------ #
    #                         Test Messages                        #
    # ------------------------------------------------------------ #
    def send_test_message(self, message: TestMessage | None = None) -> None:
        """
        Send a TestMessage to the recipient.

        :param TestMessage message: the message to send; a fresh, empty
                                    TestMessage is created when omitted.
        """
        if message is None:
            message = TestMessage()

        return self._send_message(message)

    def send_test_message_response(self, message: TestMessageResponse) -> None:
        """
        Send a TestMessageResponse to the recipient.

        :param TestMessageResponse message: the response message to send.
        """
        return self._send_message(message)

    def _send_message(self, message: PayloadMessage) -> None:
        """
        Perform an operation. This will take the message object, fill
        in the common envelope fields, seal it with our signing key,
        pack it up into a SignedMessage, serialize that to XML and
        POST it to the recipient endpoint.

        Nothing is returned: a successful call only means that the
        recipient answered with HTTP 200. The actual (functional)
        response always arrives asynchronously on your service
        (which runs separately).

        Note that the provided message object is modified in place.

        :param PayloadMessage message: the message to send.
        :raises TypeError: if message is not a PayloadMessage.
        :raises ClientTransportException: if the recipient answers with
                                          any HTTP status other than 200.
        """
        if not isinstance(message, PayloadMessage):
            raise TypeError(
                f"'message' must be a (subclass of) PayloadMessage, you provided: {type(message)}"
            )

        # Fill the PayloadMessage's fields that are common to all
        # messages. We don't require the developer to fill these out
        # every time they create any message, in order to reduce the
        # duplicated code that would result in, and all of these
        # properties can be calculated in the framework anyway.
        # Values for time_stamp, message_id and conversation_id that
        # are already set (truthy) on the message are kept.
        message.version = self.version
        message.sender_domain = self.sender_domain
        message.recipient_domain = self.recipient_domain
        message.time_stamp = (
            message.time_stamp or datetime.now(timezone.utc).isoformat()
        )
        message.message_id = message.message_id or str(uuid4())
        message.conversation_id = message.conversation_id or str(uuid4())

        logger.info(f"The PayloadMessage is: {message}")

        # Seal the message using our own private signing key
        sealed_message = transport.seal_message(message, self.signing_key)

        # Pack up the message into a SignedMessage
        signed_message = SignedMessage(
            sender_domain=self.sender_domain,
            sender_role=self.sender_role,
            body=sealed_message,
        )

        # Serialize the message into an XML blob
        serialized_message = transport.to_xml(signed_message)

        logger.debug(f"Sending message to {self.recipient_endpoint}:")
        logger.debug(serialized_message)

        # Send the request to the relevant endpoint
        with self.oauth_client.ensure_authenticated():
            response = requests.post(
                self.recipient_endpoint,
                data=serialized_message,
                headers={
                    "Content-Type": "text/xml; charset=utf-8",
                    **self.oauth_client.auth_header,
                },
                timeout=self.request_timeout,
            )
            if response.status_code != 200:
                error_msg = (
                    f"Request to {self.recipient_endpoint} was not succesful: "
                    f"HTTP {response.status_code}: {response.text}"
                )
                logger.error(error_msg)
                raise ClientTransportException(error_msg, response=response)

    # ------------------------------------------------------------ #
    #      Methods related to queueing and scheduling outgoing     #
    #                           messages.                          #
    # ------------------------------------------------------------ #

    def _queue_message(self, message, callback, attempt=1):
        """
        Put a message on the outgoing queue for background delivery.

        :param message: the PayloadMessage to deliver.
        :param callback: callable invoked with the result of
                         ``_send_message`` after a successful delivery.
        :param int attempt: 1-based number of the delivery attempt this
                            queue entry represents.
        """
        self.outgoing_queue.put((message, callback, attempt))
        self._run_outgoing_workers()

    def _outgoing_worker(self):
        """
        Worker loop: take messages off the outgoing queue forever and
        try to deliver them. Failures are rescheduled (or given up on),
        successes are reported to the callback.
        """
        while True:
            message, callback, attempt = self.outgoing_queue.get()
            try:
                response = self._send_message(message)
            except Exception as exc:  # pylint: disable=broad-exception-caught
                self._handle_delivery_failure(message, callback, attempt, exc)
            else:
                self._invoke_callback(message, callback, response)
            finally:
                # Always mark the queue entry as processed, so that
                # Queue.join() does not block on failed deliveries.
                self.outgoing_queue.task_done()

    def _handle_delivery_failure(self, message, callback, attempt, exc):
        """
        Reschedule a failed delivery with exponential backoff, or log an
        error once ``num_delivery_attempts`` attempts have been made.
        """
        # 'attempt' is the 1-based number of the attempt that just
        # failed. Only retry while fewer than num_delivery_attempts
        # attempts were made, so that the total number of attempts
        # matches the number reported in the error message below.
        if attempt < self.num_delivery_attempts:
            # Reschedule with exponential backoff
            delay_time = (
                self.exponential_retry_factor
                * self.exponential_retry_base**attempt
            )
            logger.warning(
                f"Outgoing message {message.__class__.__name__} to "
                f"{message.recipient_domain} could not be delivered "
                f"due to a {exc.__class__.__name__}, will try again in {delay_time:.0f} seconds."
            )
            self.scheduler.enter(
                delay=delay_time,
                priority=1,
                action=self._queue_message,
                argument=(message, callback, attempt + 1),
            )
            self._run_scheduler()
        else:
            logger.error(
                f"Could not deliver {message.__class__.__name__} "
                f"to {self.recipient_role} at {self.recipient_domain}, "
                f"even after {self.num_delivery_attempts} attempts."
            )

    def _invoke_callback(self, message, callback, response):
        """
        Call the delivery callback, logging (not propagating) any
        exception it raises so that the worker thread keeps running.
        """
        try:
            callback(response)
        except Exception as err:  # pylint: disable=broad-exception-caught
            logger.error(
                "There was an exception during the callback "
                f"for a {message.__class__.__name__} message: "
                f"{err.__class__.__name__}: {err}"
            )

    def _run_scheduler(self):
        """
        Make sure the scheduler thread is running and awake.

        NOTE(review): the check-then-start below is not guarded by a
        lock, while this method is called from the worker threads.
        """
        if not self.scheduler_thread:
            self.scheduler_thread = Thread(target=self._scheduler_thread, daemon=True)
            self.scheduler_thread.start()
        self.scheduler_event.set()

    def _scheduler_thread(self):
        """
        Intended to run the python scheduler in a background thread.
        You can wake it up anytime by setting the scheduler event.
        """
        while True:
            # Sleep until somebody signals that there is work, then
            # run the scheduler until its event queue is empty again.
            self.scheduler_event.wait()
            self.scheduler_event.clear()
            self.scheduler.run()

    def _run_outgoing_workers(self):
        """
        Start up the outgoing queue workers, unless they were already
        started by an earlier call.
        """
        if not self.outgoing_workers:
            self.outgoing_workers = [
                Thread(target=self._outgoing_worker, daemon=True)
                for _ in range(self.num_outgoing_workers)
            ]
            for thread in self.outgoing_workers:
                thread.start()

    def __enter__(self):
        """
        Allow the client to be used as a context manager; returns the
        client itself.
        """
        return self

    def __exit__(self, *args, **kwargs):
        """
        Leave the context manager. Nothing is cleaned up here: the
        background threads are daemon threads and are not stopped, and
        exceptions raised inside the ``with`` block are not suppressed.
        """
        pass
|