diff --git a/cmd/adapter/main.go b/cmd/adapter/main.go index cc1444f..0bd4646 100644 --- a/cmd/adapter/main.go +++ b/cmd/adapter/main.go @@ -17,6 +17,7 @@ import ( "github.com/beckn-one/beckn-onix/core/module/handler" "github.com/beckn-one/beckn-onix/pkg/log" "github.com/beckn-one/beckn-onix/pkg/plugin" + "github.com/beckn-one/beckn-onix/pkg/plugin/implementation/otelsetup" "github.com/beckn-one/beckn-onix/pkg/telemetry" ) @@ -93,6 +94,23 @@ func validateConfig(cfg *Config) error { return nil } +// initPlugins initializes application-level plugins including telemetry. +func initPlugins(ctx context.Context, mgr *plugin.Manager, telemetryCfg *telemetry.Config) (*telemetry.Provider, error) { + if telemetryCfg == nil { + log.Info(ctx, "Telemetry config not provided; skipping OpenTelemetry setup") + return nil, nil + } + + log.Infof(ctx, "Initializing telemetry via plugin id=otelsetup") + pluginConfig := otelsetup.ToPluginConfig(telemetryCfg) + + otelProvider, err := mgr.OtelSetup(ctx, pluginConfig) + if err != nil { + return nil, fmt.Errorf("failed to initialize telemetry plugin: %w", err) + } + return otelProvider, nil +} + // newServer creates and initializes the HTTP server. func newServer(ctx context.Context, mgr handler.PluginManager, cfg *Config, otelProvider *telemetry.Provider) (http.Handler, error) { mux := http.NewServeMux() @@ -134,29 +152,10 @@ func run(ctx context.Context, configPath string) error { closers = append(closers, closer) log.Debug(ctx, "Plugin manager loaded.") - // Initialize telemetry via plugin. - var otelProvider *telemetry.Provider - if cfg.Telemetry == nil { - log.Info(ctx, "Telemetry config not provided; skipping OpenTelemetry setup") - } else { - log.Infof(ctx, "Initializing telemetry via plugin id=otelsetup") - - // Convert telemetry.Config to plugin.Config - pluginConfig := &plugin.Config{ - ID: "otelsetup", - Config: map[string]string{ - "serviceName": cfg.Telemetry.ServiceName, - "serviceVersion": cfg.Telemetry.ServiceVersion, - "enableMetrics": fmt.Sprintf("%t", cfg.Telemetry.EnableMetrics), - "environment": cfg.Telemetry.Environment, - }, - } - - otelProvider, err = mgr.OtelSetup(ctx, pluginConfig) - if err != nil { - return fmt.Errorf("failed to initialize telemetry plugin: %w", err) - } - // Note: The closer is now handled by the plugin manager + // Initialize plugins including telemetry. + otelProvider, err := initPlugins(ctx, mgr, cfg.Telemetry) + if err != nil { + return fmt.Errorf("failed to initialize plugins: %w", err) } // Initialize HTTP server. diff --git a/config/local-simple.yaml b/config/local-simple.yaml index 2c975bb..486e40b 100644 --- a/config/local-simple.yaml +++ b/config/local-simple.yaml @@ -63,10 +63,11 @@ modules: id: router config: routingConfig: ./config/local-simple-routing.yaml - - id: reqpreprocessor - config: - uuidKeys: transaction_id,message_id - role: bap + reqpreprocessor: + id: reqpreprocessor + config: + uuidKeys: transaction_id,message_id + role: bap steps: - validateSign - addRoute @@ -108,10 +109,11 @@ modules: routingConfig: ./config/local-simple-routing-BAPCaller.yaml signer: id: signer - - id: reqpreprocessor - config: - uuidKeys: transaction_id,message_id - role: bap + reqpreprocessor: + id: reqpreprocessor + config: + uuidKeys: transaction_id,message_id + role: bap steps: - addRoute - sign @@ -157,7 +159,6 @@ modules: id: router config: routingConfig: ./config/local-simple-routing-BPPReceiver.yaml - middleware: steps: - validateSign - addRoute @@ -199,7 +200,6 @@ modules: routingConfig: ./config/local-simple-routing.yaml signer: id: signer - middleware: steps: - addRoute - sign diff --git a/core/module/handler/step.go b/core/module/handler/step.go index a066954..de5cc4b 100644 --- a/core/module/handler/step.go +++ b/core/module/handler/step.go @@ -277,15 +277,11 @@ func (s *addRouteStep) Run(ctx *model.StepContext) error { func extractSchemaVersion(body []byte) string { type contextEnvelope struct { Context struct { - Version string `json:"version"` - CoreVersion string `json:"core_version"` + Version string `json:"version"` } `json:"context"` } var payload contextEnvelope if err := json.Unmarshal(body, &payload); err == nil { - if payload.Context.CoreVersion != "" { - return payload.Context.CoreVersion - } if payload.Context.Version != "" { return payload.Context.Version } diff --git a/pkg/plugin/implementation/otelsetup/otelsetup.go b/pkg/plugin/implementation/otelsetup/otelsetup.go index 6d48525..4419caa 100644 --- a/pkg/plugin/implementation/otelsetup/otelsetup.go +++ b/pkg/plugin/implementation/otelsetup/otelsetup.go @@ -4,13 +4,37 @@ import ( "context" "fmt" + clientprom "github.com/prometheus/client_golang/prometheus" + clientpromhttp "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/contrib/instrumentation/runtime" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + otelprom "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + + "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/beckn-one/beckn-onix/pkg/plugin" "github.com/beckn-one/beckn-onix/pkg/telemetry" ) -// Setup wires the telemetry provider using the shared telemetry package. This -// is the concrete implementation behind the MetricsProvider interface. +// Setup wires the telemetry provider. This is the concrete implementation +// behind the MetricsProvider interface. type Setup struct{} +// ToPluginConfig converts telemetry.Config to plugin.Config format. +func ToPluginConfig(cfg *telemetry.Config) *plugin.Config { + return &plugin.Config{ + ID: "otelsetup", + Config: map[string]string{ + "serviceName": cfg.ServiceName, + "serviceVersion": cfg.ServiceVersion, + "enableMetrics": fmt.Sprintf("%t", cfg.EnableMetrics), + "environment": cfg.Environment, + }, + } +} + // New initializes the underlying telemetry provider. The returned provider // exposes the HTTP handler and shutdown hooks that the core application can // manage directly. @@ -18,5 +42,66 @@ func (Setup) New(ctx context.Context, cfg *telemetry.Config) (*telemetry.Provide if cfg == nil { return nil, fmt.Errorf("telemetry config cannot be nil") } - return telemetry.NewProvider(ctx, cfg) + + // Apply defaults if fields are empty + if cfg.ServiceName == "" { + cfg.ServiceName = telemetry.DefaultConfig().ServiceName + } + if cfg.ServiceVersion == "" { + cfg.ServiceVersion = telemetry.DefaultConfig().ServiceVersion + } + if cfg.Environment == "" { + cfg.Environment = telemetry.DefaultConfig().Environment + } + + if !cfg.EnableMetrics { + log.Info(ctx, "OpenTelemetry metrics disabled") + return &telemetry.Provider{ + Shutdown: func(context.Context) error { return nil }, + }, nil + } + + res, err := resource.New( + ctx, + resource.WithAttributes( + attribute.String("service.name", cfg.ServiceName), + attribute.String("service.version", cfg.ServiceVersion), + attribute.String("deployment.environment", cfg.Environment), + ), + ) + if err != nil { + return nil, fmt.Errorf("failed to create telemetry resource: %w", err) + } + + registry := clientprom.NewRegistry() + + exporter, err := otelprom.New( + otelprom.WithRegisterer(registry), + otelprom.WithoutUnits(), + otelprom.WithoutScopeInfo(), + ) + if err != nil { + return nil, fmt.Errorf("failed to create prometheus exporter: %w", err) + } + + meterProvider := metric.NewMeterProvider( + metric.WithReader(exporter), + metric.WithResource(res), + ) + + otel.SetMeterProvider(meterProvider) + log.Infof(ctx, "OpenTelemetry metrics initialized for service=%s version=%s env=%s", + cfg.ServiceName, cfg.ServiceVersion, cfg.Environment) + + if err := runtime.Start(runtime.WithMinimumReadMemStatsInterval(0)); err != nil { + log.Warnf(ctx, "Failed to start Go runtime instrumentation: %v", err) + } + + return &telemetry.Provider{ + MeterProvider: meterProvider, + MetricsHandler: clientpromhttp.HandlerFor(registry, clientpromhttp.HandlerOpts{}), + Shutdown: func(ctx context.Context) error { + return meterProvider.Shutdown(ctx) + }, + }, nil } diff --git a/pkg/plugin/implementation/otelsetup/otelsetup_test.go b/pkg/plugin/implementation/otelsetup/otelsetup_test.go new file mode 100644 index 0000000..10732b0 --- /dev/null +++ b/pkg/plugin/implementation/otelsetup/otelsetup_test.go @@ -0,0 +1,274 @@ +package otelsetup + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/beckn-one/beckn-onix/pkg/telemetry" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSetup_New_Success(t *testing.T) { + setup := Setup{} + ctx := context.Background() + + tests := []struct { + name string + cfg *telemetry.Config + }{ + { + name: "Valid config with all fields", + cfg: &telemetry.Config{ + ServiceName: "test-service", + ServiceVersion: "1.0.0", + EnableMetrics: true, + Environment: "test", + }, + }, + { + name: "Valid config with metrics disabled", + cfg: &telemetry.Config{ + ServiceName: "test-service", + ServiceVersion: "1.0.0", + EnableMetrics: false, + Environment: "test", + }, + }, + { + name: "Config with empty fields uses defaults", + cfg: &telemetry.Config{ + ServiceName: "", + ServiceVersion: "", + EnableMetrics: true, + Environment: "", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + provider, err := setup.New(ctx, tt.cfg) + + require.NoError(t, err, "New() should not return error") + require.NotNil(t, provider, "New() should return non-nil provider") + require.NotNil(t, provider.Shutdown, "Provider should have shutdown function") + + if tt.cfg.EnableMetrics { + assert.NotNil(t, provider.MetricsHandler, "MetricsHandler should be set when metrics enabled") + assert.NotNil(t, provider.MeterProvider, "MeterProvider should be set when metrics enabled") + + // Test that metrics handler works + rec := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/metrics", nil) + provider.MetricsHandler.ServeHTTP(rec, req) + assert.Equal(t, http.StatusOK, rec.Code) + } else { + assert.Nil(t, provider.MetricsHandler, "MetricsHandler should be nil when metrics disabled") + } + + // Test shutdown + err = provider.Shutdown(ctx) + assert.NoError(t, err, "Shutdown should not return error") + }) + } +} + +func TestSetup_New_Failure(t *testing.T) { + setup := Setup{} + ctx := context.Background() + + tests := []struct { + name string + cfg *telemetry.Config + wantErr bool + }{ + { + name: "Nil config", + cfg: nil, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + provider, err := setup.New(ctx, tt.cfg) + + if tt.wantErr { + assert.Error(t, err, "New() should return error") + assert.Nil(t, provider, "New() should return nil provider on error") + } else { + assert.NoError(t, err, "New() should not return error") + assert.NotNil(t, provider, "New() should return non-nil provider") + } + }) + } +} + +func TestSetup_New_DefaultValues(t *testing.T) { + setup := Setup{} + ctx := context.Background() + + // Test with empty fields - should use defaults + cfg := &telemetry.Config{ + ServiceName: "", + ServiceVersion: "", + EnableMetrics: true, + Environment: "", + } + + provider, err := setup.New(ctx, cfg) + require.NoError(t, err) + require.NotNil(t, provider) + + // Verify defaults are applied by checking that provider is functional + assert.NotNil(t, provider.MetricsHandler, "MetricsHandler should be set with defaults") + assert.NotNil(t, provider.MeterProvider, "MeterProvider should be set with defaults") + + // Test metrics endpoint + rec := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/metrics", nil) + provider.MetricsHandler.ServeHTTP(rec, req) + assert.Equal(t, http.StatusOK, rec.Code) + + // Cleanup + err = provider.Shutdown(ctx) + assert.NoError(t, err) +} + +func TestSetup_New_MetricsDisabled(t *testing.T) { + setup := Setup{} + ctx := context.Background() + + cfg := &telemetry.Config{ + ServiceName: "test-service", + ServiceVersion: "1.0.0", + EnableMetrics: false, + Environment: "test", + } + + provider, err := setup.New(ctx, cfg) + require.NoError(t, err) + require.NotNil(t, provider) + + // When metrics are disabled, MetricsHandler should be nil + assert.Nil(t, provider.MetricsHandler, "MetricsHandler should be nil when metrics disabled") + assert.Nil(t, provider.MeterProvider, "MeterProvider should be nil when metrics disabled") + + // Shutdown should still work + err = provider.Shutdown(ctx) + assert.NoError(t, err, "Shutdown should work even when metrics disabled") +} + +func TestToPluginConfig_Success(t *testing.T) { + tests := []struct { + name string + cfg *telemetry.Config + expectedID string + expectedConfig map[string]string + }{ + { + name: "Valid config with all fields", + cfg: &telemetry.Config{ + ServiceName: "test-service", + ServiceVersion: "1.0.0", + EnableMetrics: true, + Environment: "test", + }, + expectedID: "otelsetup", + expectedConfig: map[string]string{ + "serviceName": "test-service", + "serviceVersion": "1.0.0", + "enableMetrics": "true", + "environment": "test", + }, + }, + { + name: "Config with enableMetrics false", + cfg: &telemetry.Config{ + ServiceName: "my-service", + ServiceVersion: "2.0.0", + EnableMetrics: false, + Environment: "production", + }, + expectedID: "otelsetup", + expectedConfig: map[string]string{ + "serviceName": "my-service", + "serviceVersion": "2.0.0", + "enableMetrics": "false", + "environment": "production", + }, + }, + { + name: "Config with empty fields", + cfg: &telemetry.Config{ + ServiceName: "", + ServiceVersion: "", + EnableMetrics: true, + Environment: "", + }, + expectedID: "otelsetup", + expectedConfig: map[string]string{ + "serviceName": "", + "serviceVersion": "", + "enableMetrics": "true", + "environment": "", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := ToPluginConfig(tt.cfg) + + require.NotNil(t, result, "ToPluginConfig should return non-nil config") + assert.Equal(t, tt.expectedID, result.ID, "Plugin ID should be 'otelsetup'") + assert.Equal(t, tt.expectedConfig, result.Config, "Config map should match expected values") + }) + } +} + +func TestToPluginConfig_NilConfig(t *testing.T) { + // Test that ToPluginConfig handles nil config + // Note: This will panic if nil is passed, which is acceptable behavior + // as the function expects a valid config. In practice, callers should check for nil. + assert.Panics(t, func() { + ToPluginConfig(nil) + }, "ToPluginConfig should panic when given nil config") +} + +func TestToPluginConfig_BooleanConversion(t *testing.T) { + tests := []struct { + name string + enableMetrics bool + expected string + }{ + { + name: "EnableMetrics true", + enableMetrics: true, + expected: "true", + }, + { + name: "EnableMetrics false", + enableMetrics: false, + expected: "false", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := &telemetry.Config{ + ServiceName: "test", + ServiceVersion: "1.0.0", + EnableMetrics: tt.enableMetrics, + Environment: "test", + } + + result := ToPluginConfig(cfg) + require.NotNil(t, result) + assert.Equal(t, tt.expected, result.Config["enableMetrics"], "enableMetrics should be converted to string correctly") + }) + } +} diff --git a/pkg/telemetry/step_metrics_test.go b/pkg/telemetry/step_metrics_test.go new file mode 100644 index 0000000..45dbede --- /dev/null +++ b/pkg/telemetry/step_metrics_test.go @@ -0,0 +1,149 @@ +package telemetry + +import ( + "context" + "net/http/httptest" + "testing" + + "go.opentelemetry.io/otel/metric" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGetStepMetrics_Success(t *testing.T) { + ctx := context.Background() + + // Initialize telemetry provider first + provider, err := NewProvider(ctx, &Config{ + ServiceName: "test-service", + ServiceVersion: "1.0.0", + EnableMetrics: true, + Environment: "test", + }) + require.NoError(t, err) + defer provider.Shutdown(context.Background()) + + // Test getting step metrics + metrics, err := GetStepMetrics(ctx) + require.NoError(t, err, "GetStepMetrics() should not return error") + require.NotNil(t, metrics, "GetStepMetrics() should return non-nil metrics") + + // Verify all metric instruments are initialized + assert.NotNil(t, metrics.StepExecutionDuration, "StepExecutionDuration should be initialized") + assert.NotNil(t, metrics.StepExecutionTotal, "StepExecutionTotal should be initialized") + assert.NotNil(t, metrics.StepErrorsTotal, "StepErrorsTotal should be initialized") +} + +func TestGetStepMetrics_ConcurrentAccess(t *testing.T) { + ctx := context.Background() + + // Initialize telemetry provider first + provider, err := NewProvider(ctx, &Config{ + ServiceName: "test-service", + ServiceVersion: "1.0.0", + EnableMetrics: true, + Environment: "test", + }) + require.NoError(t, err) + defer provider.Shutdown(context.Background()) + + // Test that GetStepMetrics is safe for concurrent access + // and returns the same instance (singleton pattern) + metrics1, err1 := GetStepMetrics(ctx) + require.NoError(t, err1) + require.NotNil(t, metrics1) + + metrics2, err2 := GetStepMetrics(ctx) + require.NoError(t, err2) + require.NotNil(t, metrics2) + + // Should return the same instance + assert.Equal(t, metrics1, metrics2, "GetStepMetrics should return the same instance") +} + +func TestGetStepMetrics_WithoutProvider(t *testing.T) { + ctx := context.Background() + + // Test getting step metrics without initializing provider + // This should still work but may not have a valid meter provider + metrics, err := GetStepMetrics(ctx) + // Note: This might succeed or fail depending on OTel's default behavior + // We're just checking it doesn't panic + if err != nil { + t.Logf("GetStepMetrics returned error (expected if no provider): %v", err) + } else { + assert.NotNil(t, metrics, "Metrics should be returned even without explicit provider") + } +} + +func TestStepMetrics_Instruments(t *testing.T) { + ctx := context.Background() + + // Initialize telemetry provider + provider, err := NewProvider(ctx, &Config{ + ServiceName: "test-service", + ServiceVersion: "1.0.0", + EnableMetrics: true, + Environment: "test", + }) + require.NoError(t, err) + defer provider.Shutdown(context.Background()) + + // Get step metrics + metrics, err := GetStepMetrics(ctx) + require.NoError(t, err) + require.NotNil(t, metrics) + + // Test that we can record metrics (this tests the instruments are functional) + // Note: We can't easily verify the metrics were recorded without querying the exporter, + // but we can verify the instruments are not nil and can be called without panicking + + // Test StepExecutionDuration + require.NotPanics(t, func() { + metrics.StepExecutionDuration.Record(ctx, 0.5, + metric.WithAttributes(AttrStep.String("test-step"), AttrModule.String("test-module"))) + }, "StepExecutionDuration.Record should not panic") + + // Test StepExecutionTotal + require.NotPanics(t, func() { + metrics.StepExecutionTotal.Add(ctx, 1, + metric.WithAttributes(AttrStep.String("test-step"), AttrModule.String("test-module"))) + }, "StepExecutionTotal.Add should not panic") + + // Test StepErrorsTotal + require.NotPanics(t, func() { + metrics.StepErrorsTotal.Add(ctx, 1, + metric.WithAttributes(AttrStep.String("test-step"), AttrModule.String("test-module"))) + }, "StepErrorsTotal.Add should not panic") + + // Verify metrics are exposed via HTTP handler + rec := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/metrics", nil) + provider.MetricsHandler.ServeHTTP(rec, req) + assert.Equal(t, 200, rec.Code, "Metrics endpoint should return 200") +} + +func TestStepMetrics_MultipleCalls(t *testing.T) { + ctx := context.Background() + + // Initialize telemetry provider + provider, err := NewProvider(ctx, &Config{ + ServiceName: "test-service", + ServiceVersion: "1.0.0", + EnableMetrics: true, + Environment: "test", + }) + require.NoError(t, err) + defer provider.Shutdown(context.Background()) + + // Call GetStepMetrics multiple times + for i := 0; i < 10; i++ { + metrics, err := GetStepMetrics(ctx) + require.NoError(t, err, "GetStepMetrics should succeed on call %d", i) + require.NotNil(t, metrics, "GetStepMetrics should return non-nil on call %d", i) + assert.NotNil(t, metrics.StepExecutionDuration, "StepExecutionDuration should be initialized") + assert.NotNil(t, metrics.StepExecutionTotal, "StepExecutionTotal should be initialized") + assert.NotNil(t, metrics.StepErrorsTotal, "StepErrorsTotal should be initialized") + } +}