// SPDX-FileCopyrightText: 2026 Contributors to the CitrineOS Project // // SPDX-License-Identifier: Apache-2.0 import type { AbstractModule, BootstrapConfig, IApiAuthProvider, IAuthorizer, ICache, IFileStorage, IMessageHandler, IMessageRouter, IModule, IModuleApi, SystemConfig, } from '@citrineos/base'; import { Ajv, ConfigStoreFactory, EventGroup, eventGroupFromString, type IAuthenticator, OCPPValidator, OCPPVersion, } from '@citrineos/base'; import type { ISmartCharging } from '@citrineos/core'; import { AdminApi, apiAuthPluginFp, Authenticator, BasicAuthenticationFilter, BrokerAwareMessageSender, CertificateAuthorityService, CertificatesDataApi, CertificatesModule, CertificatesOcpp2Api, ConfigurationDataApi, ConfigurationModule, ConfigurationOcpp16Api, ConfigurationOcpp2Api, ConnectedStationFilter, DefaultDrizzleInstance, EVDriverDataApi, EVDriverModule, EVDriverOcpp16Api, EVDriverOcpp2Api, IdGenerator, initSwagger, InternalSmartCharging, LocalBypassAuthProvider, MemoryCache, MessageRouterImpl, MonitoringDataApi, MonitoringModule, MonitoringOcpp2Api, NetworkProfileFilter, OIDCAuthProvider, RabbitMQChannelManager, RabbitMQConnectionManager, RabbitMqReceiver, RabbitMqSender, RealTimeAuthorizer, RedisCache, ReportingModule, ReportingOcpp16Api, ReportingOcpp2Api, RepositoryStore, sequelize, Sequelize, SmartChargingModule, SmartChargingOcpp16Api, SmartChargingOcpp2Api, TenantDataApi, TenantModule, TransactionsDataApi, TransactionsModule, TransactionsOcpp2Api, UnknownStationFilter, WebhookDispatcher, WebsocketNetworkConnection, } from '@citrineos/core'; import cors from '@fastify/cors'; import { type JsonSchemaToTsProvider } from '@fastify/type-provider-json-schema-to-ts'; import type { FastifyInstance, FastifyReply } from 'fastify'; import fastify from 'fastify'; import type { FastifyRouteSchemaDef, FastifySchemaCompiler, FastifyValidationResult, } from 'fastify/types/schema.js'; import type { RedisClientOptions } from 'redis'; import { type ILogObj, Logger } from 
'tslog';
import { type HealthCheckResult, HealthCheckService } from './health/HealthCheckService.js';

/**
 * Bootstraps one CitrineOS server process.
 *
 * The constructor wires up everything that can be created synchronously
 * (Fastify server, logger, cache, validators, repository store and shared
 * services). {@link CitrineOSServer.initialize} then connects to RabbitMQ,
 * creates the network connection and/or modules selected by `appName`,
 * initializes the database and installs signal handlers.
 * {@link CitrineOSServer.run} does all of that and starts listening.
 */
export class CitrineOSServer {
  /**
   * Fields
   */
  protected readonly _config: BootstrapConfig & SystemConfig;
  protected readonly _logger: Logger<ILogObj>;
  protected readonly _server: FastifyInstance;
  protected readonly _cache: ICache;
  protected readonly _ajv: Ajv.Ajv;
  protected readonly _ocppValidator: OCPPValidator;
  protected readonly _fileStorage: IFileStorage;
  protected readonly modules: IModule[] = [];
  protected readonly apis: IModuleApi[] = [];
  protected _sequelizeInstance!: Sequelize;
  // host, port and eventGroup are only populated by initSystem().
  protected host?: string;
  protected port?: number;
  protected eventGroup?: EventGroup;
  // The three fields below are only populated by initNetworkConnection().
  protected _authenticator?: IAuthenticator;
  protected _router?: IMessageRouter;
  protected _networkConnection?: WebsocketNetworkConnection;
  protected _repositoryStore!: RepositoryStore;
  protected _idGenerator!: IdGenerator;
  protected _certificateAuthorityService!: CertificateAuthorityService;
  protected _smartChargingService!: ISmartCharging;
  protected _realTimeAuthorizer!: IAuthorizer;
  protected readonly appName: string;
  // Guards shutdown() against running twice.
  protected _isShuttingDown = false;
  // Both managers are only populated by initMessageBrokerConnection().
  protected _connectionManager?: RabbitMQConnectionManager;
  protected _channelManager?: RabbitMQChannelManager;
  // Populated by initHealthCheckService(); until then the health routes use fallbacks.
  protected _healthCheckService?: HealthCheckService;

