diff --git a/CONFIG.md b/CONFIG.md index 72d4af4..6df5e92 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -196,7 +196,7 @@ log: **Required**: No **Description**: OpenTelemetry configuration controlling whether the Prometheus exporter is enabled. -**Important**: The `/metrics` endpoint is only exposed when `enableMetrics: true`. +**Important**: This block is optional—omit it to run without telemetry. When present, the `/metrics` endpoint is exposed only if `enableMetrics: true`. #### Parameters: @@ -223,7 +223,7 @@ log: **Default**: `"development"` **Description**: Sets the `deployment.environment` attribute (e.g., `development`, `staging`, `production`). -**Example - Enable Metrics**: +**Example - Enable Metrics** (matches `config/local-simple.yaml`): ```yaml telemetry: enableMetrics: true @@ -244,10 +244,9 @@ http://your-server:port/metrics Metrics are organized by module for better maintainability and encapsulation: -#### HTTP Metrics (from `otelmetrics` plugin) -- `http_server_requests_total`, `http_server_request_duration_seconds`, `http_server_requests_in_flight` -- `http_server_request_size_bytes`, `http_server_response_size_bytes` -- `beckn_messages_total` - Total Beckn protocol messages processed +#### OTel Setup (from `otelsetup` plugin) +- Prometheus exporter & `/metrics` handler registration +- Go runtime instrumentation (`go_*`), resource attributes, and meter provider wiring #### Step Execution Metrics (from `telemetry` package) - `onix_step_executions_total`, `onix_step_execution_duration_seconds`, `onix_step_errors_total` @@ -269,7 +268,7 @@ Metrics are organized by module for better maintainability and encapsulation: Each metric includes consistent labels such as `module`, `role`, `action`, `status`, `step`, `plugin_id`, and `schema_version` to enable low-cardinality dashboards. 
**Note**: Metric definitions are now located in their respective modules: -- HTTP metrics: `pkg/plugin/implementation/otelmetrics/metrics.go` +- OTel setup: `pkg/plugin/implementation/otelsetup` - Step metrics: `pkg/telemetry/step_metrics.go` - Handler metrics: `core/module/handler/metrics.go` - Cache metrics: `pkg/plugin/implementation/cache/metrics.go` diff --git a/README.md b/README.md index 05c3959..b7521b3 100644 --- a/README.md +++ b/README.md @@ -69,8 +69,17 @@ The **Beckn Protocol** is an open protocol that enables location-aware, local co - Per-step histograms with error attribution - Cache, routing, plugin, and business KPIs (signature/schema validations, Beckn messages) - Native Prometheus exporter with Grafana dashboards & alert rules (`monitoring/`) + - Opt-in: add a `telemetry` block in your config to wire the `otelsetup` plugin; omit it to run without metrics. Example: + + ```yaml + telemetry: + enableMetrics: true + serviceName: "beckn-onix" + serviceVersion: "1.0.0" + environment: "development" + ``` - **Modular Metrics Architecture**: Metrics are organized by module for better maintainability: - - HTTP metrics in `otelmetrics` plugin + - OTel SDK wiring via `otelsetup` plugin - Step execution metrics in `telemetry` package - Handler metrics (signature, schema, routing) in `handler` module - Cache metrics in `cache` plugin diff --git a/cmd/adapter/main.go b/cmd/adapter/main.go index b2a12dd..cc1444f 100644 --- a/cmd/adapter/main.go +++ b/cmd/adapter/main.go @@ -24,7 +24,7 @@ import ( type Config struct { AppName string `yaml:"appName"` Log log.Config `yaml:"log"` - Telemetry telemetry.Config `yaml:"telemetry"` + Telemetry *telemetry.Config `yaml:"telemetry"` PluginManager *plugin.ManagerConfig `yaml:"pluginManager"` Modules []module.Config `yaml:"modules"` HTTP httpConfig `yaml:"http"` @@ -100,7 +100,7 @@ func newServer(ctx context.Context, mgr handler.PluginManager, cfg *Config, otel if otelProvider != nil && otelProvider.MetricsHandler != nil { 
mux.Handle("/metrics", otelProvider.MetricsHandler) - log.Infof(ctx, "Metrics endpoint registered at /metrics") + log.Infof(ctx, "Metrics endpoint registered at /metrics") } if err := module.Register(ctx, cfg.Modules, mux, mgr); err != nil { @@ -125,22 +125,6 @@ func run(ctx context.Context, configPath string) error { return fmt.Errorf("failed to initialize logger: %w", err) } - // Initialize telemetry. - log.Infof(ctx, "Initializing telemetry with config: %+v", cfg.Telemetry) - otelProvider, err := telemetry.NewProvider(ctx, &cfg.Telemetry) - if err != nil { - return fmt.Errorf("failed to initialize telemetry: %w", err) - } - if otelProvider != nil && otelProvider.Shutdown != nil { - closers = append(closers, func() { - shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := otelProvider.Shutdown(shutdownCtx); err != nil { - log.Errorf(ctx, err, "Failed to shutdown telemetry: %v", err) - } - }) - } - // Initialize plugin manager. log.Infof(ctx, "Initializing plugin manager") mgr, closer, err := newManagerFunc(ctx, cfg.PluginManager) @@ -150,6 +134,31 @@ func run(ctx context.Context, configPath string) error { closers = append(closers, closer) log.Debug(ctx, "Plugin manager loaded.") + // Initialize telemetry via plugin. 
+ var otelProvider *telemetry.Provider + if cfg.Telemetry == nil { + log.Info(ctx, "Telemetry config not provided; skipping OpenTelemetry setup") + } else { + log.Infof(ctx, "Initializing telemetry via plugin id=otelsetup") + + // Convert telemetry.Config to plugin.Config + pluginConfig := &plugin.Config{ + ID: "otelsetup", + Config: map[string]string{ + "serviceName": cfg.Telemetry.ServiceName, + "serviceVersion": cfg.Telemetry.ServiceVersion, + "enableMetrics": fmt.Sprintf("%t", cfg.Telemetry.EnableMetrics), + "environment": cfg.Telemetry.Environment, + }, + } + + otelProvider, err = mgr.OtelSetup(ctx, pluginConfig) + if err != nil { + return fmt.Errorf("failed to initialize telemetry plugin: %w", err) + } + // Note: The closer is now handled by the plugin manager + } + // Initialize HTTP server. log.Infof(ctx, "Initializing HTTP server") srv, err := newServerFunc(ctx, mgr, cfg, otelProvider) diff --git a/config/local-simple.yaml b/config/local-simple.yaml index 287e50f..2c975bb 100644 --- a/config/local-simple.yaml +++ b/config/local-simple.yaml @@ -63,14 +63,10 @@ modules: id: router config: routingConfig: ./config/local-simple-routing.yaml - middleware: - id: reqpreprocessor config: uuidKeys: transaction_id,message_id role: bap - - id: otelmetrics - config: - enabled: "true" steps: - validateSign - addRoute @@ -112,7 +108,6 @@ modules: routingConfig: ./config/local-simple-routing-BAPCaller.yaml signer: id: signer - middleware: - id: reqpreprocessor config: uuidKeys: transaction_id,message_id @@ -163,9 +158,6 @@ modules: config: routingConfig: ./config/local-simple-routing-BPPReceiver.yaml middleware: - - id: otelmetrics - config: - enabled: "true" steps: - validateSign - addRoute @@ -208,9 +200,6 @@ modules: signer: id: signer middleware: - - id: otelmetrics - config: - enabled: "true" steps: - addRoute - sign diff --git a/install/build-plugins.sh b/install/build-plugins.sh index 61709af..8a359ba 100755 --- a/install/build-plugins.sh +++ 
b/install/build-plugins.sh @@ -16,7 +16,7 @@ plugins=( "registry" "dediregistry" "reqpreprocessor" - "otelmetrics" + "otelsetup" "router" "schemavalidator" "schemav2validator" diff --git a/pkg/plugin/definition/metrics.go b/pkg/plugin/definition/metrics.go new file mode 100644 index 0000000..b4f09ec --- /dev/null +++ b/pkg/plugin/definition/metrics.go @@ -0,0 +1,15 @@ +package definition + +import ( + "context" + + "github.com/beckn-one/beckn-onix/pkg/telemetry" +) + +// MetricsProvider encapsulates initialization of OpenTelemetry metrics +// providers. Implementations wire exporters and return a Provider that the core +// application can manage. +type MetricsProvider interface { + // New initializes a new telemetry provider instance with the given configuration. + New(ctx context.Context, config map[string]string) (*telemetry.Provider, func() error, error) +} diff --git a/pkg/plugin/implementation/otelmetrics/cmd/plugin.go b/pkg/plugin/implementation/otelmetrics/cmd/plugin.go deleted file mode 100644 index 1e839a2..0000000 --- a/pkg/plugin/implementation/otelmetrics/cmd/plugin.go +++ /dev/null @@ -1,21 +0,0 @@ -package main - -import ( - "context" - "net/http" - - "github.com/beckn-one/beckn-onix/pkg/plugin/implementation/otelmetrics" -) - -type middlewareProvider struct{} - -func (middlewareProvider) New(ctx context.Context, cfg map[string]string) (func(http.Handler) http.Handler, error) { - mw, err := otelmetrics.New(ctx, cfg) - if err != nil { - return nil, err - } - return mw.Handler, nil -} - -// Provider is exported for plugin loader. 
-var Provider = middlewareProvider{} diff --git a/pkg/plugin/implementation/otelmetrics/metrics.go b/pkg/plugin/implementation/otelmetrics/metrics.go deleted file mode 100644 index d7da87d..0000000 --- a/pkg/plugin/implementation/otelmetrics/metrics.go +++ /dev/null @@ -1,97 +0,0 @@ -package otelmetrics - -import ( - "context" - "fmt" - "sync" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/metric" -) - -// HTTPMetrics exposes HTTP-related metric instruments. -type HTTPMetrics struct { - HTTPRequestsTotal metric.Int64Counter - HTTPRequestDuration metric.Float64Histogram - HTTPRequestsInFlight metric.Int64UpDownCounter - HTTPRequestSize metric.Int64Histogram - HTTPResponseSize metric.Int64Histogram - BecknMessagesTotal metric.Int64Counter -} - -var ( - httpMetricsInstance *HTTPMetrics - httpMetricsOnce sync.Once - httpMetricsErr error -) - -// GetHTTPMetrics lazily initializes HTTP metric instruments and returns a cached reference. -func GetHTTPMetrics(ctx context.Context) (*HTTPMetrics, error) { - httpMetricsOnce.Do(func() { - httpMetricsInstance, httpMetricsErr = newHTTPMetrics() - }) - return httpMetricsInstance, httpMetricsErr -} - -func newHTTPMetrics() (*HTTPMetrics, error) { - meter := otel.GetMeterProvider().Meter( - "github.com/beckn-one/beckn-onix/otelmetrics", - metric.WithInstrumentationVersion("1.0.0"), - ) - - m := &HTTPMetrics{} - var err error - - if m.HTTPRequestsTotal, err = meter.Int64Counter( - "http_server_requests_total", - metric.WithDescription("Total number of HTTP requests processed"), - metric.WithUnit("{request}"), - ); err != nil { - return nil, fmt.Errorf("http_server_requests_total: %w", err) - } - - if m.HTTPRequestDuration, err = meter.Float64Histogram( - "http_server_request_duration_seconds", - metric.WithDescription("HTTP request duration in seconds"), - metric.WithUnit("s"), - metric.WithExplicitBucketBoundaries(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10), - ); err != nil { - return nil, 
fmt.Errorf("http_server_request_duration_seconds: %w", err) - } - - if m.HTTPRequestsInFlight, err = meter.Int64UpDownCounter( - "http_server_requests_in_flight", - metric.WithDescription("Number of HTTP requests currently being processed"), - metric.WithUnit("{request}"), - ); err != nil { - return nil, fmt.Errorf("http_server_requests_in_flight: %w", err) - } - - if m.HTTPRequestSize, err = meter.Int64Histogram( - "http_server_request_size_bytes", - metric.WithDescription("Size of HTTP request payloads"), - metric.WithUnit("By"), - metric.WithExplicitBucketBoundaries(100, 1000, 10000, 100000, 1000000), - ); err != nil { - return nil, fmt.Errorf("http_server_request_size_bytes: %w", err) - } - - if m.HTTPResponseSize, err = meter.Int64Histogram( - "http_server_response_size_bytes", - metric.WithDescription("Size of HTTP responses"), - metric.WithUnit("By"), - metric.WithExplicitBucketBoundaries(100, 1000, 10000, 100000, 1000000), - ); err != nil { - return nil, fmt.Errorf("http_server_response_size_bytes: %w", err) - } - - if m.BecknMessagesTotal, err = meter.Int64Counter( - "beckn_messages_total", - metric.WithDescription("Total Beckn protocol messages processed"), - metric.WithUnit("{message}"), - ); err != nil { - return nil, fmt.Errorf("beckn_messages_total: %w", err) - } - - return m, nil -} diff --git a/pkg/plugin/implementation/otelmetrics/otelmetrics.go b/pkg/plugin/implementation/otelmetrics/otelmetrics.go deleted file mode 100644 index 673e321..0000000 --- a/pkg/plugin/implementation/otelmetrics/otelmetrics.go +++ /dev/null @@ -1,134 +0,0 @@ -package otelmetrics - -import ( - "context" - "net/http" - "strings" - "time" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - - "github.com/beckn-one/beckn-onix/pkg/log" - "github.com/beckn-one/beckn-onix/pkg/telemetry" -) - -// Middleware instruments inbound HTTP handlers with OpenTelemetry metrics. 
-type Middleware struct { - metrics *HTTPMetrics - enabled bool -} - -// New constructs middleware based on plugin configuration. -func New(ctx context.Context, cfg map[string]string) (*Middleware, error) { - enabled := cfg["enabled"] != "false" - - metrics, err := GetHTTPMetrics(ctx) - if err != nil { - log.Warnf(ctx, "OpenTelemetry metrics unavailable: %v", err) - } - - return &Middleware{ - metrics: metrics, - enabled: enabled, - }, nil -} - -// Handler returns an http.Handler middleware compatible with plugin expectations. -func (m *Middleware) Handler(next http.Handler) http.Handler { - if !m.enabled || m.metrics == nil { - return next - } - - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - action := extractAction(r.URL.Path) - module := r.Header.Get("X-Module-Name") - role := r.Header.Get("X-Role") - - attrs := []attribute.KeyValue{ - telemetry.AttrModule.String(module), - telemetry.AttrRole.String(role), - telemetry.AttrAction.String(action), - telemetry.AttrHTTPMethod.String(r.Method), - } - - m.metrics.HTTPRequestsInFlight.Add(ctx, 1, metric.WithAttributes(attrs...)) - defer m.metrics.HTTPRequestsInFlight.Add(ctx, -1, metric.WithAttributes(attrs...)) - - if r.ContentLength > 0 { - m.metrics.HTTPRequestSize.Record(ctx, r.ContentLength, metric.WithAttributes(attrs...)) - } - - rw := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK} - start := time.Now() - next.ServeHTTP(rw, r) - duration := time.Since(start).Seconds() - - status := "success" - if rw.statusCode >= 400 { - status = "error" - } - - statusAttrs := append(attrs, - telemetry.AttrHTTPStatus.Int(rw.statusCode), - telemetry.AttrStatus.String(status), - ) - - m.metrics.HTTPRequestsTotal.Add(ctx, 1, metric.WithAttributes(statusAttrs...)) - m.metrics.HTTPRequestDuration.Record(ctx, duration, metric.WithAttributes(statusAttrs...)) - if rw.bytesWritten > 0 { - m.metrics.HTTPResponseSize.Record(ctx, int64(rw.bytesWritten), 
metric.WithAttributes(statusAttrs...)) - } - - if isBecknAction(action) { - m.metrics.BecknMessagesTotal.Add(ctx, 1, - metric.WithAttributes( - telemetry.AttrAction.String(action), - telemetry.AttrRole.String(role), - telemetry.AttrStatus.String(status), - )) - } - }) -} - -type responseWriter struct { - http.ResponseWriter - statusCode int - bytesWritten int -} - -func (rw *responseWriter) WriteHeader(code int) { - rw.statusCode = code - rw.ResponseWriter.WriteHeader(code) -} - -func (rw *responseWriter) Write(b []byte) (int, error) { - n, err := rw.ResponseWriter.Write(b) - rw.bytesWritten += n - return n, err -} - -func extractAction(path string) string { - trimmed := strings.Trim(path, "/") - if trimmed == "" { - return "root" - } - parts := strings.Split(trimmed, "/") - return parts[len(parts)-1] -} - -func isBecknAction(action string) bool { - actions := []string{ - "discover", "select", "init", "confirm", "status", "track", - "cancel", "update", "rating", "support", - "on_discover", "on_select", "on_init", "on_confirm", "on_status", - "on_track", "on_cancel", "on_update", "on_rating", "on_support", - } - for _, a := range actions { - if a == action { - return true - } - } - return false -} diff --git a/pkg/plugin/implementation/otelsetup/cmd/plugin.go b/pkg/plugin/implementation/otelsetup/cmd/plugin.go new file mode 100644 index 0000000..8aac8f1 --- /dev/null +++ b/pkg/plugin/implementation/otelsetup/cmd/plugin.go @@ -0,0 +1,78 @@ +package main + +import ( + "context" + "errors" + "strconv" + "time" + + "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/beckn-one/beckn-onix/pkg/plugin/implementation/otelsetup" + "github.com/beckn-one/beckn-onix/pkg/telemetry" +) + +// metricsProvider implements the MetricsProvider interface for the otelsetup plugin. +type metricsProvider struct { + impl otelsetup.Setup +} + +// New creates a new telemetry provider instance. 
+func (m metricsProvider) New(ctx context.Context, config map[string]string) (*telemetry.Provider, func() error, error) { + if ctx == nil { + return nil, nil, errors.New("context cannot be nil") + } + + // Convert map[string]string to telemetry.Config + telemetryConfig := &telemetry.Config{ + ServiceName: config["serviceName"], + ServiceVersion: config["serviceVersion"], + Environment: config["environment"], + } + + // Parse enableMetrics as boolean + if enableMetricsStr, ok := config["enableMetrics"]; ok && enableMetricsStr != "" { + enableMetrics, err := strconv.ParseBool(enableMetricsStr) + if err != nil { + log.Warnf(ctx, "Invalid enableMetrics value '%s', defaulting to true: %v", enableMetricsStr, err) + telemetryConfig.EnableMetrics = true + } else { + telemetryConfig.EnableMetrics = enableMetrics + } + } else { + telemetryConfig.EnableMetrics = true // Default to true if not specified or empty + } + + // Apply defaults if fields are empty + if telemetryConfig.ServiceName == "" { + telemetryConfig.ServiceName = telemetry.DefaultConfig().ServiceName + } + if telemetryConfig.ServiceVersion == "" { + telemetryConfig.ServiceVersion = telemetry.DefaultConfig().ServiceVersion + } + if telemetryConfig.Environment == "" { + telemetryConfig.Environment = telemetry.DefaultConfig().Environment + } + + log.Debugf(ctx, "Telemetry config mapped: %+v", telemetryConfig) + provider, err := m.impl.New(ctx, telemetryConfig) + if err != nil { + log.Errorf(ctx, err, "Failed to create telemetry provider instance") + return nil, nil, err + } + + // Wrap the Shutdown function to match the closer signature + var closer func() error + if provider != nil && provider.Shutdown != nil { + closer = func() error { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return provider.Shutdown(shutdownCtx) + } + } + + log.Infof(ctx, "Telemetry provider instance created successfully") + return provider, closer, nil +} + +// Provider is the 
exported plugin instance +var Provider = metricsProvider{} diff --git a/pkg/plugin/implementation/otelsetup/cmd/plugin_test.go b/pkg/plugin/implementation/otelsetup/cmd/plugin_test.go new file mode 100644 index 0000000..d10a784 --- /dev/null +++ b/pkg/plugin/implementation/otelsetup/cmd/plugin_test.go @@ -0,0 +1,311 @@ +package main + +import ( + "context" + "testing" + + "github.com/beckn-one/beckn-onix/pkg/telemetry" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMetricsProviderNew_Success(t *testing.T) { + provider := metricsProvider{} + + tests := []struct { + name string + ctx context.Context + config map[string]string + }{ + { + name: "Valid config with all fields", + ctx: context.Background(), + config: map[string]string{ + "serviceName": "test-service", + "serviceVersion": "1.0.0", + "enableMetrics": "true", + "environment": "test", + }, + }, + { + name: "Valid config with minimal fields (uses defaults)", + ctx: context.Background(), + config: map[string]string{}, + }, + { + name: "Valid config with enableMetrics false", + ctx: context.Background(), + config: map[string]string{ + "enableMetrics": "false", + }, + }, + { + name: "Valid config with partial fields", + ctx: context.Background(), + config: map[string]string{ + "serviceName": "custom-service", + "serviceVersion": "2.0.0", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + telemetryProvider, cleanup, err := provider.New(tt.ctx, tt.config) + + require.NoError(t, err, "New() should not return error") + require.NotNil(t, telemetryProvider, "New() should return non-nil provider") + + // Test cleanup function if it exists + if cleanup != nil { + err := cleanup() + assert.NoError(t, err, "cleanup() should not return error") + } + }) + } +} + +func TestMetricsProviderNew_Failure(t *testing.T) { + provider := metricsProvider{} + + tests := []struct { + name string + ctx context.Context + config map[string]string + wantErr bool 
+ }{ + { + name: "Nil context", + ctx: nil, + config: map[string]string{}, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + telemetryProvider, cleanup, err := provider.New(tt.ctx, tt.config) + + if tt.wantErr { + assert.Error(t, err, "New() should return error for nil context") + assert.Nil(t, telemetryProvider, "New() should return nil provider on error") + assert.Nil(t, cleanup, "New() should return nil cleanup on error") + } else { + assert.NoError(t, err, "New() should not return error") + assert.NotNil(t, telemetryProvider, "New() should return non-nil provider") + } + }) + } +} + +func TestMetricsProviderNew_ConfigConversion(t *testing.T) { + provider := metricsProvider{} + ctx := context.Background() + + tests := []struct { + name string + config map[string]string + expectedConfig *telemetry.Config + }{ + { + name: "All fields provided", + config: map[string]string{ + "serviceName": "my-service", + "serviceVersion": "3.0.0", + "enableMetrics": "true", + "environment": "production", + }, + expectedConfig: &telemetry.Config{ + ServiceName: "my-service", + ServiceVersion: "3.0.0", + EnableMetrics: true, + Environment: "production", + }, + }, + { + name: "Empty config uses defaults", + config: map[string]string{}, + expectedConfig: &telemetry.Config{ + ServiceName: telemetry.DefaultConfig().ServiceName, + ServiceVersion: telemetry.DefaultConfig().ServiceVersion, + EnableMetrics: true, // Default when not specified + Environment: telemetry.DefaultConfig().Environment, + }, + }, + { + name: "EnableMetrics false", + config: map[string]string{ + "enableMetrics": "false", + }, + expectedConfig: &telemetry.Config{ + ServiceName: telemetry.DefaultConfig().ServiceName, + ServiceVersion: telemetry.DefaultConfig().ServiceVersion, + EnableMetrics: false, + Environment: telemetry.DefaultConfig().Environment, + }, + }, + { + name: "Invalid enableMetrics defaults to true", + config: map[string]string{ + "enableMetrics": 
"invalid", + }, + expectedConfig: &telemetry.Config{ + ServiceName: telemetry.DefaultConfig().ServiceName, + ServiceVersion: telemetry.DefaultConfig().ServiceVersion, + EnableMetrics: true, // Defaults to true on parse error + Environment: telemetry.DefaultConfig().Environment, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + telemetryProvider, cleanup, err := provider.New(ctx, tt.config) + + require.NoError(t, err, "New() should not return error") + require.NotNil(t, telemetryProvider, "New() should return non-nil provider") + + // Verify that the provider was created (we can't directly check internal config, + // but we can verify the provider is functional) + if tt.expectedConfig.EnableMetrics { + assert.NotNil(t, telemetryProvider.MetricsHandler, "MetricsHandler should be set when metrics enabled") + } + + if cleanup != nil { + err := cleanup() + assert.NoError(t, err, "cleanup() should not return error") + } + }) + } +} + +func TestMetricsProviderNew_BooleanParsing(t *testing.T) { + provider := metricsProvider{} + ctx := context.Background() + + tests := []struct { + name string + enableMetrics string + expected bool + }{ + { + name: "True string", + enableMetrics: "true", + expected: true, + }, + { + name: "False string", + enableMetrics: "false", + expected: false, + }, + { + name: "True uppercase", + enableMetrics: "TRUE", + expected: true, + }, + { + name: "False uppercase", + enableMetrics: "FALSE", + expected: false, + }, + { + name: "Invalid value defaults to true", + enableMetrics: "invalid", + expected: true, // Defaults to true on parse error + }, + { + name: "Empty string defaults to true", + enableMetrics: "", + expected: true, // Defaults to true when not specified + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config := map[string]string{ + "enableMetrics": tt.enableMetrics, + } + + telemetryProvider, cleanup, err := provider.New(ctx, config) + + require.NoError(t, err, "New() 
should not return error") + require.NotNil(t, telemetryProvider, "New() should return non-nil provider") + + // Verify metrics handler is set based on enableMetrics + if tt.expected { + assert.NotNil(t, telemetryProvider.MetricsHandler, "MetricsHandler should be set when metrics enabled") + } + + if cleanup != nil { + err := cleanup() + assert.NoError(t, err, "cleanup() should not return error") + } + }) + } +} + +func TestMetricsProviderNew_CleanupFunction(t *testing.T) { + provider := metricsProvider{} + ctx := context.Background() + + config := map[string]string{ + "serviceName": "test-service", + "serviceVersion": "1.0.0", + "enableMetrics": "true", + "environment": "test", + } + + telemetryProvider, cleanup, err := provider.New(ctx, config) + + require.NoError(t, err, "New() should not return error") + require.NotNil(t, telemetryProvider, "New() should return non-nil provider") + require.NotNil(t, cleanup, "New() should return non-nil cleanup function") + + // Test that cleanup can be called successfully + err = cleanup() + assert.NoError(t, err, "cleanup() should not return error") +} + +func TestProviderVariable(t *testing.T) { + assert.NotNil(t, Provider, "Provider should not be nil") + + // Verify Provider implements the interface correctly + ctx := context.Background() + config := map[string]string{ + "serviceName": "test", + "serviceVersion": "1.0.0", + "enableMetrics": "true", + } + + telemetryProvider, cleanup, err := Provider.New(ctx, config) + + require.NoError(t, err, "Provider.New() should not return error") + require.NotNil(t, telemetryProvider, "Provider.New() should return non-nil provider") + + if cleanup != nil { + err := cleanup() + assert.NoError(t, err, "cleanup() should not return error") + } +} + +func TestMetricsProviderNew_DefaultValues(t *testing.T) { + provider := metricsProvider{} + ctx := context.Background() + + // Test with completely empty config + config := map[string]string{} + + telemetryProvider, cleanup, err := 
provider.New(ctx, config) + + require.NoError(t, err, "New() should not return error with empty config") + require.NotNil(t, telemetryProvider, "New() should return non-nil provider") + + // Verify defaults are applied by checking that provider is functional + assert.NotNil(t, telemetryProvider.MetricsHandler, "MetricsHandler should be set with defaults") + + if cleanup != nil { + err := cleanup() + assert.NoError(t, err, "cleanup() should not return error") + } +} + diff --git a/pkg/plugin/implementation/otelsetup/otelsetup.go b/pkg/plugin/implementation/otelsetup/otelsetup.go new file mode 100644 index 0000000..6d48525 --- /dev/null +++ b/pkg/plugin/implementation/otelsetup/otelsetup.go @@ -0,0 +1,22 @@ +package otelsetup + +import ( + "context" + "fmt" + + "github.com/beckn-one/beckn-onix/pkg/telemetry" +) + +// Setup wires the telemetry provider using the shared telemetry package. This +// is the concrete implementation behind the MetricsProvider interface. +type Setup struct{} + +// New initializes the underlying telemetry provider. The returned provider +// exposes the HTTP handler and shutdown hooks that the core application can +// manage directly. +func (Setup) New(ctx context.Context, cfg *telemetry.Config) (*telemetry.Provider, error) { + if cfg == nil { + return nil, fmt.Errorf("telemetry config cannot be nil") + } + return telemetry.NewProvider(ctx, cfg) +} diff --git a/pkg/plugin/manager.go b/pkg/plugin/manager.go index fd5afe5..a27782a 100644 --- a/pkg/plugin/manager.go +++ b/pkg/plugin/manager.go @@ -15,6 +15,7 @@ import ( "github.com/beckn-one/beckn-onix/pkg/log" "github.com/beckn-one/beckn-onix/pkg/plugin/definition" + "github.com/beckn-one/beckn-onix/pkg/telemetry" ) type onixPlugin interface { @@ -196,6 +197,33 @@ func (m *Manager) Middleware(ctx context.Context, cfg *Config) (func(http.Handle return mwp.New(ctx, cfg.Config) } +// OtelSetup initializes OpenTelemetry via a dedicated plugin. 
The plugin is +// expected to return a telemetry Provider that the core application can use for +// instrumentation. Shutdown errors from the plugin's closer are logged, not fatal. +func (m *Manager) OtelSetup(ctx context.Context, cfg *Config) (*telemetry.Provider, error) { + if cfg == nil { + log.Info(ctx, "Telemetry config not provided; skipping OpenTelemetry setup") + return nil, nil + } + + otp, err := provider[definition.MetricsProvider](m.plugins, cfg.ID) + if err != nil { + return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) + } + otelProvider, closer, err := otp.New(ctx, cfg.Config) + if err != nil { + return nil, err + } + if closer != nil { + m.closers = append(m.closers, func() { + if err := closer(); err != nil { + log.Errorf(ctx, err, "Failed to shutdown telemetry provider: %v", err) + } + }) + } + return otelProvider, nil +} + // Step returns a Step instance based on the provided configuration. func (m *Manager) Step(ctx context.Context, cfg *Config) (definition.Step, error) { sp, err := provider[definition.StepProvider](m.plugins, cfg.ID) diff --git a/pkg/telemetry/metrics.go b/pkg/telemetry/metrics.go index 89d32ad..6e4e920 100644 --- a/pkg/telemetry/metrics.go +++ b/pkg/telemetry/metrics.go @@ -13,7 +13,7 @@ import ( // Metrics exposes strongly typed metric instruments used across the adapter. // Note: Most metrics have been moved to their respective modules. Only plugin-level // metrics remain here.
See: -// - HTTP metrics: pkg/plugin/implementation/otelmetrics/metrics.go +// - OTel setup: pkg/plugin/implementation/otelsetup // - Step metrics: pkg/telemetry/step_metrics.go // - Cache metrics: pkg/plugin/implementation/cache/metrics.go // - Handler metrics: core/module/handler/metrics.go diff --git a/pkg/telemetry/step_instrumentor.go b/pkg/telemetry/step_instrumentor.go index e6a7ac7..b72643c 100644 --- a/pkg/telemetry/step_instrumentor.go +++ b/pkg/telemetry/step_instrumentor.go @@ -11,19 +11,23 @@ import ( "github.com/beckn-one/beckn-onix/pkg/log" "github.com/beckn-one/beckn-onix/pkg/model" - "github.com/beckn-one/beckn-onix/pkg/plugin/definition" ) +// StepRunner represents the minimal contract required for step instrumentation. +type StepRunner interface { + Run(*model.StepContext) error +} + // InstrumentedStep wraps a processing step with telemetry instrumentation. type InstrumentedStep struct { - step definition.Step + step StepRunner stepName string moduleName string metrics *StepMetrics } -// NewInstrumentedStep returns a telemetry enabled wrapper around a definition.Step. -func NewInstrumentedStep(step definition.Step, stepName, moduleName string) (*InstrumentedStep, error) { +// NewInstrumentedStep returns a telemetry-enabled wrapper around a StepRunner. +func NewInstrumentedStep(step StepRunner, stepName, moduleName string) (*InstrumentedStep, error) { metrics, err := GetStepMetrics(context.Background()) if err != nil { return nil, err