# SPDX-License-Identifier: Apache-2.0 # Copyright 2020 - 2023 Pionix GmbH and Contributors to EVerest import asyncio import sys from pathlib import Path import threading import math from everest.framework import Module, RuntimeSession, log # fmt: off JOSEV_WORK_DIR = Path(__file__).parent / '../../3rd_party/josev' sys.path.append(JOSEV_WORK_DIR.as_posix()) from iso15118.evcc import EVCCHandler from iso15118.evcc.controller.simulator import SimEVController from iso15118.evcc.evcc_config import EVCCConfig from iso15118.evcc.everest import context as JOSEV_CONTEXT from iso15118.shared.exificient_exi_codec import ExificientEXICodec from iso15118.shared.settings import set_PKI_PATH, enable_tls_1_3 from utilities import ( setup_everest_logging, determine_network_interface, patch_josev_config ) setup_everest_logging() EVEREST_CERTS_SUB_DIR = 'certs' async def evcc_handler_main_loop(module_config: dict, exi_codec: ExificientEXICodec): """ Entrypoint function that starts the ISO 15118 code running on the EVCC (EV Communication Controller) """ iface = determine_network_interface(module_config['device']) evcc_config = EVCCConfig() patch_josev_config(evcc_config, module_config) await EVCCHandler( evcc_config=evcc_config, iface=iface, exi_codec=exi_codec, ev_controller=SimEVController(evcc_config), ).start() class PyEVJosevModule(): def __init__(self) -> None: self._es = JOSEV_CONTEXT.ev_state self._session = RuntimeSession() m = Module(self._session) log.update_process_name(m.info.id) self._setup = m.say_hello() etc_certs_path = m.info.paths.etc / EVEREST_CERTS_SUB_DIR set_PKI_PATH(str(etc_certs_path.resolve())) if self._setup.configs.module['enable_tls_1_3']: enable_tls_1_3() self._es.internet_service_needed = self._setup.configs.module['is_internet_service_needed'] self._es.all_service_details = self._setup.configs.module['request_all_service_details'] self._es.all_vas_services = self._setup.configs.module['select_all_vas_services'] # setup publishing callback def publish_callback(variable_name: str, value: any): m.publish_variable('ev', variable_name, value) # set publish callback for context JOSEV_CONTEXT.set_publish_callback(publish_callback) # setup handlers for cmd in m.implementations['ev'].commands: m.implement_command( 'ev', cmd, getattr(self, f'_handler_{cmd}')) # init ready event self._ready_event = threading.Event() self._mod = m self._mod.init_done(self._ready) def start_evcc_handler(self): exi_codec = ExificientEXICodec() try: while True: self._ready_event.wait() try: asyncio.run(evcc_handler_main_loop(self._setup.configs.module, exi_codec)) self._mod.publish_variable('ev', 'v2g_session_finished', None) except KeyboardInterrupt: log.debug("SECC program terminated manually") break finally: self._ready_event.clear() finally: exi_codec.shutdown() def _ready(self): log.debug("ready!") # implementation handlers def _handler_start_charging(self, args) -> bool: self._es.DepartureTime = args['DepartureTime'] self._es.EAmount_kWh = args['EAmount'] self._es.EnergyTransferMode = args['EnergyTransferMode'] if "payment_option" in args['SelectedPaymentOption']: self._es.PaymentOption = args['SelectedPaymentOption']['payment_option'] if "enforce_payment_option" in args['SelectedPaymentOption']: self._es.enforce_payment_option = args['SelectedPaymentOption']['enforce_payment_option'] self._ready_event.set() return True def _handler_stop_charging(self, args): self._es.StopCharging = True def _handler_pause_charging(self, args): self._es.Pause = True def _handler_set_fault(self, args): pass def _handler_set_dc_params(self, args): parameters = args['EvParameters'] self._es.dc_max_current_limit = parameters['max_current_limit'] self._es.dc_max_power_limit = parameters['max_power_limit'] self._es.dc_max_voltage_limit = parameters['max_voltage_limit'] self._es.dc_energy_capacity = parameters['energy_capacity'] self._es.dc_target_current = parameters['target_current'] self._es.dc_target_voltage = parameters['target_voltage'] def _handler_set_bpt_dc_params(self, args): parameters = args['EvBPTParameters'] self._es.dc_discharge_max_current_limit = parameters["discharge_max_current_limit"] self._es.dc_discharge_max_power_limit = parameters['discharge_max_power_limit'] self._es.dc_discharge_target_current = parameters['discharge_target_current'] self._es.minimal_soc = parameters["discharge_minimal_soc"] def _handler_enable_sae_j2847_v2g_v2h(self, args): self._es.SAEJ2847_V2H_V2G_Active = True def _handler_update_soc(self, args): self._es.actual_soc = math.floor(args['SoC']) py_ev_josev = PyEVJosevModule() py_ev_josev.start_evcc_handler()