  /**
   * Constructor for the class.
   *
   * @param appName - app type; converted to an {@link EventGroup} in `initSystem`
   * @param bootstrapConfig - bootstrap configuration, merged with `systemConfig`
   * @param systemConfig - system configuration; its keys win over `bootstrapConfig` on conflict
   * @param server - optional Fastify server instance; a new one is created when omitted
   * @param ajv - optional Ajv JSON schema validator instance
   * @param cache - optional cache; when omitted Redis or an in-memory cache is chosen from config
   * @param _fileStorage - currently ignored: the file storage is always taken from
   *   `ConfigStoreFactory.getInstance()`
   * @throws Error when `systemConfig.util.messageBroker.amqp` is not configured
   */
  // todo rename event group to type
  constructor(
    appName: string,
    bootstrapConfig: BootstrapConfig,
    systemConfig: SystemConfig,
    server?: FastifyInstance,
    ajv?: Ajv.Ajv,
    cache?: ICache,
    _fileStorage?: IFileStorage,
  ) {
    // TODO: Create and export config schemas for each util module, such as amqp, redis, etc, to avoid passing them possibly invalid configuration
    if (!systemConfig.util.messageBroker.amqp) {
      throw new Error('This server implementation requires amqp configuration for rabbitMQ.');
    }

    this.appName = appName;
    this._config = { ...bootstrapConfig, ...systemConfig };
    this._server = server ?? fastify().withTypeProvider<JsonSchemaToTsProvider>();

    // enable cors
    (this._server as any).register(cors, {
      origin: true, // This can be customized to specify allowed origins
      methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'], // Specify allowed HTTP methods
    });

    // The logger is not created yet at this point, hence console.log.
    console.log('Bootstrap configuration loaded');

    // Add health check
    this.initHealthCheck();

    // Create Ajv JSON schema validator instance
    this._ajv = OCPPValidator.createServerAjvInstance(ajv);

    // Initialize parent logger
    this._logger = this.initLogger();

    // Create a separate OCPPValidator with its own Ajv instance for OCPP message validation.
    // This must be distinct from _ajv: OCPP messages are parsed JSON (no coercion needed),
    // whereas _ajv coerces types for Fastify's HTTP schema compilation.
    this._ocppValidator = new OCPPValidator(this._logger);

    // Set cache implementation
    this._cache = this.initCache(cache);

    // Initialize Swagger if enabled (fire-and-forget; failures are only logged)
    this.initSwagger().catch((error) =>
      this._logger.error('Could not initialize swagger', { error }),
    );

    // Register API authentication
    this.registerApiAuth();

    // Initialize File Access Implementation
    // NOTE(review): the `_fileStorage` constructor argument is not used here — confirm intent.
    this._fileStorage = ConfigStoreFactory.getInstance();

    // Register AJV for schema validation
    this.registerAjv();

    // Initialize repository store
    this.initRepositoryStore();
    this.initIdGenerator();
    this.initCertificateAuthorityService();
    this.initSmartChargingService();
    this.initRealTimeAuthorizer();
  }

  /**
   * Performs the asynchronous part of start-up: connects to the message broker,
   * initializes the network connection and/or modules, initializes the database,
   * creates the health check service and installs SIGINT/SIGTERM/SIGQUIT handlers
   * that trigger {@link CitrineOSServer.shutdown}.
   */
  async initialize(): Promise<void> {
    await this.initMessageBrokerConnection();

    // Initialize module & API
    // Always initialize API after SwaggerUI
    await this.initSystem();

    // Initialize database
    await this.initDb();

    this.initHealthCheckService();

    // Set up shutdown handlers
    for (const event of ['SIGINT', 'SIGTERM', 'SIGQUIT']) {
      process.on(event, () => {
        this._logger.info(`Received ${event}`);
        this.shutdown().catch((err) => {
          console.error('Shutdown error:', err);
          process.exit(1);
        });
      });
    }
  }

