update and merged with latest master

This commit is contained in:
Manendra Pal Singh
2025-12-15 10:09:39 +05:30
25 changed files with 1009 additions and 107 deletions

155
CONFIG.md
View File

@@ -189,35 +189,50 @@ log:
---
## Telemetry Configuration
## Application-Level Plugins Configuration
### `telemetry`
### `plugins`
**Type**: `object`
**Required**: No
**Description**: Application-level plugin configurations. These plugins apply to the entire application and are shared across all modules.
#### `plugins.otelsetup`
**Type**: `object`
**Required**: No
**Description**: OpenTelemetry configuration controlling whether the Prometheus exporter is enabled.
**Important**: This block is optional—omit it to run without telemetry. When present, the `/metrics` endpoint is exposed only if `enableMetrics: true`.
#### Parameters:
##### Parameters:
##### `enableMetrics`
**Type**: `boolean`
###### `id`
**Type**: `string`
**Required**: Yes
**Description**: Plugin identifier. Must be `"otelsetup"`.
###### `config`
**Type**: `object`
**Required**: Yes
**Description**: Plugin configuration parameters.
###### `config.enableMetrics`
**Type**: `string` (boolean)
**Required**: No
**Default**: `false`
**Description**: Enables metrics collection and the `/metrics` endpoint.
**Default**: `"true"`
**Description**: Enables metrics collection and the `/metrics` endpoint. Must be `"true"` or `"false"` as a string.
##### `serviceName`
###### `config.serviceName`
**Type**: `string`
**Required**: No
**Default**: `"beckn-onix"`
**Description**: Sets the `service.name` resource attribute.
##### `serviceVersion`
###### `config.serviceVersion`
**Type**: `string`
**Required**: No
**Description**: Sets the `service.version` resource attribute.
##### `environment`
###### `config.environment`
**Type**: `string`
**Required**: No
**Default**: `"development"`
@@ -225,16 +240,19 @@ log:
**Example - Enable Metrics** (matches `config/local-simple.yaml`):
```yaml
telemetry:
enableMetrics: true
serviceName: beckn-onix
serviceVersion: "1.0.0"
environment: "development"
plugins:
otelsetup:
id: otelsetup
config:
serviceName: "beckn-onix"
serviceVersion: "1.0.0"
enableMetrics: "true"
environment: "development"
```
### Accessing Metrics
When `telemetry.enableMetrics: true`, scrape metrics at:
When `plugins.otelsetup.config.enableMetrics: "true"`, scrape metrics at:
```
http://your-server:port/metrics
@@ -713,6 +731,109 @@ middleware:
---
#### 11. Reqmapper Plugin
**Purpose**: Transform Beckn payloads between protocol versions or shapes using JSONata before the request continues through the handler. Mount it inside the `middleware` list wherever translation is required.
```yaml
middleware:
- id: reqmapper
config:
role: bap # Use `bpp` when running inside a BPP handler
mappingsFile: ./config/mappings.yaml
```
**Parameters**:
- `role`: Required. Determines which JSONata expression is evaluated (`bapMappings` or `bppMappings`) for the current action.
- `mappingsFile`: Required. Absolute or relative path to a YAML file that contains the JSONata expressions for every action.
**Mapping file structure**:
```yaml
mappings:
<action-name>:
bapMappings: |
# JSONata expression applied when `role: bap`
bppMappings: |
# JSONata expression applied when `role: bpp`
```
Each action entry is optional—if no mapping exists for the current action, the original request body is passed through unchanged. JSONata expressions receive the entire Beckn request as input (`$`) and must return the full payload that should replace it.
**Sample mapping file**:
```yaml
mappings:
search:
bapMappings: |
{
"context": {
"action": "discover",
"version": "2.0.0",
"domain": "beckn.one:retail",
"bap_id": $.context.bap_id,
"bap_uri": $.context.bap_uri,
"transaction_id": $.context.transaction_id,
"message_id": $.context.message_id,
"timestamp": $.context.timestamp
},
"message": {
"filters": $.message.intent.category ? {
"type": "jsonpath",
"expression": "$[?(@.category.code == '" & $.message.intent.category.descriptor.code & "')]"
} : null
}
}
bppMappings: |
{
"context": {
"action": "search",
"version": "1.1.0",
"domain": "retail",
"bap_id": $.context.bap_id,
"bap_uri": $.context.bap_uri,
"transaction_id": $.context.transaction_id,
"message_id": $.context.message_id,
"timestamp": $.context.timestamp
},
"message": {
"intent": {
"category": $.message.filters ? {
"descriptor": {
"code": $substringAfter($substringBefore($.message.filters.expression, "'"), "== '")
}
} : null
}
}
}
on_search:
bapMappings: |
{
"context": $.context,
"message": {
"catalog": {
"descriptor": $.message.catalogs[0]."beckn:descriptor" ? {
"name": $.message.catalogs[0]."beckn:descriptor"."schema:name"
} : null
}
}
}
bppMappings: |
{
"context": $.context,
"message": {
"catalogs": [{
"@type": "beckn:Catalog",
"beckn:items": $.message.catalog.providers[].items[].
{
"@type": "beckn:Item",
"beckn:id": id
}
}]
}
}
```
The sample illustrates how a single mapping file can convert `search` requests and `on_search` responses between Beckn 1.1.0 (BAP) and Beckn 2.0.0 (BPP) payload shapes. You can define as many action entries as needed, and the plugin will compile and cache the JSONata expressions on startup.
---
## Routing Configuration
### Routing Rules File Structure
@@ -1176,4 +1297,4 @@ modules:
httpClientConfig:
maxIdleConns: 1000
maxIdleConnsPerHost: 200
idleConnTimeout: 300
idleConnTimeout: 300

View File

@@ -69,14 +69,17 @@ The **Beckn Protocol** is an open protocol that enables location-aware, local co
- Per-step histograms with error attribution
- Cache, routing, plugin, and business KPIs (signature/schema validations, Beckn messages)
- Native Prometheus exporter with Grafana dashboards & alert rules (`monitoring/`)
- Opt-in: add a `telemetry` block in your config to wire the `otelsetup` plugin; omit it to run without metrics. Example:
- Opt-in: add a `plugins.otelsetup` block in your config to wire the `otelsetup` plugin; omit it to run without metrics. Example:
```yaml
telemetry:
enableMetrics: true
serviceName: "beckn-onix"
serviceVersion: "1.0.0"
environment: "development"
plugins:
otelsetup:
id: otelsetup
config:
serviceName: "beckn-onix"
serviceVersion: "1.0.0"
enableMetrics: "true"
environment: "development"
```
- **Modular Metrics Architecture**: Metrics are organized by module for better maintainability:
- OTel SDK wiring via `otelsetup` plugin
@@ -162,8 +165,9 @@ Resources:
- **Encrypter**: AES encryption for sensitive data protection
- **Decrypter**: AES decryption for encrypted data processing
- **ReqPreprocessor**: Request preprocessing (UUID generation, headers)
- **Registry**: Standard Beckn registry lookup for participant information
- **DeDiRegistry**: DeDi registry to lookup public keys for NP.
- **ReqMapper**: Middleware to transform payload either between Beckn versions or against other platforms.
- **Registry**: Standard Beckn registry or Beckn One DeDi registry lookup for participant information
## Quick Start
### Prerequisites

View File

@@ -536,10 +536,19 @@ curl http://localhost:8081/bap/receiver/
## Production Setup
### Integrating with Public cloud providers
Public cloud providers have built integration plugins and other tools to deploy Beckn ONIX easily. They provide specialized plugins that integrate with managed services on those platforms. Network participants can choose to consume them.
1. [Google Cloud Platform](https://github.com/GoogleCloudPlatform/dpi-accelerator-beckn-onix)
2. [Amazon Web Services](https://github.com/Beckn-One/beckn-onix/tree/main-pre-plugins/aws-cdk)
The rest of the document focuses on how to deploy ONIX in Production with cloud agnostic plugins and tools.
### Additional Requirements for Production
1. **HashiCorp Vault** for key management
2. **RabbitMQ** for message queuing
2. **RabbitMQ** for message queuing (needed for async flows)
3. **TLS certificates** for secure communication
4. **Load balancer** for high availability

BIN
adapter Executable file

Binary file not shown.

BIN
adapter.test Executable file

Binary file not shown.

View File

@@ -17,15 +17,19 @@ import (
"github.com/beckn-one/beckn-onix/core/module/handler"
"github.com/beckn-one/beckn-onix/pkg/log"
"github.com/beckn-one/beckn-onix/pkg/plugin"
"github.com/beckn-one/beckn-onix/pkg/plugin/implementation/otelsetup"
"github.com/beckn-one/beckn-onix/pkg/telemetry"
)
// ApplicationPlugins holds application-level plugin configurations.
type ApplicationPlugins struct {
OtelSetup *plugin.Config `yaml:"otelsetup,omitempty"`
}
// Config struct holds all configurations.
type Config struct {
AppName string `yaml:"appName"`
Log log.Config `yaml:"log"`
Telemetry *otelsetup.Config `yaml:"telemetry"`
Plugins ApplicationPlugins `yaml:"plugins,omitempty"`
PluginManager *plugin.ManagerConfig `yaml:"pluginManager"`
Modules []module.Config `yaml:"modules"`
HTTP httpConfig `yaml:"http"`
@@ -95,16 +99,14 @@ func validateConfig(cfg *Config) error {
}
// initPlugins initializes application-level plugins including telemetry.
func initPlugins(ctx context.Context, mgr *plugin.Manager, telemetryCfg *otelsetup.Config) (*telemetry.Provider, error) {
if telemetryCfg == nil {
func initPlugins(ctx context.Context, mgr *plugin.Manager, otelSetupCfg *plugin.Config) (*telemetry.Provider, error) {
if otelSetupCfg == nil {
log.Info(ctx, "Telemetry config not provided; skipping OpenTelemetry setup")
return nil, nil
}
log.Infof(ctx, "Initializing telemetry via plugin id=otelsetup")
pluginConfig := otelsetup.ToPluginConfig(telemetryCfg)
otelProvider, err := mgr.OtelSetup(ctx, pluginConfig)
log.Infof(ctx, "Initializing telemetry via plugin id=%s", otelSetupCfg.ID)
otelProvider, err := mgr.OtelSetup(ctx, otelSetupCfg)
if err != nil {
return nil, fmt.Errorf("failed to initialize telemetry plugin: %w", err)
}
@@ -152,7 +154,7 @@ func run(ctx context.Context, configPath string) error {
log.Debug(ctx, "Plugin manager loaded.")
// Initialize plugins including telemetry.
otelProvider, err := initPlugins(ctx, mgr, cfg.Telemetry)
otelProvider, err := initPlugins(ctx, mgr, cfg.Plugins.OtelSetup)
if err != nil {
return fmt.Errorf("failed to initialize plugins: %w", err)
}

View File

@@ -64,13 +64,18 @@ func (m *MockPluginManager) Cache(ctx context.Context, cfg *plugin.Config) (defi
return nil, nil
}
// Registry returns a mock implementation of the RegistryLookup interface.
func (m *MockPluginManager) Registry(ctx context.Context, cfg *plugin.Config) (definition.RegistryLookup, error) {
return nil, nil
}
// KeyManager returns a mock implementation of the KeyManager interface.
func (m *MockPluginManager) KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error) {
return nil, nil
}
// Registry returns a mock implementation of the RegistryLookup interface.
func (m *MockPluginManager) Registry(ctx context.Context, cfg *plugin.Config) (definition.RegistryLookup, error) {
// TransportWrapper returns a mock implementation of the TransportWrapper interface.
func (m *MockPluginManager) TransportWrapper(ctx context.Context, cfg *plugin.Config) (definition.TransportWrapper, error) {
return nil, nil
}

View File

@@ -31,8 +31,8 @@ modules:
registry:
id: dediregistry
config:
url: http://34.14.173.68:8080/dedi
registryName: subscribers.beckn.one
url: https://api.testnet.beckn.one/registry/dedi # This is the testnet URL. The production URL is https://api.beckn.one/registry/dedi
registryName: subscribers.beckn.one # This is the wildcard string used to lookup across registries in Beckn One. Do not change this.
timeout: 10
retry_max: 3
retry_wait_min: 100ms
@@ -83,8 +83,8 @@ modules:
registry:
id: dediregistry
config:
url: http://34.14.173.68:8080/dedi
registryName: subscribers.beckn.one
url: https://api.testnet.beckn.one/registry/dedi # This is the testnet URL. The production URL is https://api.beckn.one/registry/dedi
registryName: subscribers.beckn.one # This is the wildcard string used to lookup across registries in Beckn One. Do not change this.
timeout: 10
retry_max: 3
retry_wait_min: 100ms

View File

@@ -31,8 +31,8 @@ modules:
registry:
id: dediregistry
config:
url: http://34.14.173.68:8080/dedi
registryName: subscribers.beckn.one
url: https://api.testnet.beckn.one/registry/dedi # This is the testnet URL. The production URL is https://api.beckn.one/registry/dedi
registryName: subscribers.beckn.one # This is the wildcard string used to lookup across registries in Beckn One. Do not change this.
timeout: 10
retry_max: 3
retry_wait_min: 100ms
@@ -78,8 +78,8 @@ modules:
registry:
id: dediregistry
config:
url: http://34.14.173.68:8080/dedi
registryName: subscribers.beckn.one
url: https://api.testnet.beckn.one/registry/dedi # This is the testnet URL. The production URL is https://api.beckn.one/registry/dedi
registryName: subscribers.beckn.one # This is the wildcard string used to lookup across registries in Beckn One. Do not change this.
timeout: 10
retry_max: 3
retry_wait_min: 100ms

View File

@@ -8,11 +8,14 @@ log:
- message_id
- subscriber_id
- module_id
telemetry:
serviceName: "beckn-onix"
serviceVersion: "1.0.0"
enableMetrics: true
environment: "development"
plugins:
otelsetup:
id: otelsetup
config:
serviceName: "beckn-onix"
serviceVersion: "1.0.0"
enableMetrics: "true"
environment: "development"
http:
port: 8081
timeout:

View File

@@ -22,6 +22,7 @@ type PluginManager interface {
Cache(ctx context.Context, cfg *plugin.Config) (definition.Cache, error)
Registry(ctx context.Context, cfg *plugin.Config) (definition.RegistryLookup, error)
KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error)
TransportWrapper(ctx context.Context, cfg *plugin.Config) (definition.TransportWrapper, error)
SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error)
}
@@ -35,16 +36,17 @@ const (
// PluginCfg holds the configuration for various plugins.
type PluginCfg struct {
SchemaValidator *plugin.Config `yaml:"schemaValidator,omitempty"`
SignValidator *plugin.Config `yaml:"signValidator,omitempty"`
Publisher *plugin.Config `yaml:"publisher,omitempty"`
Signer *plugin.Config `yaml:"signer,omitempty"`
Router *plugin.Config `yaml:"router,omitempty"`
Cache *plugin.Config `yaml:"cache,omitempty"`
Registry *plugin.Config `yaml:"registry,omitempty"`
KeyManager *plugin.Config `yaml:"keyManager,omitempty"`
Middleware []plugin.Config `yaml:"middleware,omitempty"`
Steps []plugin.Config
SchemaValidator *plugin.Config `yaml:"schemaValidator,omitempty"`
SignValidator *plugin.Config `yaml:"signValidator,omitempty"`
Publisher *plugin.Config `yaml:"publisher,omitempty"`
Signer *plugin.Config `yaml:"signer,omitempty"`
Router *plugin.Config `yaml:"router,omitempty"`
Cache *plugin.Config `yaml:"cache,omitempty"`
Registry *plugin.Config `yaml:"registry,omitempty"`
KeyManager *plugin.Config `yaml:"keyManager,omitempty"`
TransportWrapper *plugin.Config `yaml:"transportWrapper,omitempty"`
Middleware []plugin.Config `yaml:"middleware,omitempty"`
Steps []plugin.Config
}
// HttpClientConfig defines the configuration for the HTTP transport layer.

View File

@@ -18,23 +18,24 @@ import (
// stdHandler orchestrates the execution of defined processing steps.
type stdHandler struct {
signer definition.Signer
steps []definition.Step
signValidator definition.SignValidator
cache definition.Cache
registry definition.RegistryLookup
km definition.KeyManager
schemaValidator definition.SchemaValidator
router definition.Router
publisher definition.Publisher
SubscriberID string
role model.Role
httpClient *http.Client
moduleName string
signer definition.Signer
steps []definition.Step
signValidator definition.SignValidator
cache definition.Cache
registry definition.RegistryLookup
km definition.KeyManager
schemaValidator definition.SchemaValidator
router definition.Router
publisher definition.Publisher
transportWrapper definition.TransportWrapper
SubscriberID string
role model.Role
httpClient *http.Client
moduleName string
}
// newHTTPClient creates a new HTTP client with a custom transport configuration.
func newHTTPClient(cfg *HttpClientConfig) *http.Client {
func newHTTPClient(cfg *HttpClientConfig, wrapper definition.TransportWrapper) *http.Client {
// Clone the default transport to inherit its sensible defaults.
transport := http.DefaultTransport.(*http.Transport).Clone()
@@ -53,7 +54,12 @@ func newHTTPClient(cfg *HttpClientConfig) *http.Client {
transport.ResponseHeaderTimeout = cfg.ResponseHeaderTimeout
}
return &http.Client{Transport: transport}
var finalTransport http.RoundTripper = transport
if wrapper != nil {
log.Debugf(context.Background(), "Applying custom transport wrapper")
finalTransport = wrapper.Wrap(transport)
}
return &http.Client{Transport: finalTransport}
}
// NewStdHandler initializes a new processor with plugins and steps.
@@ -62,13 +68,14 @@ func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config, moduleNa
steps: []definition.Step{},
SubscriberID: cfg.SubscriberID,
role: cfg.Role,
httpClient: newHTTPClient(&cfg.HttpClientConfig),
moduleName: moduleName,
}
// Initialize plugins.
if err := h.initPlugins(ctx, mgr, &cfg.Plugins); err != nil {
return nil, fmt.Errorf("failed to initialize plugins: %w", err)
}
// Initialize HTTP client after plugins so transport wrapper can be applied.
h.httpClient = newHTTPClient(&cfg.HttpClientConfig, h.transportWrapper)
// Initialize steps.
if err := h.initSteps(ctx, mgr, cfg); err != nil {
return nil, fmt.Errorf("failed to initialize steps: %w", err)
@@ -81,6 +88,13 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Header.Set("X-Module-Name", h.moduleName)
r.Header.Set("X-Role", string(h.role))
// These headers are only needed for internal instrumentation; avoid leaking them downstream.
// Use defer to ensure cleanup regardless of return path.
defer func() {
r.Header.Del("X-Module-Name")
r.Header.Del("X-Role")
}()
ctx, err := h.stepCtx(r, w.Header())
if err != nil {
log.Errorf(r.Context(), err, "stepCtx(r):%v", err)
@@ -92,7 +106,7 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Execute processing steps.
for _, step := range h.steps {
if err := step.Run(ctx); err != nil {
log.Errorf(ctx, err, "%T.run(%v):%v", step, ctx, err)
log.Errorf(ctx, err, "%T.run():%v", step, err)
response.SendNack(ctx, w, err)
return
}
@@ -104,10 +118,6 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
// These headers are only needed for internal instrumentation; avoid leaking them downstream.
r.Header.Del("X-Module-Name")
r.Header.Del("X-Role")
// Handle routing based on the defined route type.
route(ctx, r, w, h.publisher, h.httpClient)
}
@@ -255,6 +265,9 @@ func (h *stdHandler) initPlugins(ctx context.Context, mgr PluginManager, cfg *Pl
if h.signer, err = loadPlugin(ctx, "Signer", cfg.Signer, mgr.Signer); err != nil {
return err
}
if h.transportWrapper, err = loadPlugin(ctx, "TransportWrapper", cfg.TransportWrapper, mgr.TransportWrapper); err != nil {
return err
}
log.Debugf(ctx, "All required plugins successfully loaded for stdHandler")
return nil
@@ -301,7 +314,7 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf
instrumentedStep, wrapErr := telemetry.NewInstrumentedStep(s, step, h.moduleName)
if wrapErr != nil {
log.Warnf(ctx, "Failed to instrument step %s: %v", step, wrapErr)
h.steps = append(h.steps, s)
h.steps = append(h.steps, s)
continue
}
h.steps = append(h.steps, instrumentedStep)

View File

@@ -55,8 +55,8 @@ func TestNewHTTPClient(t *testing.T) {
{
name: "partial configuration",
config: HttpClientConfig{
MaxIdleConns: 500,
IdleConnTimeout: 180 * time.Second,
MaxIdleConns: 500,
IdleConnTimeout: 180 * time.Second,
},
expected: struct {
maxIdleConns int
@@ -74,8 +74,8 @@ func TestNewHTTPClient(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client := newHTTPClient(&tt.config)
client := newHTTPClient(&tt.config, nil)
if client == nil {
t.Fatal("newHTTPClient returned nil")
}
@@ -107,15 +107,15 @@ func TestNewHTTPClient(t *testing.T) {
func TestHttpClientConfigDefaults(t *testing.T) {
// Test that zero config values don't override defaults
config := &HttpClientConfig{}
client := newHTTPClient(config)
client := newHTTPClient(config, nil)
transport := client.Transport.(*http.Transport)
// Verify defaults are preserved when config values are zero
if transport.MaxIdleConns == 0 {
t.Error("MaxIdleConns should not be zero when using defaults")
}
// MaxIdleConnsPerHost default is 0 (unlimited), which is correct
if transport.MaxIdleConns != 100 {
t.Errorf("Expected default MaxIdleConns=100, got %d", transport.MaxIdleConns)
@@ -130,24 +130,66 @@ func TestHttpClientConfigPerformanceValues(t *testing.T) {
IdleConnTimeout: 300 * time.Second,
ResponseHeaderTimeout: 5 * time.Second,
}
client := newHTTPClient(config)
client := newHTTPClient(config, nil)
transport := client.Transport.(*http.Transport)
// Verify performance-optimized values
if transport.MaxIdleConns != 1000 {
t.Errorf("Expected MaxIdleConns=1000, got %d", transport.MaxIdleConns)
}
if transport.MaxIdleConnsPerHost != 200 {
t.Errorf("Expected MaxIdleConnsPerHost=200, got %d", transport.MaxIdleConnsPerHost)
}
if transport.IdleConnTimeout != 300*time.Second {
t.Errorf("Expected IdleConnTimeout=300s, got %v", transport.IdleConnTimeout)
}
if transport.ResponseHeaderTimeout != 5*time.Second {
t.Errorf("Expected ResponseHeaderTimeout=5s, got %v", transport.ResponseHeaderTimeout)
}
}
}
func TestNewHTTPClientWithTransportWrapper(t *testing.T) {
wrappedTransport := &mockRoundTripper{}
wrapper := &mockTransportWrapper{
returnTransport: wrappedTransport,
}
client := newHTTPClient(&HttpClientConfig{}, wrapper)
if !wrapper.wrapCalled {
t.Fatal("expected transport wrapper to be invoked")
}
if wrapper.wrappedTransport == nil {
t.Fatal("expected base transport to be passed to wrapper")
}
if client.Transport != wrappedTransport {
t.Errorf("expected client transport to use wrapper transport")
}
}
type mockTransportWrapper struct {
wrapCalled bool
wrappedTransport http.RoundTripper
returnTransport http.RoundTripper
}
func (m *mockTransportWrapper) Wrap(base http.RoundTripper) http.RoundTripper {
m.wrapCalled = true
m.wrappedTransport = base
if m.returnTransport != nil {
return m.returnTransport
}
return base
}
type mockRoundTripper struct{}
func (m *mockRoundTripper) RoundTrip(_ *http.Request) (*http.Response, error) {
return nil, nil
}

View File

@@ -267,7 +267,6 @@ func (s *addRouteStep) Run(ctx *model.StepContext) error {
if s.metrics != nil && ctx.Route != nil {
s.metrics.RoutingDecisionsTotal.Add(ctx.Context, 1,
metric.WithAttributes(
telemetry.AttrRouteType.String(ctx.Route.TargetType),
telemetry.AttrTargetType.String(ctx.Route.TargetType),
))
}

View File

@@ -59,13 +59,18 @@ func (m *mockPluginManager) Cache(ctx context.Context, cfg *plugin.Config) (defi
return nil, nil
}
// Registry returns a mock registry lookup implementation.
func (m *mockPluginManager) Registry(ctx context.Context, cfg *plugin.Config) (definition.RegistryLookup, error) {
return nil, nil
}
// KeyManager returns a mock key manager implementation.
func (m *mockPluginManager) KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error) {
return nil, nil
}
// Registry returns a mock registry lookup implementation.
func (m *mockPluginManager) Registry(ctx context.Context, cfg *plugin.Config) (definition.RegistryLookup, error) {
// TransportWrapper returns a mock transport wrapper implementation.
func (m *mockPluginManager) TransportWrapper(ctx context.Context, cfg *plugin.Config) (definition.TransportWrapper, error) {
return nil, nil
}

5
go.mod
View File

@@ -19,7 +19,7 @@ require (
require github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03
require golang.org/x/text v0.23.0 // indirect
require golang.org/x/text v0.26.0 // indirect
require (
github.com/beorn7/perks v1.0.1 // indirect
@@ -58,7 +58,7 @@ require (
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/trace v1.38.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect
google.golang.org/protobuf v1.32.0 // indirect
)
@@ -68,6 +68,7 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/hashicorp/vault/api v1.16.0
github.com/jsonata-go/jsonata v0.0.0-20250709164031-599f35f32e5f
github.com/prometheus/client_golang v1.18.0
github.com/rabbitmq/amqp091-go v1.10.0
github.com/redis/go-redis/extra/redisotel/v9 v9.16.0

12
go.sum
View File

@@ -75,6 +75,8 @@ github.com/hashicorp/vault/api v1.16.0 h1:nbEYGJiAPGzT9U4oWgaaB0g+Rj8E59QuHKyA5L
github.com/hashicorp/vault/api v1.16.0/go.mod h1:KhuUhzOD8lDSk29AtzNjgAu2kxRA9jL9NAbkFlqvkBA=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jsonata-go/jsonata v0.0.0-20250709164031-599f35f32e5f h1:JnGon8QHtmjFPq0NcSu8OTEnQDDEgFME7gtj/NkjCUo=
github.com/jsonata-go/jsonata v0.0.0-20250709164031-599f35f32e5f/go.mod h1:rYUEOEiieWXHNCE/eDXV/o5s7jZ2VyUzQKbqVns9pik=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -175,10 +177,12 @@ golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=

View File

@@ -17,6 +17,7 @@ plugins=(
"dediregistry"
"reqpreprocessor"
"otelsetup"
"reqmapper"
"router"
"schemavalidator"
"schemav2validator"

View File

@@ -0,0 +1,18 @@
package definition
import (
"context"
"net/http"
)
// TransportWrapper is a plugin that wraps an http.RoundTripper,
// allowing modification of outbound requests (like adding auth).
type TransportWrapper interface {
// Wrap takes a base transport and returns a new transport that wraps it.
Wrap(base http.RoundTripper) http.RoundTripper
}
// TransportWrapperProvider defines the factory for a TransportWrapper.
type TransportWrapperProvider interface {
New(ctx context.Context, config map[string]any) (TransportWrapper, func(), error)
}

View File

@@ -0,0 +1,23 @@
package main
import (
"context"
"net/http"
"github.com/beckn-one/beckn-onix/pkg/plugin/implementation/reqmapper"
)
type provider struct{}
func (p provider) New(ctx context.Context, c map[string]string) (func(http.Handler) http.Handler, error) {
config := &reqmapper.Config{}
if role, ok := c["role"]; ok {
config.Role = role
}
if mappingsFile, ok := c["mappingsFile"]; ok {
config.MappingsFile = mappingsFile
}
return reqmapper.NewReqMapper(config)
}
var Provider = provider{}

View File

@@ -0,0 +1,84 @@
package main
import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
)
func TestProviderNewSuccess(t *testing.T) {
p := provider{}
middleware, err := p.New(context.Background(), map[string]string{"role": "bap"})
if err != nil {
t.Fatalf("provider.New returned unexpected error: %v", err)
}
if middleware == nil {
t.Fatalf("provider.New returned nil middleware")
}
payload := map[string]interface{}{
"context": map[string]interface{}{
"action": "search",
"domain": "retail",
"version": "1.1.0",
"bap_id": "bap.example",
"bap_uri": "https://bap.example/api",
"transaction_id": "txn-1",
"message_id": "msg-1",
"timestamp": "2023-01-01T10:00:00Z",
},
"message": map[string]interface{}{
"intent": map[string]interface{}{
"fulfillment": map[string]interface{}{
"start": map[string]interface{}{
"location": map[string]interface{}{"gps": "0,0"},
},
"end": map[string]interface{}{
"location": map[string]interface{}{"gps": "1,1"},
},
},
},
},
}
body, err := json.Marshal(payload)
if err != nil {
t.Fatalf("failed to marshal payload: %v", err)
}
called := false
next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
called = true
w.WriteHeader(http.StatusNoContent)
})
req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body))
rec := httptest.NewRecorder()
middleware(next).ServeHTTP(rec, req)
if !called {
t.Fatalf("expected downstream handler to be invoked")
}
if rec.Code != http.StatusNoContent {
t.Fatalf("unexpected response code: got %d want %d", rec.Code, http.StatusNoContent)
}
}
func TestProviderNewMissingRole(t *testing.T) {
p := provider{}
if _, err := p.New(context.Background(), map[string]string{}); err == nil {
t.Fatalf("expected error when role is missing")
}
}
func TestProviderNewInvalidRole(t *testing.T) {
p := provider{}
_, err := p.New(context.Background(), map[string]string{"role": "invalid"})
if err == nil {
t.Fatalf("expected error for invalid role")
}
}

View File

@@ -0,0 +1,287 @@
package reqmapper
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"sync"
"github.com/beckn-one/beckn-onix/pkg/log"
"github.com/jsonata-go/jsonata"
"gopkg.in/yaml.v3"
)
// Config represents the configuration for the request mapper middleware.
type Config struct {
Role string `yaml:"role"` // "bap" or "bpp"
MappingsFile string `yaml:"mappingsFile"` // required path to mappings YAML
}
// MappingEngine handles JSONata-based transformations
type MappingEngine struct {
config *Config
jsonataInstance jsonata.JSONataInstance
bapMaps map[string]jsonata.Expression
bppMaps map[string]jsonata.Expression
mappings map[string]builtinMapping
mappingSource string
mutex sync.RWMutex
initialized bool
}
type builtinMapping struct {
BAP string `yaml:"bapMappings"`
BPP string `yaml:"bppMappings"`
}
type mappingFile struct {
Mappings map[string]builtinMapping `yaml:"mappings"`
}
var (
engineInstance *MappingEngine
engineOnce sync.Once
)
// NewReqMapper returns a middleware that maps requests using JSONata expressions
func NewReqMapper(cfg *Config) (func(http.Handler) http.Handler, error) {
if err := validateConfig(cfg); err != nil {
return nil, err
}
// Initialize the mapping engine
engine, err := initMappingEngine(cfg)
if err != nil {
return nil, fmt.Errorf("failed to initialize mapping engine: %w", err)
}
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Failed to read request body", http.StatusBadRequest)
return
}
var req map[string]interface{}
ctx := r.Context()
if err := json.Unmarshal(body, &req); err != nil {
http.Error(w, "Failed to decode request body", http.StatusBadRequest)
return
}
reqContext, ok := req["context"].(map[string]interface{})
if !ok {
http.Error(w, "context field not found or invalid", http.StatusBadRequest)
return
}
action, ok := reqContext["action"].(string)
if !ok {
http.Error(w, "action field not found or invalid", http.StatusBadRequest)
return
}
// Apply transformation
mappedBody, err := engine.Transform(ctx, action, req, cfg.Role)
if err != nil {
log.Errorf(ctx, err, "Transformation failed for action %s", action)
// Fall back to original body on error
mappedBody = body
}
r.Body = io.NopCloser(bytes.NewBuffer(mappedBody))
r.ContentLength = int64(len(mappedBody))
r = r.WithContext(ctx)
next.ServeHTTP(w, r)
})
}, nil
}
// initMappingEngine initializes or returns existing mapping engine
func initMappingEngine(cfg *Config) (*MappingEngine, error) {
var initErr error
engineOnce.Do(func() {
engineInstance = &MappingEngine{
config: cfg,
bapMaps: make(map[string]jsonata.Expression),
bppMaps: make(map[string]jsonata.Expression),
}
instance, err := jsonata.OpenLatest()
if err != nil {
initErr = fmt.Errorf("failed to initialize jsonata: %w", err)
return
}
engineInstance.jsonataInstance = instance
if err := engineInstance.loadBuiltinMappings(); err != nil {
initErr = err
return
}
engineInstance.initialized = true
})
if initErr != nil {
return nil, initErr
}
if !engineInstance.initialized {
return nil, errors.New("mapping engine failed to initialize")
}
return engineInstance, nil
}
func (e *MappingEngine) loadMappingsFromConfig() (map[string]builtinMapping, string, error) {
if e.config == nil || e.config.MappingsFile == "" {
return nil, "", errors.New("mappingsFile must be provided in config")
}
data, err := os.ReadFile(e.config.MappingsFile)
if err != nil {
return nil, "", fmt.Errorf("failed to read mappings file %s: %w", e.config.MappingsFile, err)
}
source := e.config.MappingsFile
var parsed mappingFile
if err := yaml.Unmarshal(data, &parsed); err != nil {
return nil, "", fmt.Errorf("failed to parse mappings from %s: %w", source, err)
}
if len(parsed.Mappings) == 0 {
return nil, "", fmt.Errorf("no mappings found in %s", source)
}
return parsed.Mappings, source, nil
}
// loadBuiltinMappings compiles JSONata expressions for every action/direction pair from the configured mappings file.
func (e *MappingEngine) loadBuiltinMappings() error {
mappings, source, err := e.loadMappingsFromConfig()
if err != nil {
return err
}
e.bapMaps = make(map[string]jsonata.Expression, len(mappings))
e.bppMaps = make(map[string]jsonata.Expression, len(mappings))
for action, mapping := range mappings {
bapExpr, err := e.jsonataInstance.Compile(mapping.BAP, false)
if err != nil {
return fmt.Errorf("failed to compile BAP mapping for action %s: %w", action, err)
}
bppExpr, err := e.jsonataInstance.Compile(mapping.BPP, false)
if err != nil {
return fmt.Errorf("failed to compile BPP mapping for action %s: %w", action, err)
}
e.bapMaps[action] = bapExpr
e.bppMaps[action] = bppExpr
}
e.mappings = mappings
e.mappingSource = source
log.Infof(
context.Background(),
"Loaded %d BAP mappings and %d BPP mappings from %s",
len(e.bapMaps),
len(e.bppMaps),
source,
)
return nil
}
// Transform applies the appropriate mapping based on role and action
func (e *MappingEngine) Transform(ctx context.Context, action string, req map[string]interface{}, role string) ([]byte, error) {
e.mutex.RLock()
defer e.mutex.RUnlock()
var expr jsonata.Expression
var found bool
// Select appropriate mapping based on role
switch role {
case "bap":
expr, found = e.bapMaps[action]
case "bpp":
expr, found = e.bppMaps[action]
default:
return json.Marshal(req)
}
// If no mapping found, return original request
if !found || expr == nil {
log.Debugf(ctx, "No mapping found for action: %s, role: %s", action, role)
return json.Marshal(req)
}
// Marshal request for JSONata evaluation
input, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("failed to marshal request for mapping: %w", err)
}
// Apply JSONata transformation
result, err := expr.Evaluate(input, nil)
if err != nil {
return nil, fmt.Errorf("JSONata evaluation failed: %w", err)
}
log.Debugf(ctx, "Successfully transformed %s request using %s mapping, %s", action, role, result)
return result, nil
}
// ReloadMappings reloads all mapping files (useful for hot-reload scenarios)
func (e *MappingEngine) ReloadMappings() error {
e.mutex.Lock()
defer e.mutex.Unlock()
return e.loadBuiltinMappings()
}
// GetMappingInfo returns information about loaded mappings
func (e *MappingEngine) GetMappingInfo() map[string]interface{} {
e.mutex.RLock()
defer e.mutex.RUnlock()
bapActions := make([]string, 0, len(e.bapMaps))
for action := range e.bapMaps {
bapActions = append(bapActions, action)
}
bppActions := make([]string, 0, len(e.bppMaps))
for action := range e.bppMaps {
bppActions = append(bppActions, action)
}
return map[string]interface{}{
"bap_mappings": bapActions,
"bpp_mappings": bppActions,
"mappings_source": e.mappingSource,
"action_count": len(e.mappings),
}
}
func validateConfig(cfg *Config) error {
if cfg == nil {
return errors.New("config cannot be nil")
}
if cfg.Role != "bap" && cfg.Role != "bpp" {
return errors.New("role must be either 'bap' or 'bpp'")
}
if cfg.MappingsFile == "" {
return errors.New("mappingsFile is required")
}
return nil
}

View File

@@ -0,0 +1,240 @@
package reqmapper
import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"reflect"
"sync"
"testing"
)
func resetEngineState(t *testing.T) {
t.Helper()
engineInstance = nil
engineOnce = sync.Once{}
}
func testMappingsFile(t *testing.T) string {
t.Helper()
path := filepath.Join("testdata", "mappings.yaml")
if _, err := os.Stat(path); err != nil {
t.Fatalf("test mappings file missing: %v", err)
}
return path
}
func initTestEngine(t *testing.T) *MappingEngine {
t.Helper()
resetEngineState(t)
engine, err := initMappingEngine(&Config{
Role: "bap",
MappingsFile: testMappingsFile(t),
})
if err != nil {
t.Fatalf("failed to init mapping engine: %v", err)
}
return engine
}
func TestNewReqMapper_InvalidConfig(t *testing.T) {
t.Run("nil config", func(t *testing.T) {
if _, err := NewReqMapper(nil); err == nil {
t.Fatalf("expected error for nil config")
}
})
t.Run("invalid role", func(t *testing.T) {
if _, err := NewReqMapper(&Config{Role: "invalid"}); err == nil {
t.Fatalf("expected error for invalid role")
}
})
}
func TestNewReqMapper_MiddlewareTransformsRequest(t *testing.T) {
resetEngineState(t)
mw, err := NewReqMapper(&Config{
Role: "bap",
MappingsFile: testMappingsFile(t),
})
if err != nil {
t.Fatalf("NewReqMapper returned error: %v", err)
}
startLocation := map[string]interface{}{
"gps": "12.9716,77.5946",
"city": "Bengaluru",
}
endLocation := map[string]interface{}{
"gps": "13.0827,80.2707",
"city": "Chennai",
}
requestPayload := map[string]interface{}{
"context": map[string]interface{}{
"domain": "retail",
"action": "search",
"version": "1.1.0",
"bap_id": "bap.example",
"bap_uri": "https://bap.example/api",
"transaction_id": "txn-1",
"message_id": "msg-1",
"timestamp": "2023-01-01T10:00:00Z",
},
"message": map[string]interface{}{
"intent": map[string]interface{}{
"item": map[string]interface{}{
"id": "item-1",
},
"provider": map[string]interface{}{
"id": "provider-1",
},
"fulfillment": map[string]interface{}{
"start": map[string]interface{}{
"location": startLocation,
},
"end": map[string]interface{}{
"location": endLocation,
},
},
},
},
}
body, err := json.Marshal(requestPayload)
if err != nil {
t.Fatalf("failed to marshal request payload: %v", err)
}
var captured map[string]interface{}
next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
data, err := io.ReadAll(r.Body)
if err != nil {
t.Fatalf("failed to read request in handler: %v", err)
}
if err := json.Unmarshal(data, &captured); err != nil {
t.Fatalf("failed to unmarshal transformed payload: %v", err)
}
w.WriteHeader(http.StatusOK)
})
req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body))
rec := httptest.NewRecorder()
mw(next).ServeHTTP(rec, req)
if captured == nil {
t.Fatalf("middleware did not forward request to next handler")
}
message, ok := captured["message"].(map[string]interface{})
if !ok {
t.Fatalf("expected message field in transformed payload")
}
filters, ok := message["filters"].(map[string]interface{})
if !ok {
t.Fatalf("expected filters in transformed payload")
}
if pickup := filters["pickup"]; !reflect.DeepEqual(pickup, startLocation) {
t.Fatalf("pickup location mismatch\ngot: %#v\nwant: %#v", pickup, startLocation)
}
if drop := filters["drop"]; !reflect.DeepEqual(drop, endLocation) {
t.Fatalf("drop location mismatch\ngot: %#v\nwant: %#v", drop, endLocation)
}
}
func TestMappingEngine_TransformFallbackForUnknownAction(t *testing.T) {
engine := initTestEngine(t)
req := map[string]interface{}{
"context": map[string]interface{}{
"action": "unknown_action",
},
"message": map[string]interface{}{},
}
expected, err := json.Marshal(req)
if err != nil {
t.Fatalf("failed to marshal expected payload: %v", err)
}
result, err := engine.Transform(context.Background(), "unknown_action", req, "bap")
if err != nil {
t.Fatalf("Transform returned error: %v", err)
}
if !bytes.Equal(result, expected) {
t.Fatalf("expected Transform to return original payload")
}
}
func TestMappingEngine_TransformFallbackForUnknownRole(t *testing.T) {
engine := initTestEngine(t)
req := map[string]interface{}{
"context": map[string]interface{}{
"action": "search",
},
"message": map[string]interface{}{},
}
expected, err := json.Marshal(req)
if err != nil {
t.Fatalf("failed to marshal expected payload: %v", err)
}
result, err := engine.Transform(context.Background(), "search", req, "unknown-role")
if err != nil {
t.Fatalf("Transform returned error: %v", err)
}
if !bytes.Equal(result, expected) {
t.Fatalf("expected Transform to return original payload when role is unknown")
}
}
func TestMappingEngine_ReloadMappings(t *testing.T) {
engine := initTestEngine(t)
engine.mutex.RLock()
originalBAP := len(engine.bapMaps)
originalBPP := len(engine.bppMaps)
engine.mutex.RUnlock()
if originalBAP == 0 || originalBPP == 0 {
t.Fatalf("expected test mappings to be loaded")
}
engine.mutex.Lock()
for action := range engine.bapMaps {
delete(engine.bapMaps, action)
break
}
engine.mutex.Unlock()
engine.mutex.RLock()
if len(engine.bapMaps) == originalBAP {
engine.mutex.RUnlock()
t.Fatalf("expected BAP map to be altered before reload")
}
engine.mutex.RUnlock()
if err := engine.ReloadMappings(); err != nil {
t.Fatalf("ReloadMappings returned error: %v", err)
}
engine.mutex.RLock()
defer engine.mutex.RUnlock()
if len(engine.bapMaps) != originalBAP {
t.Fatalf("expected BAP mappings to be reloaded, got %d want %d", len(engine.bapMaps), originalBAP)
}
if len(engine.bppMaps) != originalBPP {
t.Fatalf("expected BPP mappings to be reloaded, got %d want %d", len(engine.bppMaps), originalBPP)
}
}

View File

@@ -0,0 +1,17 @@
mappings:
search:
bapMappings: |
{
"context": $.context,
"message": {
"filters": {
"pickup": $.message.intent.fulfillment.start.location,
"drop": $.message.intent.fulfillment.end.location
}
}
}
bppMappings: |
{
"context": $.context,
"message": $.message
}

View File

@@ -217,13 +217,35 @@ func (m *Manager) OtelSetup(ctx context.Context, cfg *Config) (*telemetry.Provid
if closer != nil {
m.closers = append(m.closers, func() {
if err := closer(); err != nil {
panic(err)
log.Errorf(context.Background(), err, "Failed to shutdown telemetry provider")
}
})
}
return provider, nil
}
// TransportWrapper returns a TransportWrapper instance based on the provided configuration.
func (m *Manager) TransportWrapper(ctx context.Context, cfg *Config) (definition.TransportWrapper, error) {
twp, err := provider[definition.TransportWrapperProvider](m.plugins, cfg.ID)
if err != nil {
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
}
config := make(map[string]any, len(cfg.Config))
for k, v := range cfg.Config {
config[k] = v
}
wrapper, closer, err := twp.New(ctx, config)
if err != nil {
return nil, err
}
if closer != nil {
m.closers = append(m.closers, closer)
}
return wrapper, nil
}
// Step returns a Step instance based on the provided configuration.
func (m *Manager) Step(ctx context.Context, cfg *Config) (definition.Step, error) {
sp, err := provider[definition.StepProvider](m.plugins, cfg.ID)