Files
Eric F d398a6ced2 Add extracted tools: CitrineOS, OpenOCPP, ShapeShifter
- CitrineOS core extracted (CSMS OCPP 2.0.1)
- OpenOCPP extracted (firmware OCPP 1.6J/2.0.1)
- ShapeShifter library installed (pip install -e)
- ShapeShifter specification extracted
- EVerest extracted

TODO updated with progress
2026-06-08 00:38:27 -04:00

157 lines
5.3 KiB
Python

# SPDX-License-Identifier: Apache-2.0
# Copyright 2020 - 2023 Pionix GmbH and Contributors to EVerest
import asyncio
import sys
from pathlib import Path
import threading
import math
from everest.framework import Module, RuntimeSession, log
# fmt: off
JOSEV_WORK_DIR = Path(__file__).parent / '../../3rd_party/josev'
sys.path.append(JOSEV_WORK_DIR.as_posix())
from iso15118.evcc import EVCCHandler
from iso15118.evcc.controller.simulator import SimEVController
from iso15118.evcc.evcc_config import EVCCConfig
from iso15118.evcc.everest import context as JOSEV_CONTEXT
from iso15118.shared.exificient_exi_codec import ExificientEXICodec
from iso15118.shared.settings import set_PKI_PATH, enable_tls_1_3
from utilities import (
setup_everest_logging,
determine_network_interface,
patch_josev_config
)
setup_everest_logging()
EVEREST_CERTS_SUB_DIR = 'certs'
async def evcc_handler_main_loop(module_config: dict, exi_codec: ExificientEXICodec):
"""
Entrypoint function that starts the ISO 15118 code running on
the EVCC (EV Communication Controller)
"""
iface = determine_network_interface(module_config['device'])
evcc_config = EVCCConfig()
patch_josev_config(evcc_config, module_config)
await EVCCHandler(
evcc_config=evcc_config,
iface=iface,
exi_codec=exi_codec,
ev_controller=SimEVController(evcc_config),
).start()
class PyEVJosevModule():
def __init__(self) -> None:
self._es = JOSEV_CONTEXT.ev_state
self._session = RuntimeSession()
m = Module(self._session)
log.update_process_name(m.info.id)
self._setup = m.say_hello()
etc_certs_path = m.info.paths.etc / EVEREST_CERTS_SUB_DIR
set_PKI_PATH(str(etc_certs_path.resolve()))
if self._setup.configs.module['enable_tls_1_3']:
enable_tls_1_3()
self._es.internet_service_needed = self._setup.configs.module['is_internet_service_needed']
self._es.all_service_details = self._setup.configs.module['request_all_service_details']
self._es.all_vas_services = self._setup.configs.module['select_all_vas_services']
# setup publishing callback
def publish_callback(variable_name: str, value: any):
m.publish_variable('ev', variable_name, value)
# set publish callback for context
JOSEV_CONTEXT.set_publish_callback(publish_callback)
# setup handlers
for cmd in m.implementations['ev'].commands:
m.implement_command(
'ev', cmd, getattr(self, f'_handler_{cmd}'))
# init ready event
self._ready_event = threading.Event()
self._mod = m
self._mod.init_done(self._ready)
def start_evcc_handler(self):
exi_codec = ExificientEXICodec()
try:
while True:
self._ready_event.wait()
try:
asyncio.run(evcc_handler_main_loop(self._setup.configs.module, exi_codec))
self._mod.publish_variable('ev', 'v2g_session_finished', None)
except KeyboardInterrupt:
log.debug("SECC program terminated manually")
break
finally:
self._ready_event.clear()
finally:
exi_codec.shutdown()
def _ready(self):
log.debug("ready!")
# implementation handlers
def _handler_start_charging(self, args) -> bool:
self._es.DepartureTime = args['DepartureTime']
self._es.EAmount_kWh = args['EAmount']
self._es.EnergyTransferMode = args['EnergyTransferMode']
if "payment_option" in args['SelectedPaymentOption']:
self._es.PaymentOption = args['SelectedPaymentOption']['payment_option']
if "enforce_payment_option" in args['SelectedPaymentOption']:
self._es.enforce_payment_option = args['SelectedPaymentOption']['enforce_payment_option']
self._ready_event.set()
return True
def _handler_stop_charging(self, args):
self._es.StopCharging = True
def _handler_pause_charging(self, args):
self._es.Pause = True
def _handler_set_fault(self, args):
pass
def _handler_set_dc_params(self, args):
parameters = args['EvParameters']
self._es.dc_max_current_limit = parameters['max_current_limit']
self._es.dc_max_power_limit = parameters['max_power_limit']
self._es.dc_max_voltage_limit = parameters['max_voltage_limit']
self._es.dc_energy_capacity = parameters['energy_capacity']
self._es.dc_target_current = parameters['target_current']
self._es.dc_target_voltage = parameters['target_voltage']
def _handler_set_bpt_dc_params(self, args):
parameters = args['EvBPTParameters']
self._es.dc_discharge_max_current_limit = parameters["discharge_max_current_limit"]
self._es.dc_discharge_max_power_limit = parameters['discharge_max_power_limit']
self._es.dc_discharge_target_current = parameters['discharge_target_current']
self._es.minimal_soc = parameters["discharge_minimal_soc"]
def _handler_enable_sae_j2847_v2g_v2h(self, args):
self._es.SAEJ2847_V2H_V2G_Active = True
def _handler_update_soc(self, args):
self._es.actual_soc = math.floor(args['SoC'])
py_ev_josev = PyEVJosevModule()
py_ev_josev.start_evcc_handler()