  /**
   * Shuts the server down in order: health check service, HTTP server,
   * WebSocket servers, RabbitMQ channels and connection, then the database
   * connections. Calls made after shutdown has started return immediately.
   *
   * An unref'd timer forces `process.exit(1)` if the sequence takes longer than
   * `shutdownGracePeriodSeconds`.
   */
  async shutdown(): Promise<void> {
    if (this._isShuttingDown) return;
    this._isShuttingDown = true;

    this._logger.info('Shutdown initiated');
    this._healthCheckService?.shutdown();

    const forceExit = setTimeout(() => {
      console.log('Shutdown timed out, forcing exit');
      process.exit(1);
    }, this._config.shutdownGracePeriodSeconds * 1000); // Default is 30 seconds
    // Do not let the watchdog timer itself keep the process alive.
    forceExit.unref();

    this._logger.info('Closing HTTP server...');
    await new Promise<void>((resolve, reject) => {
      try {
        this._server.close(() => resolve());
      } catch (error) {
        reject(error);
      }
    });

    this._logger.info('Closing WebSocket servers...');
    await this._networkConnection?.shutdown();

    this._logger.info('Closing RabbitMQ connections...');
    await this._channelManager?.closeAll();
    await this._connectionManager?.close();

    this._logger.info('Closing PostgreSQL connections...');
    await this._sequelizeInstance.connectionManager.close();

    this._logger.info('Shutdown complete');
    process.exitCode = 0;
  }

  /**
   * Initializes the server, syncs the configured websocket servers to the
   * repository and starts listening on `host`/`port`.
   *
   * Errors from initialization or the websocket sync propagate to the caller.
   * A failure to listen is logged and terminates the process with exit code 1.
   */
  async run(): Promise<void> {
    await this.initialize();
    await this._syncWebsocketConfig();

    try {
      const address = await this._server.listen({
        host: this.host,
        port: this.port,
      });
      this._logger.info(`Server listening at ${address}`);
    } catch (error) {
      this._logger.error(error);
      process.exit(1);
    }
    // TODO Push config to microservices
  }

  /**
   * Upserts one server network profile per configured websocket server,
   * sequentially, passing `maxCallLengthSeconds` along with each config.
   */
  protected async _syncWebsocketConfig() {
    for (const websocketServerConfig of this._config.util.networkConnection.websocketServers) {
      await this._repositoryStore.serverNetworkProfileRepository.upsertServerNetworkProfile(
        websocketServerConfig,
        this._config.maxCallLengthSeconds,
      );
    }
  }

  /**
   * Creates the RabbitMQ connection and channel managers and opens the connection.
   *
   * @throws Error when no amqp URL is configured
   */
  async initMessageBrokerConnection(): Promise<void> {
    const url = this._config.util.messageBroker.amqp?.url;
    if (!url) {
      throw new Error('RabbitMQ URL is not configured');
    }
    this._connectionManager = new RabbitMQConnectionManager(this._config.maxReconnectDelay, url);
    this._channelManager = new RabbitMQChannelManager(this._connectionManager);
    await this._connectionManager.connect();
  }

  /**
   * Builds a new message sender: a `RabbitMqSender` wrapped in a
   * `BrokerAwareMessageSender`.
   *
   * @throws Error when the exchange is not configured or the broker managers
   *   have not been created yet (see `initMessageBrokerConnection`)
   */
  protected _createSender(): BrokerAwareMessageSender {
    const exchange = this._config.util.messageBroker.amqp?.exchange;
    if (!exchange) {
      throw new Error('RabbitMQ exchange is not configured');
    }
    if (!this._connectionManager || !this._channelManager) {
      throw new Error('RabbitMQ connection or channel manager is not initialized');
    }
    return new BrokerAwareMessageSender(
      new RabbitMqSender(exchange, this._connectionManager, this._channelManager, this._logger),
      this._connectionManager,
      this._config.maxCallLengthSeconds,
      this._logger,
    );
  }

  /**
   * Builds a new RabbitMQ message handler (receiver).
   *
   * @throws Error when the channel manager has not been created yet
   *   (see `initMessageBrokerConnection`)
   */
  protected _createHandler(): IMessageHandler {
    // Guard explicitly, consistent with _createSender, rather than asserting non-null.
    if (!this._channelManager) {
      throw new Error('RabbitMQ channel manager is not initialized');
    }
    return new RabbitMqReceiver(this._config, this._channelManager, this._logger);
  }

  /**
   * Registers the `/health`, `/health/live` and `/health/ready` routes.
   *
   * Responses use HTTP 200 when the result status is `'pass'` and 503 otherwise.
   * Before the health check service exists, liveness reports `pass` and
   * readiness reports `fail` ("not yet initialized").
   */
  protected initHealthCheck() {
    const respond = (reply: FastifyReply, result: HealthCheckResult) =>
      reply
        .code(result.status === 'pass' ? 200 : 503)
        .header('Content-Type', 'application/health+json')
        .send(result);

    const liveness = async (_req: any, reply: FastifyReply) =>
      respond(
        reply,
        this._healthCheckService
          ? this._healthCheckService.checkLiveness()
          : { status: 'pass', checks: {} },
      );

    const readiness = async (_req: any, reply: FastifyReply) => {
      if (!this._healthCheckService) {
        return respond(reply, {
          status: 'fail',
          checks: { init: { status: 'fail', error: 'not yet initialized' } },
        });
      }
      return respond(reply, await this._healthCheckService.checkReadiness());
    };

    this._server.get('/health', liveness);
    this._server.get('/health/live', liveness);
    this._server.get('/health/ready', readiness);
  }

  /**
   * Creates the health check service from the network connection, broker
   * connection manager, cache and Sequelize instance that exist at call time.
   */
  protected initHealthCheckService() {
    this._healthCheckService = new HealthCheckService(
      this._networkConnection,
      this._connectionManager,
      this._cache,
      this._sequelizeInstance,
      this._config.notReadyThresholdSeconds,
      this._logger,
    );
  }

  /**
   * Creates the parent logger. Output is JSON when the `DEPLOYMENT_TARGET`
   * environment variable equals `'cloud'`, pretty-printed otherwise.
   */
  protected initLogger() {
    const isCloud = process.env.DEPLOYMENT_TARGET === 'cloud';

    const loggerSettings = {
      name: 'CitrineOS Logger',
      minLevel: this._config.logLevel,
      hideLogPositionForProduction: this._config.env === 'production',
      type: isCloud ? ('json' as const) : ('pretty' as const),
    };

    return new Logger<ILogObj>(loggerSettings);
  }

  /**
   * Initializes Sequelize, and additionally the Drizzle instance when
   * `CITRINEOS_USE_DRIZZLE_SECURITY_EVENT` is set to `'true'`.
   */
  protected async initDb() {
    await sequelize.DefaultSequelizeInstance.initializeSequelize();
    if (process.env.CITRINEOS_USE_DRIZZLE_SECURITY_EVENT === 'true') {
      await DefaultDrizzleInstance.initialize();
    }
  }

  /**
   * Picks the cache implementation: the supplied `cache` if given, otherwise a
   * Redis cache when redis is configured, otherwise an in-memory cache.
   *
   * @param cache - optional pre-built cache
   * @returns the cache to use
   */
  protected initCache(cache?: ICache): ICache {
    if (cache) return cache;

    if (this._config.util.cache.redis) {
      // Redis may be configured either by URL or by host/port.
      const redisClientOptions: RedisClientOptions =
        'url' in this._config.util.cache.redis
          ? { url: this._config.util.cache.redis.url }
          : {
              socket: {
                host: this._config.util.cache.redis.host,
                port: this._config.util.cache.redis.port,
              },
            };
      return new RedisCache(redisClientOptions, this._logger);
    }

    return new MemoryCache();
  }

  /** Initializes Swagger on the Fastify server when `util.swagger` is configured. */
  protected async initSwagger() {
    if (this._config.util.swagger) {
      await initSwagger(this._config, this._server);
    }
  }

  /** Makes Fastify compile route schemas with the server Ajv instance. */
  protected registerAjv() {
    // todo type schema instead of any
    const fastifySchemaCompiler: FastifySchemaCompiler<any> = (
      routeSchema: FastifyRouteSchemaDef<any>,
    ) => this._ajv?.compile(routeSchema.schema) as FastifyValidationResult;
    this._server.setValidatorCompiler(fastifySchemaCompiler);
  }

  /**
   * Registers the API auth plugin with the configured provider. The health
   * routes and `/docs` are excluded from authentication.
   */
  protected registerApiAuth() {
    const authProvider = this.initApiAuthProvider();

    this._server.register(apiAuthPluginFp, {
      provider: authProvider,
      options: {
        excludedRoutes: [
          '/health',
          '/health/live',
          '/health/ready',
          '/docs', // API documentation
        ],
        debug: this._config.logLevel <= 2, // Enable debug logs in dev mode
      },
      logger: this._logger,
    });
  }

  /**
   * Builds the authenticator, message router and websocket network connection,
   * links them together and registers the admin API.
   */
  protected initNetworkConnection() {
    this._authenticator = new Authenticator(
      new UnknownStationFilter(
        new sequelize.SequelizeLocationRepository(this._config, this._logger),
        this._logger,
      ),
      new ConnectedStationFilter(this._cache, this._logger),
      new NetworkProfileFilter(
        new sequelize.SequelizeDeviceModelRepository(this._config, this._logger),
        this._logger,
      ),
      new BasicAuthenticationFilter(
        new sequelize.SequelizeDeviceModelRepository(this._config, this._logger),
        this._logger,
      ),
      this._logger,
    );

    const webhookDispatcher = new WebhookDispatcher(
      this._repositoryStore.ocppMessageRepository,
      this._repositoryStore.subscriptionRepository,
      this._logger,
    );

    const routerSender = this._createSender();

    this._router = new MessageRouterImpl(
      this._config,
      this._cache,
      routerSender,
      this._createHandler(),
      webhookDispatcher,
      // Placeholder network hook; replaced below once the network connection exists.
      async (_identifier: string, _message: string) => {},
      this._logger,
      this._ocppValidator,
      this._repositoryStore.locationRepository,
    );

    this._networkConnection = new WebsocketNetworkConnection(
      this._config,
      this._cache,
      this._authenticator,
      this._router,
      this._fileStorage,
      this._logger,
      this._repositoryStore.locationRepository.doesChargingStationExistByStationId.bind(
        this._repositoryStore.locationRepository,
      ),
      // Resolves a tenant's maxChargingStations, or null when it is unset or the tenant is missing.
      async (tenantId: number) => {
        const tenant = await this._repositoryStore.tenantRepository.readByKey(tenantId, tenantId);
        return tenant?.maxChargingStations ?? null;
      },
      this._connectionManager,
    );

    // On a call timeout, disconnect the affected OCPP connection.
    routerSender.onCallTimeout = (ocppConnectionName, tenantId) =>
      this._networkConnection!.disconnect(tenantId, ocppConnectionName).then(() => undefined);

    this._router.networkHook = this._networkConnection.bindNetworkHook();

    this.apis.push(
      new AdminApi(
        this._router,
        this._networkConnection,
        this._server,
        this._config,
        this._logger,
        this._repositoryStore.subscriptionRepository,
        this._repositoryStore.serverNetworkProfileRepository,
      ),
    );
  }

  /**
   * Initializes the module's handlers and, once that succeeds, records the module.
   *
   * @param module - module to initialize and track
   */
  protected async initHandlersAndAddModule(module: AbstractModule) {
    await module.initHandlers();
    this.modules.push(module);
  }

  /** Initializes, one after another, every module enabled under `config.modules`. */
  protected async initAllModules() {
    if (this._config.modules.certificates) {
      await this.initCertificatesModule();
    }

    if (this._config.modules.configuration) {
      await this.initConfigurationModule();
    }

    if (this._config.modules.evdriver) {
      await this.initEVDriverModule();
    }

    if (this._config.modules.monitoring) {
      await this.initMonitoringModule();
    }

    if (this._config.modules.reporting) {
      await this.initReportingModule();
    }

    if (this._config.modules.smartcharging) {
      await this.initSmartChargingModule();
    }

    if (this._config.modules.transactions) {
      await this.initTransactionsModule();
    }

    if (this._config.modules.tenant) {
      await this.initTenantModule();
    }
  }

  /**
   * Selects the API auth provider: OIDC when configured, otherwise the local
   * bypass provider when enabled.
   *
   * @throws Error when neither provider is configured
   */
  protected initApiAuthProvider(): IApiAuthProvider {
    this._logger.info('Initializing API authentication provider,', this._config.util.authProvider);

    if (this._config.util.authProvider.oidc) {
      return new OIDCAuthProvider(this._config.util.authProvider.oidc, this._logger);
    } else if (this._config.util.authProvider.localByPass) {
      return new LocalBypassAuthProvider(this._logger);
    } else {
      throw new Error('No valid API authentication provider configured');
    }
  }

  /** Creates the certificates module and registers its OCPP 2.0.1, OCPP 2.1 and data APIs. */
  protected async initCertificatesModule() {
    const module = new CertificatesModule(
      this._config,
      this._cache,
      this._createSender(),
      this._createHandler(),
      this._fileStorage,
      // NOTE(review): undefined at runtime unless initNetworkConnection() ran first
      // (it is skipped in MODULES and single-module modes) — confirm the module tolerates that.
      this._networkConnection!,
      this._logger,
      this._ocppValidator,
      this._repositoryStore.deviceModelRepository,
      this._repositoryStore.certificateRepository,
      this._repositoryStore.installedCertificateRepository,
      this._repositoryStore.installCertificateAttemptRepository,
      this._repositoryStore.deleteCertificateAttemptRepository,
      this._repositoryStore.ocppMessageRepository,
    );
    await this.initHandlersAndAddModule(module);
    this.apis.push(
      new CertificatesOcpp2Api(module, this._server, OCPPVersion.OCPP2_0_1, this._logger),
      new CertificatesOcpp2Api(module, this._server, OCPPVersion.OCPP2_1, this._logger),
      new CertificatesDataApi(
        module,
        this._server,
        this._fileStorage,
        this._config.util.networkConnection.websocketServers,
        this._logger,
      ),
    );
  }

  /** Creates the configuration module and registers its OCPP 2.0.1, OCPP 2.1, OCPP 1.6 and data APIs. */
  protected async initConfigurationModule() {
    const module = new ConfigurationModule(
      this._config,
      this._cache,
      this._createSender(),
      this._createHandler(),
      this._logger,
      this._ocppValidator,
      this._repositoryStore.bootRepository,
      this._repositoryStore.deviceModelRepository,
      this._repositoryStore.messageInfoRepository,
      this._repositoryStore.locationRepository,
      this._repositoryStore.changeConfigurationRepository,
      this._repositoryStore.ocppMessageRepository,
      this._idGenerator,
    );
    await this.initHandlersAndAddModule(module);
    this.apis.push(
      new ConfigurationOcpp2Api(module, this._server, OCPPVersion.OCPP2_0_1, this._logger),
      new ConfigurationOcpp2Api(module, this._server, OCPPVersion.OCPP2_1, this._logger),
      new ConfigurationOcpp16Api(module, this._server, this._logger),
      new ConfigurationDataApi(module, this._server, this._logger),
    );
  }

  /** Creates the EV driver module and registers its OCPP 2.0.1, OCPP 2.1, OCPP 1.6 and data APIs. */
  protected async initEVDriverModule() {
    const module = new EVDriverModule(
      this._config,
      this._cache,
      this._createSender(),
      this._createHandler(),
      this._logger,
      this._ocppValidator,
      this._repositoryStore.authorizationRepository,
      this._repositoryStore.localAuthListRepository,
      this._repositoryStore.deviceModelRepository,
      this._repositoryStore.tariffRepository,
      this._repositoryStore.transactionEventRepository,
      this._repositoryStore.chargingProfileRepository,
      this._repositoryStore.reservationRepository,
      this._repositoryStore.ocppMessageRepository,
      this._repositoryStore.locationRepository,
      this._certificateAuthorityService,
      this._realTimeAuthorizer,
      [],
      this._idGenerator,
    );
    await this.initHandlersAndAddModule(module);
    this.apis.push(
      new EVDriverOcpp2Api(module, this._server, OCPPVersion.OCPP2_0_1, this._logger),
      new EVDriverOcpp2Api(module, this._server, OCPPVersion.OCPP2_1, this._logger),
      new EVDriverOcpp16Api(module, this._server, this._logger),
      new EVDriverDataApi(module, this._server, this._logger),
    );
  }

  /** Creates the monitoring module and registers its OCPP 2.0.1, OCPP 2.1 and data APIs. */
  protected async initMonitoringModule() {
    const module = new MonitoringModule(
      this._config,
      this._cache,
      this._createSender(),
      this._createHandler(),
      this._logger,
      this._ocppValidator,
      this._repositoryStore.deviceModelRepository,
      this._repositoryStore.variableMonitoringRepository,
      this._repositoryStore.ocppMessageRepository,
      this._idGenerator,
    );
    await this.initHandlersAndAddModule(module);
    this.apis.push(
      new MonitoringOcpp2Api(module, this._server, OCPPVersion.OCPP2_0_1, this._logger),
      new MonitoringOcpp2Api(module, this._server, OCPPVersion.OCPP2_1, this._logger),
      new MonitoringDataApi(module, this._server, this._logger),
    );
  }

  /** Creates the reporting module and registers its OCPP 2.0.1, OCPP 2.1 and OCPP 1.6 APIs. */
  protected async initReportingModule() {
    const module = new ReportingModule(
      this._config,
      this._cache,
      this._createSender(),
      this._createHandler(),
      this._logger,
      this._ocppValidator,
      this._repositoryStore.deviceModelRepository,
      this._repositoryStore.securityEventRepository,
      this._repositoryStore.variableMonitoringRepository,
    );
    await this.initHandlersAndAddModule(module);
    this.apis.push(
      new ReportingOcpp2Api(module, this._server, OCPPVersion.OCPP2_0_1, this._logger),
      new ReportingOcpp2Api(module, this._server, OCPPVersion.OCPP2_1, this._logger),
      new ReportingOcpp16Api(module, this._server, this._logger),
    );
  }

  /** Creates the smart charging module and registers its OCPP 2.0.1, OCPP 2.1 and OCPP 1.6 APIs. */
  protected async initSmartChargingModule() {
    const module = new SmartChargingModule(
      this._config,
      this._cache,
      this._createSender(),
      this._createHandler(),
      this._logger,
      this._ocppValidator,
      this._repositoryStore.transactionEventRepository,
      this._repositoryStore.deviceModelRepository,
      this._repositoryStore.chargingProfileRepository,
      this._smartChargingService,
      this._idGenerator,
    );
    await this.initHandlersAndAddModule(module);
    this.apis.push(
      new SmartChargingOcpp2Api(module, this._server, OCPPVersion.OCPP2_0_1, this._logger),
      new SmartChargingOcpp2Api(module, this._server, OCPPVersion.OCPP2_1, this._logger),
      new SmartChargingOcpp16Api(module, this._server, this._logger),
    );
  }

  /** Creates the transactions module and registers its OCPP 2.0.1, OCPP 2.1 and data APIs. */
  protected async initTransactionsModule() {
    const module = new TransactionsModule(
      this._config,
      this._cache,
      this._fileStorage,
      this._createSender(),
      this._createHandler(),
      this._logger,
      this._ocppValidator,
      this._repositoryStore.transactionEventRepository,
      this._repositoryStore.authorizationRepository,
      this._repositoryStore.deviceModelRepository,
      this._repositoryStore.componentRepository,
      this._repositoryStore.locationRepository,
      this._repositoryStore.tariffRepository,
      this._repositoryStore.reservationRepository,
      this._repositoryStore.ocppMessageRepository,
      this._realTimeAuthorizer,
    );
    await this.initHandlersAndAddModule(module);
    this.apis.push(
      new TransactionsOcpp2Api(module, this._server, OCPPVersion.OCPP2_0_1, this._logger),
      new TransactionsOcpp2Api(module, this._server, OCPPVersion.OCPP2_1, this._logger),
      new TransactionsDataApi(module, this._server, this._logger),
    );
  }

  /** Creates the tenant module and registers its data API. */
  protected async initTenantModule() {
    const module = new TenantModule(
      this._config,
      this._cache,
      this._createSender(),
      this._createHandler(),
      this._logger,
      this._ocppValidator,
      this._repositoryStore.tenantRepository,
    );
    await this.initHandlersAndAddModule(module);
    this.apis.push(new TenantDataApi(module, this._server, this._logger));
    this._logger.info('Tenant module initialized');
  }

  /**
   * Initializes the single module that corresponds to `eventGroup`.
   *
   * @param eventGroup - module to initialize; defaults to the group derived from `appName`
   * @throws Error when `eventGroup` does not map to a module
   */
  protected async initModule(eventGroup = this.eventGroup) {
    this._logger.info(`Initializing module: ${this.appName}`);
    switch (eventGroup) {
      case EventGroup.Certificates:
        await this.initCertificatesModule();
        break;
      case EventGroup.Configuration:
        await this.initConfigurationModule();
        break;
      case EventGroup.EVDriver:
        await this.initEVDriverModule();
        break;
      case EventGroup.Monitoring:
        await this.initMonitoringModule();
        break;
      case EventGroup.Reporting:
        await this.initReportingModule();
        break;
      case EventGroup.SmartCharging:
        await this.initSmartChargingModule();
        break;
      case EventGroup.Transactions:
        await this.initTransactionsModule();
        break;
      case EventGroup.Tenant:
        await this.initTenantModule();
        break;
      default:
        throw new Error('Unhandled module type: ' + this.appName);
    }
  }

  /**
   * Resolves the event group from `appName`, reads host/port from
   * `config.centralSystem`, and starts the matching components:
   * - `All`: network connection plus all enabled modules
   * - `Router`: network connection only
   * - `Modules`: all enabled modules, no network connection
   * - anything else: the single module for that event group
   */
  protected async initSystem() {
    this.eventGroup = eventGroupFromString(this.appName);
    this.host = this._config.centralSystem.host;
    this.port = this._config.centralSystem.port;

    if (this.eventGroup === EventGroup.All) {
      this._logger.info('Initializing in ALL mode: WebSocket server and all modules');
      this.initNetworkConnection();
      await this.initAllModules();
    } else if (this.eventGroup === EventGroup.Router) {
      this._logger.info('Initializing in ROUTER mode: WebSocket server, no modules');
      // OCPP Router only: WebSocket server, no modules
      this.initNetworkConnection();
    } else if (this.eventGroup === EventGroup.Modules) {
      // All modules, no WebSocket server
      this._logger.info('Initializing in MODULES mode: all modules without NetworkConnection');
      await this.initAllModules();
    } else {
      await this.initModule();
    }
  }

  /** Obtains the default Sequelize instance and builds the repository store on top of it. */
  protected initRepositoryStore() {
    this._sequelizeInstance = sequelize.DefaultSequelizeInstance.getInstance(
      this._config,
      this._logger,
    );
    this._repositoryStore = new RepositoryStore(
      this._config,
      this._logger,
      this._sequelizeInstance,
    );
  }

  /** Creates the id generator from the charging station sequence repository. */
  protected initIdGenerator() {
    this._idGenerator = new IdGenerator(this._repositoryStore.chargingStationSequenceRepository);
  }

  /** Creates the certificate authority service. */
  protected initCertificateAuthorityService() {
    this._certificateAuthorityService = new CertificateAuthorityService(
      this._config,
      this._cache,
      this._logger,
      undefined,
      undefined,
      this._fileStorage,
    );
  }

  /** Creates the internal smart charging service from the charging profile repository. */
  protected initSmartChargingService() {
    this._smartChargingService = new InternalSmartCharging(
      this._repositoryStore.chargingProfileRepository,
    );
  }

  /** Creates the real-time authorizer from the location repository. */
  protected initRealTimeAuthorizer() {
    this._realTimeAuthorizer = new RealTimeAuthorizer(
      this._repositoryStore.locationRepository,
      this._config,
      this._logger,
    );
  }
}