diff --git a/CONFIG.md b/CONFIG.md index 7b28ec8..bc5b619 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -189,124 +189,69 @@ log: --- -## Metrics Configuration +## Telemetry Configuration -### `metrics` +### `telemetry` **Type**: `object` **Required**: No -**Description**: OpenTelemetry metrics configuration for observability and monitoring. +**Description**: OpenTelemetry configuration controlling whether the Prometheus exporter is enabled. -**Important**: When `enabled: true`, metrics are automatically exposed at the `/metrics` endpoint in Prometheus format. This allows Prometheus or any HTTP client to scrape metrics directly from the application. +**Important**: The `/metrics` endpoint is only exposed when `enableMetrics: true`. #### Parameters: -##### `enabled` +##### `enableMetrics` **Type**: `boolean` **Required**: No **Default**: `false` -**Description**: Enable or disable metrics collection. When enabled: -- Metrics are collected automatically -- Metrics are exposed at `/metrics` endpoint in Prometheus format -- All metrics subsystems are initialized (request metrics, runtime metrics) - -When disabled, no metrics are collected and the `/metrics` endpoint is not available. - -##### `exporterType` -**Type**: `string` -**Required**: Yes (if `enabled` is `true`) -**Options**: `prometheus` -**Default**: `prometheus` -**Description**: Metrics exporter type. Currently only `prometheus` is supported, which exposes metrics at the `/metrics` endpoint. - -**Note**: The `/metrics` endpoint is always available when `enabled: true`. +**Description**: Enables metrics collection and the `/metrics` endpoint. ##### `serviceName` **Type**: `string` **Required**: No **Default**: `"beckn-onix"` -**Description**: Service name used in metrics resource attributes. Helps identify the service in observability platforms. +**Description**: Sets the `service.name` resource attribute. 
##### `serviceVersion` **Type**: `string` **Required**: No -**Description**: Service version used in metrics resource attributes. Useful for tracking different versions of the service. +**Description**: Sets the `service.version` resource attribute. -##### `prometheus` -**Type**: `object` +##### `environment` +**Type**: `string` **Required**: No -**Description**: Prometheus exporter configuration (reserved for future use). +**Default**: `"development"` +**Description**: Sets the `deployment.environment` attribute (e.g., `development`, `staging`, `production`). **Example - Enable Metrics**: ```yaml -metrics: - enabled: true - exporterType: prometheus +telemetry: + enableMetrics: true serviceName: beckn-onix serviceVersion: "1.0.0" + environment: "development" ``` -**Note**: Metrics are available at `/metrics` endpoint in Prometheus format. - -**Example - Disabled Metrics**: -```yaml -metrics: - enabled: false -``` -**Note**: No metrics are collected and `/metrics` endpoint is not available. 
### Accessing Metrics -When `metrics.enabled: true`, metrics are automatically available at: +When `telemetry.enableMetrics: true`, scrape metrics at: ``` http://your-server:port/metrics ``` -The endpoint returns metrics in Prometheus format and can be: -- Scraped by Prometheus -- Accessed via `curl http://localhost:8081/metrics` -- Viewed in a web browser - ### Metrics Collected -The adapter automatically collects the following metrics: +- `http_server_requests_total`, `http_server_request_duration_seconds`, `http_server_requests_in_flight` +- `http_server_request_size_bytes`, `http_server_response_size_bytes` +- `onix_step_executions_total`, `onix_step_execution_duration_seconds`, `onix_step_errors_total` +- `onix_plugin_execution_duration_seconds`, `onix_plugin_errors_total` +- `beckn_messages_total`, `beckn_signature_validations_total`, `beckn_schema_validations_total` +- `onix_routing_decisions_total` +- `onix_cache_operations_total`, `onix_cache_hits_total`, `onix_cache_misses_total` +- Go runtime metrics (`go_*`) and Redis instrumentation via `redisotel` -#### HTTP Metrics (Automatic via OpenTelemetry HTTP Middleware) -- `http.server.duration` - Request duration histogram -- `http.server.request.size` - Request body size -- `http.server.response.size` - Response body size -- `http.server.active_requests` - Active request counter - -#### Request Metrics (Automatic) -**Inbound Requests:** -- `beckn.inbound.requests.total` - Total inbound requests per host -- `beckn.inbound.sign_validation.total` - Requests with sign validation per host -- `beckn.inbound.schema_validation.total` - Requests with schema validation per host - -**Outbound Requests:** -- `beckn.outbound.requests.total` - Total outbound requests per host -- `beckn.outbound.requests.2xx` - 2XX responses per host -- `beckn.outbound.requests.4xx` - 4XX responses per host -- `beckn.outbound.requests.5xx` - 5XX responses per host -- `beckn.outbound.request.duration` - Request duration histogram (supports 
p99, p95, p75 percentiles) per host - -#### Go Runtime Metrics (Automatic) -- `go_cpu_*` - CPU usage metrics -- `go_memstats_*` - Memory allocation and heap statistics -- `go_memstats_gc_*` - Garbage collection statistics -- `go_goroutines` - Goroutine count - -#### Redis Metrics (Automatic via redisotel) -- `redis_commands_duration_seconds` - Redis command duration -- `redis_commands_total` - Total Redis commands -- `redis_connections_active` - Active Redis connections -- Additional Redis-specific metrics - -All metrics include relevant attributes (labels) such as: -- `host` - Request hostname -- `status_code` - HTTP status code -- `operation` - HTTP operation name -- `service.name` - Service identifier -- `service.version` - Service version +Each metric includes consistent labels such as `module`, `role`, `action`, `status`, `step`, `plugin_id`, and `schema_version` to enable low-cardinality dashboards. --- diff --git a/README.md b/README.md index 2811e77..6eec900 100644 --- a/README.md +++ b/README.md @@ -64,16 +64,28 @@ The **Beckn Protocol** is an open protocol that enables location-aware, local co ### 📊 **Observability** - **Structured Logging**: JSON-formatted logs with contextual information - **Transaction Tracking**: End-to-end request tracing with unique IDs -- **OpenTelemetry Metrics**: Comprehensive metrics collection via OpenTelemetry - - HTTP request metrics (duration, size, active requests) - - Inbound/outbound request tracking per host - - Request validation metrics (sign, schema) - - Outbound request status codes (2XX/4XX/5XX) and latency percentiles - - Go runtime metrics (CPU, memory, GC, goroutines) - - Redis operation metrics (via automatic instrumentation) - - Prometheus-compatible `/metrics` endpoint +- **OpenTelemetry Metrics**: Pull-based metrics exposed via `/metrics` + - RED metrics for every module and action (rate, errors, duration) + - Per-step histograms with error attribution + - Cache, routing, plugin, and business KPIs 
(signature/schema validations, Beckn messages) + - Native Prometheus exporter with Grafana dashboards & alert rules (`monitoring/`) +- **Runtime Instrumentation**: Go runtime + Redis client metrics baked in - **Health Checks**: Liveness and readiness probes for Kubernetes +#### Monitoring Quick Start +```bash +./install/build-plugins.sh +go build -o beckn-adapter ./cmd/adapter +./beckn-adapter --config=config/local-simple.yaml +cd monitoring && docker-compose -f docker-compose-monitoring.yml up -d +open http://localhost:3000 # Grafana (admin/admin) +``` +Resources: +- `monitoring/prometheus.yml` – scrape config +- `monitoring/prometheus-alerts.yml` – alert rules (RED, cache, step, plugin) +- `monitoring/grafana/dashboards/beckn-onix-overview.json` – curated dashboard +- `docs/METRICS_RUNBOOK.md` – runbook with PromQL recipes & troubleshooting + ### 🌐 **Multi-Domain Support** - **Retail & E-commerce**: Product search, order management, fulfillment tracking - **Mobility Services**: Ride-hailing, public transport, vehicle rentals @@ -358,9 +370,9 @@ modules: | Method | Endpoint | Description | |--------|----------|-------------| | GET | `/health` | Health check endpoint | -| GET | `/metrics` | Prometheus metrics endpoint (when metrics enabled) | +| GET | `/metrics` | Prometheus metrics endpoint (when telemetry is enabled) | -**Note**: The `/metrics` endpoint is only available when `metrics.enabled: true` in the configuration file. It returns metrics in Prometheus format. +**Note**: The `/metrics` endpoint is available when `telemetry.enableMetrics: true` in the configuration file. It returns metrics in Prometheus format. 
## Documentation diff --git a/cmd/adapter/main.go b/cmd/adapter/main.go index 04d5ed0..b2a12dd 100644 --- a/cmd/adapter/main.go +++ b/cmd/adapter/main.go @@ -16,15 +16,15 @@ import ( "github.com/beckn-one/beckn-onix/core/module" "github.com/beckn-one/beckn-onix/core/module/handler" "github.com/beckn-one/beckn-onix/pkg/log" - "github.com/beckn-one/beckn-onix/pkg/metrics" "github.com/beckn-one/beckn-onix/pkg/plugin" + "github.com/beckn-one/beckn-onix/pkg/telemetry" ) // Config struct holds all configurations. type Config struct { AppName string `yaml:"appName"` Log log.Config `yaml:"log"` - Metrics metrics.Config `yaml:"metrics"` + Telemetry telemetry.Config `yaml:"telemetry"` PluginManager *plugin.ManagerConfig `yaml:"pluginManager"` Modules []module.Config `yaml:"modules"` HTTP httpConfig `yaml:"http"` @@ -94,20 +94,16 @@ func validateConfig(cfg *Config) error { } // newServer creates and initializes the HTTP server. -func newServer(ctx context.Context, mgr handler.PluginManager, cfg *Config) (http.Handler, error) { +func newServer(ctx context.Context, mgr handler.PluginManager, cfg *Config, otelProvider *telemetry.Provider) (http.Handler, error) { mux := http.NewServeMux() + mux.HandleFunc("/health", handler.HealthHandler) - // Register /metrics endpoint if metrics are enabled - if metrics.IsEnabled() { - metricsHandler := metrics.MetricsHandler() - if metricsHandler != nil { - mux.Handle("/metrics", metricsHandler) + if otelProvider != nil && otelProvider.MetricsHandler != nil { + mux.Handle("/metrics", otelProvider.MetricsHandler) log.Infof(ctx, "Metrics endpoint registered at /metrics") - } } - err := module.Register(ctx, cfg.Modules, mux, mgr) - if err != nil { + if err := module.Register(ctx, cfg.Modules, mux, mgr); err != nil { return nil, fmt.Errorf("failed to register modules: %w", err) } return mux, nil @@ -129,20 +125,18 @@ func run(ctx context.Context, configPath string) error { return fmt.Errorf("failed to initialize logger: %w", err) } - // Initialize 
metrics. - log.Infof(ctx, "Initializing metrics with config: %+v", cfg.Metrics) - if err := metrics.InitMetrics(cfg.Metrics); err != nil { - return fmt.Errorf("failed to initialize metrics: %w", err) + // Initialize telemetry. + log.Infof(ctx, "Initializing telemetry with config: %+v", cfg.Telemetry) + otelProvider, err := telemetry.NewProvider(ctx, &cfg.Telemetry) + if err != nil { + return fmt.Errorf("failed to initialize telemetry: %w", err) } - if err := metrics.InitAllMetrics(); err != nil { - return err - } - if metrics.IsEnabled() { + if otelProvider != nil && otelProvider.Shutdown != nil { closers = append(closers, func() { shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - if err := metrics.Shutdown(shutdownCtx); err != nil { - log.Errorf(ctx, err, "Failed to shutdown metrics: %v", err) + if err := otelProvider.Shutdown(shutdownCtx); err != nil { + log.Errorf(ctx, err, "Failed to shutdown telemetry: %v", err) } }) } @@ -158,7 +152,7 @@ func run(ctx context.Context, configPath string) error { // Initialize HTTP server. 
log.Infof(ctx, "Initializing HTTP server") - srv, err := newServerFunc(ctx, mgr, cfg) + srv, err := newServerFunc(ctx, mgr, cfg, otelProvider) if err != nil { return fmt.Errorf("failed to initialize server: %w", err) } diff --git a/cmd/adapter/main_test.go b/cmd/adapter/main_test.go index eef53c6..46ecc16 100644 --- a/cmd/adapter/main_test.go +++ b/cmd/adapter/main_test.go @@ -15,6 +15,7 @@ import ( "github.com/beckn-one/beckn-onix/core/module/handler" "github.com/beckn-one/beckn-onix/pkg/plugin" "github.com/beckn-one/beckn-onix/pkg/plugin/definition" + "github.com/beckn-one/beckn-onix/pkg/telemetry" "github.com/stretchr/testify/mock" ) @@ -119,7 +120,7 @@ func TestRunSuccess(t *testing.T) { defer func() { newManagerFunc = originalNewManager }() originalNewServer := newServerFunc - newServerFunc = func(ctx context.Context, mgr handler.PluginManager, cfg *Config) (http.Handler, error) { + newServerFunc = func(ctx context.Context, mgr handler.PluginManager, cfg *Config, provider *telemetry.Provider) (http.Handler, error) { return http.NewServeMux(), nil } defer func() { newServerFunc = originalNewServer }() @@ -177,7 +178,7 @@ func TestRunFailure(t *testing.T) { defer func() { newManagerFunc = originalNewManager }() originalNewServer := newServerFunc - newServerFunc = func(ctx context.Context, mgr handler.PluginManager, cfg *Config) (http.Handler, error) { + newServerFunc = func(ctx context.Context, mgr handler.PluginManager, cfg *Config, provider *telemetry.Provider) (http.Handler, error) { return tt.mockServer(ctx, mgr, cfg) } defer func() { newServerFunc = originalNewServer }() @@ -308,7 +309,7 @@ func TestNewServerSuccess(t *testing.T) { }, } - handler, err := newServer(context.Background(), mockMgr, cfg) + handler, err := newServer(context.Background(), mockMgr, cfg, nil) if err != nil { t.Errorf("Expected no error, but got: %v", err) @@ -353,7 +354,7 @@ func TestNewServerFailure(t *testing.T) { }, } - handler, err := newServer(context.Background(), mockMgr, 
cfg) + handler, err := newServer(context.Background(), mockMgr, cfg, nil) if err == nil { t.Errorf("Expected an error, but got nil") diff --git a/cmd/adapter/metrics_integration_test.go b/cmd/adapter/metrics_integration_test.go new file mode 100644 index 0000000..b7648c7 --- /dev/null +++ b/cmd/adapter/metrics_integration_test.go @@ -0,0 +1,31 @@ +package main + +import ( + "context" + "net/http/httptest" + "testing" + + "github.com/beckn-one/beckn-onix/pkg/telemetry" + "github.com/stretchr/testify/require" +) + +func TestMetricsEndpointExposesPrometheus(t *testing.T) { + ctx := context.Background() + provider, err := telemetry.NewProvider(ctx, &telemetry.Config{ + ServiceName: "test-onix", + ServiceVersion: "1.0.0", + EnableMetrics: true, + Environment: "test", + }) + require.NoError(t, err) + defer provider.Shutdown(context.Background()) + + rec := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/metrics", nil) + provider.MetricsHandler.ServeHTTP(rec, req) + + require.Equal(t, 200, rec.Code) + body := rec.Body.String() + require.Contains(t, body, "# HELP") + require.Contains(t, body, "# TYPE") +} diff --git a/config/local-simple.yaml b/config/local-simple.yaml index 548f4c5..287e50f 100644 --- a/config/local-simple.yaml +++ b/config/local-simple.yaml @@ -8,6 +8,11 @@ log: - message_id - subscriber_id - module_id +telemetry: + serviceName: "beckn-onix" + serviceVersion: "1.0.0" + enableMetrics: true + environment: "development" http: port: 8081 timeout: @@ -63,6 +68,9 @@ modules: config: uuidKeys: transaction_id,message_id role: bap + - id: otelmetrics + config: + enabled: "true" steps: - validateSign - addRoute @@ -154,6 +162,10 @@ modules: id: router config: routingConfig: ./config/local-simple-routing-BPPReceiver.yaml + middleware: + - id: otelmetrics + config: + enabled: "true" steps: - validateSign - addRoute @@ -195,6 +207,10 @@ modules: routingConfig: ./config/local-simple-routing.yaml signer: id: signer + middleware: + - id: otelmetrics + 
config: + enabled: "true" steps: - addRoute - sign diff --git a/core/module/handler/stdHandler.go b/core/module/handler/stdHandler.go index 1bb0e9a..7daff1a 100644 --- a/core/module/handler/stdHandler.go +++ b/core/module/handler/stdHandler.go @@ -9,11 +9,11 @@ import ( "net/http/httputil" "github.com/beckn-one/beckn-onix/pkg/log" - "github.com/beckn-one/beckn-onix/pkg/metrics" "github.com/beckn-one/beckn-onix/pkg/model" "github.com/beckn-one/beckn-onix/pkg/plugin" "github.com/beckn-one/beckn-onix/pkg/plugin/definition" "github.com/beckn-one/beckn-onix/pkg/response" + "github.com/beckn-one/beckn-onix/pkg/telemetry" ) // stdHandler orchestrates the execution of defined processing steps. @@ -30,6 +30,7 @@ type stdHandler struct { SubscriberID string role model.Role httpClient *http.Client + moduleName string } // newHTTPClient creates a new HTTP client with a custom transport configuration. @@ -52,19 +53,17 @@ func newHTTPClient(cfg *HttpClientConfig) *http.Client { transport.ResponseHeaderTimeout = cfg.ResponseHeaderTimeout } - // Wrap transport with metrics tracking for outbound requests - wrappedTransport := metrics.WrapHTTPTransport(transport) - - return &http.Client{Transport: wrappedTransport} + return &http.Client{Transport: transport} } // NewStdHandler initializes a new processor with plugins and steps. -func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Handler, error) { +func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config, moduleName string) (http.Handler, error) { h := &stdHandler{ steps: []definition.Step{}, SubscriberID: cfg.SubscriberID, role: cfg.Role, httpClient: newHTTPClient(&cfg.HttpClientConfig), + moduleName: moduleName, } // Initialize plugins. if err := h.initPlugins(ctx, mgr, &cfg.Plugins); err != nil { @@ -79,12 +78,8 @@ func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Ha // ServeHTTP processes an incoming HTTP request and executes defined processing steps. 
func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // Track inbound request - host := r.Host - if host == "" { - host = r.URL.Host - } - metrics.RecordInboundRequest(r.Context(), host) + r.Header.Set("X-Module-Name", h.moduleName) + r.Header.Set("X-Role", string(h.role)) ctx, err := h.stepCtx(r, w.Header()) if err != nil { @@ -94,35 +89,14 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } log.Request(r.Context(), r, ctx.Body) - // Track validation steps - signValidated := false - schemaValidated := false - // Execute processing steps. for _, step := range h.steps { - stepName := fmt.Sprintf("%T", step) - // Check if this is a validation step - if stepName == "*step.validateSignStep" { - signValidated = true - } - if stepName == "*step.validateSchemaStep" { - schemaValidated = true - } - if err := step.Run(ctx); err != nil { log.Errorf(ctx, err, "%T.run(%v):%v", step, ctx, err) response.SendNack(ctx, w, err) return } } - - // Record validation metrics after successful execution - if signValidated { - metrics.RecordInboundSignValidation(ctx, host) - } - if schemaValidated { - metrics.RecordInboundSchemaValidation(ctx, host) - } // Restore request body before forwarding or publishing. r.Body = io.NopCloser(bytes.NewReader(ctx.Body)) if ctx.Route == nil { @@ -130,6 +104,10 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + // These headers are only needed for internal instrumentation; avoid leaking them downstream. + r.Header.Del("X-Module-Name") + r.Header.Del("X-Role") + // Handle routing based on the defined route type. 
route(ctx, r, w, h.publisher, h.httpClient) } @@ -320,7 +298,13 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf if err != nil { return err } - h.steps = append(h.steps, s) + instrumentedStep, wrapErr := telemetry.NewInstrumentedStep(s, step, h.moduleName) + if wrapErr != nil { + log.Warnf(ctx, "Failed to instrument step %s: %v", step, wrapErr) + h.steps = append(h.steps, s) + continue + } + h.steps = append(h.steps, instrumentedStep) } log.Infof(ctx, "Processor steps initialized: %v", cfg.Steps) return nil diff --git a/core/module/handler/step.go b/core/module/handler/step.go index 2464daf..35c09c3 100644 --- a/core/module/handler/step.go +++ b/core/module/handler/step.go @@ -2,13 +2,17 @@ package handler import ( "context" + "encoding/json" "fmt" "strings" "time" + "go.opentelemetry.io/otel/metric" + "github.com/beckn-one/beckn-onix/pkg/log" "github.com/beckn-one/beckn-onix/pkg/model" "github.com/beckn-one/beckn-onix/pkg/plugin/definition" + "github.com/beckn-one/beckn-onix/pkg/telemetry" ) // signStep represents the signing step in the processing pipeline. @@ -68,6 +72,7 @@ func (s *signStep) generateAuthHeader(subID, keyID string, createdAt, validTill type validateSignStep struct { validator definition.SignValidator km definition.KeyManager + metrics *telemetry.Metrics } // newValidateSignStep initializes and returns a new validate sign step. @@ -78,11 +83,22 @@ func newValidateSignStep(signValidator definition.SignValidator, km definition.K if km == nil { return nil, fmt.Errorf("invalid config: KeyManager plugin not configured") } - return &validateSignStep{validator: signValidator, km: km}, nil + metrics, _ := telemetry.GetMetrics(context.Background()) + return &validateSignStep{ + validator: signValidator, + km: km, + metrics: metrics, + }, nil } // Run executes the validation step. 
func (s *validateSignStep) Run(ctx *model.StepContext) error { + err := s.validateHeaders(ctx) + s.recordMetrics(ctx, err) + return err +} + +func (s *validateSignStep) validateHeaders(ctx *model.StepContext) error { unauthHeader := fmt.Sprintf("Signature realm=\"%s\",headers=\"(created) (expires) digest\"", ctx.SubID) headerValue := ctx.Request.Header.Get(model.AuthHeaderGateway) if len(headerValue) != 0 { @@ -123,6 +139,18 @@ func (s *validateSignStep) validate(ctx *model.StepContext, value string) error return nil } +func (s *validateSignStep) recordMetrics(ctx *model.StepContext, err error) { + if s.metrics == nil { + return + } + status := "success" + if err != nil { + status = "failed" + } + s.metrics.SignatureValidationsTotal.Add(ctx.Context, 1, + metric.WithAttributes(telemetry.AttrStatus.String(status))) +} + // ParsedKeyID holds the components from the parsed Authorization header's keyId. type authHeader struct { SubscriberID string @@ -165,6 +193,7 @@ func parseHeader(header string) (*authHeader, error) { // validateSchemaStep represents the schema validation step. type validateSchemaStep struct { validator definition.SchemaValidator + metrics *telemetry.Metrics } // newValidateSchemaStep creates and returns the validateSchema step after validation. @@ -173,20 +202,43 @@ func newValidateSchemaStep(schemaValidator definition.SchemaValidator) (definiti return nil, fmt.Errorf("invalid config: SchemaValidator plugin not configured") } log.Debug(context.Background(), "adding schema validator") - return &validateSchemaStep{validator: schemaValidator}, nil + metrics, _ := telemetry.GetMetrics(context.Background()) + return &validateSchemaStep{ + validator: schemaValidator, + metrics: metrics, + }, nil } // Run executes the schema validation step. 
func (s *validateSchemaStep) Run(ctx *model.StepContext) error { - if err := s.validator.Validate(ctx, ctx.Request.URL, ctx.Body); err != nil { - return fmt.Errorf("schema validation failed: %w", err) + err := s.validator.Validate(ctx, ctx.Request.URL, ctx.Body) + if err != nil { + err = fmt.Errorf("schema validation failed: %w", err) } - return nil + s.recordMetrics(ctx, err) + return err +} + +func (s *validateSchemaStep) recordMetrics(ctx *model.StepContext, err error) { + if s.metrics == nil { + return + } + status := "success" + if err != nil { + status = "failed" + } + version := extractSchemaVersion(ctx.Body) + s.metrics.SchemaValidationsTotal.Add(ctx.Context, 1, + metric.WithAttributes( + telemetry.AttrSchemaVersion.String(version), + telemetry.AttrStatus.String(status), + )) } // addRouteStep represents the route determination step. type addRouteStep struct { - router definition.Router + router definition.Router + metrics *telemetry.Metrics } // newAddRouteStep creates and returns the addRoute step after validation. @@ -194,7 +246,11 @@ func newAddRouteStep(router definition.Router) (definition.Step, error) { if router == nil { return nil, fmt.Errorf("invalid config: Router plugin not configured") } - return &addRouteStep{router: router}, nil + metrics, _ := telemetry.GetMetrics(context.Background()) + return &addRouteStep{ + router: router, + metrics: metrics, + }, nil } // Run executes the routing step. 
@@ -208,5 +264,31 @@ func (s *addRouteStep) Run(ctx *model.StepContext) error { PublisherID: route.PublisherID, URL: route.URL, } + if s.metrics != nil && ctx.Route != nil { + s.metrics.RoutingDecisionsTotal.Add(ctx.Context, 1, + metric.WithAttributes( + telemetry.AttrRouteType.String(ctx.Route.TargetType), + telemetry.AttrTargetType.String(ctx.Route.TargetType), + )) + } return nil -} \ No newline at end of file +} + +func extractSchemaVersion(body []byte) string { + type contextEnvelope struct { + Context struct { + Version string `json:"version"` + CoreVersion string `json:"core_version"` + } `json:"context"` + } + var payload contextEnvelope + if err := json.Unmarshal(body, &payload); err == nil { + if payload.Context.CoreVersion != "" { + return payload.Context.CoreVersion + } + if payload.Context.Version != "" { + return payload.Context.Version + } + } + return "unknown" +} diff --git a/core/module/module.go b/core/module/module.go index 3a653d5..6165e2e 100644 --- a/core/module/module.go +++ b/core/module/module.go @@ -7,7 +7,6 @@ import ( "github.com/beckn-one/beckn-onix/core/module/handler" "github.com/beckn-one/beckn-onix/pkg/log" - "github.com/beckn-one/beckn-onix/pkg/metrics" "github.com/beckn-one/beckn-onix/pkg/model" ) @@ -19,7 +18,7 @@ type Config struct { } // Provider represents a function that initializes an HTTP handler using a PluginManager. -type Provider func(ctx context.Context, mgr handler.PluginManager, cfg *handler.Config) (http.Handler, error) +type Provider func(ctx context.Context, mgr handler.PluginManager, cfg *handler.Config, moduleName string) (http.Handler, error) // handlerProviders maintains a mapping of handler types to their respective providers. var handlerProviders = map[handler.Type]Provider{ @@ -30,8 +29,6 @@ var handlerProviders = map[handler.Type]Provider{ // It iterates over the module configurations, retrieves appropriate handler providers, // and registers the handlers with the HTTP multiplexer. 
func Register(ctx context.Context, mCfgs []Config, mux *http.ServeMux, mgr handler.PluginManager) error { - mux.Handle("/health", metrics.HTTPMiddleware(http.HandlerFunc(handler.HealthHandler), "/health")) - log.Debugf(ctx, "Registering modules with config: %#v", mCfgs) // Iterate over the handlers in the configuration. for _, c := range mCfgs { @@ -39,7 +36,7 @@ func Register(ctx context.Context, mCfgs []Config, mux *http.ServeMux, mgr handl if !ok { return fmt.Errorf("invalid module : %s", c.Name) } - h, err := rmp(ctx, mgr, &c.Handler) + h, err := rmp(ctx, mgr, &c.Handler, c.Name) if err != nil { return fmt.Errorf("%s : %w", c.Name, err) } @@ -49,8 +46,6 @@ func Register(ctx context.Context, mCfgs []Config, mux *http.ServeMux, mgr handl } h = moduleCtxMiddleware(c.Name, h) - // Wrap handler with metrics middleware. - h = metrics.HTTPMiddleware(h, c.Path) log.Debugf(ctx, "Registering handler %s, of type %s @ %s", c.Name, c.Handler.Type, c.Path) mux.Handle(c.Path, h) } @@ -84,4 +79,4 @@ func moduleCtxMiddleware(moduleName string, next http.Handler) http.Handler { ctx := context.WithValue(r.Context(), model.ContextKeyModuleID, moduleName) next.ServeHTTP(w, r.WithContext(ctx)) }) -} \ No newline at end of file +} diff --git a/core/module/module_test.go b/core/module/module_test.go index 9ceaaf3..f3fe48d 100644 --- a/core/module/module_test.go +++ b/core/module/module_test.go @@ -123,15 +123,6 @@ func TestRegisterSuccess(t *testing.T) { if capturedModuleName != "test-module" { t.Errorf("expected module_id in context to be 'test-module', got %v", capturedModuleName) } - // Verifying /health endpoint registration - reqHealth := httptest.NewRequest(http.MethodGet, "/health", nil) - recHealth := httptest.NewRecorder() - mux.ServeHTTP(recHealth, reqHealth) - - if status := recHealth.Code; status != http.StatusOK { - t.Errorf("handler for /health returned wrong status code: got %v want %v", - status, http.StatusOK) - } } // TestRegisterFailure tests scenarios where the 
handler registration should fail. diff --git a/go.mod b/go.mod index fc54961..e07e6ad 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,6 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/felixge/httpsnoop v1.0.3 // indirect github.com/go-jose/go-jose/v4 v4.0.1 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -48,6 +47,7 @@ require ( github.com/redis/go-redis/extra/rediscmd/v9 v9.16.0 // indirect github.com/ryanuber/go-glob v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/contrib/instrumentation/runtime v0.63.0 // indirect go.opentelemetry.io/otel/trace v1.38.0 // indirect golang.org/x/net v0.38.0 // indirect golang.org/x/sys v0.35.0 // indirect @@ -64,8 +64,6 @@ require ( github.com/redis/go-redis/extra/redisotel/v9 v9.16.0 github.com/redis/go-redis/v9 v9.16.0 github.com/rs/zerolog v1.34.0 - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0 - go.opentelemetry.io/contrib/instrumentation/runtime v0.63.0 go.opentelemetry.io/otel v1.38.0 go.opentelemetry.io/otel/exporters/prometheus v0.46.0 go.opentelemetry.io/otel/metric v1.38.0 diff --git a/go.sum b/go.sum index 05ecf66..31324a6 100644 --- a/go.sum +++ b/go.sum @@ -22,8 +22,6 @@ github.com/dlclark/regexp2 v1.11.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cn github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= -github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= -github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/go-jose/go-jose/v4 v4.0.1 
h1:QVEPDE3OluqXBQZDcnNvQrInro2h0e4eqNbnZSWqS6U= github.com/go-jose/go-jose/v4 v4.0.1/go.mod h1:WVf9LFMHh/QVrmqrOfqun0C45tMe3RoiKJMPvgWwLfY= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -125,8 +123,6 @@ github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03 h1:m1h+vudopHsI67F github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03/go.mod h1:8sheVFH84v3PCyFY/O02mIgSQY9I6wMYPWsq7mDnEZY= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0 h1:KfYpVmrjI7JuToy5k8XV3nkapjWx48k4E4JOtVstzQI= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0/go.mod h1:SeQhzAEccGVZVEy7aH87Nh0km+utSpo1pTv6eMMop48= go.opentelemetry.io/contrib/instrumentation/runtime v0.63.0 h1:PeBoRj6af6xMI7qCupwFvTbbnd49V7n5YpG6pg8iDYQ= go.opentelemetry.io/contrib/instrumentation/runtime v0.63.0/go.mod h1:ingqBCtMCe8I4vpz/UVzCW6sxoqgZB37nao91mLQ3Bw= go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= diff --git a/install/build-plugins.sh b/install/build-plugins.sh index bcf1275..f5a5a77 100755 --- a/install/build-plugins.sh +++ b/install/build-plugins.sh @@ -16,6 +16,7 @@ plugins=( "registry" "dediregistry" "reqpreprocessor" + "otelmetrics" "router" "schemavalidator" "signer" diff --git a/pkg/metrics/http.go b/pkg/metrics/http.go deleted file mode 100644 index 8a0d04e..0000000 --- a/pkg/metrics/http.go +++ /dev/null @@ -1,24 +0,0 @@ -package metrics - -import ( - "net/http" - - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" -) - -// HTTPMiddleware wraps an HTTP handler with OpenTelemetry instrumentation. 
-func HTTPMiddleware(handler http.Handler, operation string) http.Handler { - if !IsEnabled() { - return handler - } - - return otelhttp.NewHandler( - handler, - operation, - ) -} - -// HTTPHandler wraps an HTTP handler function with OpenTelemetry instrumentation. -func HTTPHandler(handler http.HandlerFunc, operation string) http.Handler { - return HTTPMiddleware(handler, operation) -} diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go deleted file mode 100644 index 90ac602..0000000 --- a/pkg/metrics/metrics.go +++ /dev/null @@ -1,186 +0,0 @@ -package metrics - -import ( - "context" - "errors" - "fmt" - "net/http" - "sync" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - otelprom "go.opentelemetry.io/otel/exporters/prometheus" - otelmetric "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" -) - -var ( - mp *metric.MeterProvider - meter otelmetric.Meter - prometheusRegistry *prometheus.Registry - once sync.Once - shutdownFunc func(context.Context) error - ErrInvalidExporter = errors.New("invalid metrics exporter type") - ErrMetricsNotInit = errors.New("metrics not initialized") -) - -// ExporterType represents the type of metrics exporter. -type ExporterType string - -const ( - // ExporterPrometheus exports metrics in Prometheus format. - ExporterPrometheus ExporterType = "prometheus" -) - -// Config represents the configuration for metrics. -type Config struct { - Enabled bool `yaml:"enabled"` - ExporterType ExporterType `yaml:"exporterType"` - ServiceName string `yaml:"serviceName"` - ServiceVersion string `yaml:"serviceVersion"` - Prometheus PrometheusConfig `yaml:"prometheus"` -} - -// PrometheusConfig represents Prometheus exporter configuration. 
-type PrometheusConfig struct { - Port string `yaml:"port"` - Path string `yaml:"path"` -} - -// validate validates the metrics configuration. -func (c *Config) validate() error { - if !c.Enabled { - return nil - } - - if c.ExporterType != ExporterPrometheus { - return fmt.Errorf("%w: %s", ErrInvalidExporter, c.ExporterType) - } - - if c.ServiceName == "" { - c.ServiceName = "beckn-onix" - } - - return nil -} - -// InitMetrics initializes the OpenTelemetry metrics SDK. -func InitMetrics(cfg Config) error { - if !cfg.Enabled { - return nil - } - - var initErr error - once.Do(func() { - if initErr = cfg.validate(); initErr != nil { - return - } - - // Create resource with service information. - attrs := []attribute.KeyValue{ - attribute.String("service.name", cfg.ServiceName), - } - if cfg.ServiceVersion != "" { - attrs = append(attrs, attribute.String("service.version", cfg.ServiceVersion)) - } - res, err := resource.New( - context.Background(), - resource.WithAttributes(attrs...), - ) - if err != nil { - initErr = fmt.Errorf("failed to create resource: %w", err) - return - } - - // Always create Prometheus exporter for /metrics endpoint - // Create a custom registry for the exporter so we can use it for HTTP serving - promRegistry := prometheus.NewRegistry() - promExporter, err := otelprom.New(otelprom.WithRegisterer(promRegistry)) - if err != nil { - initErr = fmt.Errorf("failed to create Prometheus exporter: %w", err) - return - } - prometheusRegistry = promRegistry - - // Create readers based on configuration. - var readers []metric.Reader - - // Always add Prometheus reader for /metrics endpoint - readers = append(readers, promExporter) - - // Create meter provider with all readers - opts := []metric.Option{ - metric.WithResource(res), - } - for _, reader := range readers { - opts = append(opts, metric.WithReader(reader)) - } - mp = metric.NewMeterProvider(opts...) - - // Set global meter provider. 
- otel.SetMeterProvider(mp) - - // Create meter for this package. - meter = mp.Meter("github.com/beckn-one/beckn-onix") - - // Store shutdown function. - shutdownFunc = func(ctx context.Context) error { - return mp.Shutdown(ctx) - } - }) - - return initErr -} - -// GetMeter returns the global meter instance. -func GetMeter() otelmetric.Meter { - if meter == nil { - // Return a no-op meter if not initialized. - return otel.Meter("noop") - } - return meter -} - -// Shutdown gracefully shuts down the metrics provider. -func Shutdown(ctx context.Context) error { - if shutdownFunc == nil { - return nil - } - return shutdownFunc(ctx) -} - -// IsEnabled returns whether metrics are enabled. -func IsEnabled() bool { - return mp != nil -} - -// MetricsHandler returns the HTTP handler for the /metrics endpoint. -// Returns nil if metrics are not enabled. -func MetricsHandler() http.Handler { - if prometheusRegistry == nil { - return nil - } - // Use promhttp to serve the Prometheus registry - return promhttp.HandlerFor(prometheusRegistry, promhttp.HandlerOpts{}) -} - -// InitAllMetrics initializes all metrics subsystems. -// This includes request metrics and runtime metrics. -// Returns an error if any initialization fails. 
-func InitAllMetrics() error {
-	if !IsEnabled() {
-		return nil
-	}
-
-	if err := InitRequestMetrics(); err != nil {
-		return fmt.Errorf("failed to initialize request metrics: %w", err)
-	}
-	if err := InitRuntimeMetrics(); err != nil {
-		return fmt.Errorf("failed to initialize runtime metrics: %w", err)
-	}
-
-	return nil
-}
diff --git a/pkg/metrics/requests.go b/pkg/metrics/requests.go
deleted file mode 100644
index 4d57bb9..0000000
--- a/pkg/metrics/requests.go
+++ /dev/null
@@ -1,200 +0,0 @@
-package metrics
-
-import (
-	"context"
-	"net/http"
-	"strconv"
-	"time"
-
-	"go.opentelemetry.io/otel/attribute"
-	otelmetric "go.opentelemetry.io/otel/metric"
-)
-
-var (
-	// Inbound request metrics
-	inboundRequestsTotal         otelmetric.Int64Counter
-	inboundSignValidationTotal   otelmetric.Int64Counter
-	inboundSchemaValidationTotal otelmetric.Int64Counter
-
-	// Outbound request metrics
-	outboundRequestsTotal   otelmetric.Int64Counter
-	outboundRequests2XX     otelmetric.Int64Counter
-	outboundRequests4XX     otelmetric.Int64Counter
-	outboundRequests5XX     otelmetric.Int64Counter
-	outboundRequestDuration otelmetric.Float64Histogram
-)
-
-// InitRequestMetrics initializes request-related metrics instruments.
-func InitRequestMetrics() error { - if !IsEnabled() { - return nil - } - - meter := GetMeter() - var err error - - // Inbound request metrics - inboundRequestsTotal, err = meter.Int64Counter( - "beckn.inbound.requests.total", - otelmetric.WithDescription("Total number of inbound requests per host"), - ) - if err != nil { - return err - } - - inboundSignValidationTotal, err = meter.Int64Counter( - "beckn.inbound.sign_validation.total", - otelmetric.WithDescription("Total number of inbound requests with sign validation per host"), - ) - if err != nil { - return err - } - - inboundSchemaValidationTotal, err = meter.Int64Counter( - "beckn.inbound.schema_validation.total", - otelmetric.WithDescription("Total number of inbound requests with schema validation per host"), - ) - if err != nil { - return err - } - - // Outbound request metrics - outboundRequestsTotal, err = meter.Int64Counter( - "beckn.outbound.requests.total", - otelmetric.WithDescription("Total number of outbound requests per host"), - ) - if err != nil { - return err - } - - outboundRequests2XX, err = meter.Int64Counter( - "beckn.outbound.requests.2xx", - otelmetric.WithDescription("Total number of outbound requests with 2XX status code per host"), - ) - if err != nil { - return err - } - - outboundRequests4XX, err = meter.Int64Counter( - "beckn.outbound.requests.4xx", - otelmetric.WithDescription("Total number of outbound requests with 4XX status code per host"), - ) - if err != nil { - return err - } - - outboundRequests5XX, err = meter.Int64Counter( - "beckn.outbound.requests.5xx", - otelmetric.WithDescription("Total number of outbound requests with 5XX status code per host"), - ) - if err != nil { - return err - } - - // Outbound request duration histogram (for p99, p95, p75) - outboundRequestDuration, err = meter.Float64Histogram( - "beckn.outbound.request.duration", - otelmetric.WithDescription("Duration of outbound requests in milliseconds"), - otelmetric.WithUnit("ms"), - ) - if err != nil { - 
return err - } - - return nil -} - -// RecordInboundRequest records an inbound request. -func RecordInboundRequest(ctx context.Context, host string) { - if inboundRequestsTotal == nil { - return - } - inboundRequestsTotal.Add(ctx, 1, otelmetric.WithAttributes( - attribute.String("host", host), - )) -} - -// RecordInboundSignValidation records an inbound request with sign validation. -func RecordInboundSignValidation(ctx context.Context, host string) { - if inboundSignValidationTotal == nil { - return - } - inboundSignValidationTotal.Add(ctx, 1, otelmetric.WithAttributes( - attribute.String("host", host), - )) -} - -// RecordInboundSchemaValidation records an inbound request with schema validation. -func RecordInboundSchemaValidation(ctx context.Context, host string) { - if inboundSchemaValidationTotal == nil { - return - } - inboundSchemaValidationTotal.Add(ctx, 1, otelmetric.WithAttributes( - attribute.String("host", host), - )) -} - -// RecordOutboundRequest records an outbound request with status code and duration. 
-func RecordOutboundRequest(ctx context.Context, host string, statusCode int, duration time.Duration) { - if outboundRequestsTotal == nil { - return - } - - attrs := []attribute.KeyValue{ - attribute.String("host", host), - attribute.String("status_code", strconv.Itoa(statusCode)), - } - - // Record total - outboundRequestsTotal.Add(ctx, 1, otelmetric.WithAttributes(attrs...)) - - // Record by status code category - statusClass := statusCode / 100 - switch statusClass { - case 2: - outboundRequests2XX.Add(ctx, 1, otelmetric.WithAttributes(attrs...)) - case 4: - outboundRequests4XX.Add(ctx, 1, otelmetric.WithAttributes(attrs...)) - case 5: - outboundRequests5XX.Add(ctx, 1, otelmetric.WithAttributes(attrs...)) - } - - // Record duration for percentile calculations (p99, p95, p75) - if outboundRequestDuration != nil { - outboundRequestDuration.Record(ctx, float64(duration.Milliseconds()), otelmetric.WithAttributes(attrs...)) - } -} - -// HTTPTransport wraps an http.RoundTripper to track outbound request metrics. -type HTTPTransport struct { - Transport http.RoundTripper -} - -// RoundTrip implements http.RoundTripper interface and tracks metrics. -func (t *HTTPTransport) RoundTrip(req *http.Request) (*http.Response, error) { - start := time.Now() - host := req.URL.Host - - resp, err := t.Transport.RoundTrip(req) - - duration := time.Since(start) - statusCode := 0 - if resp != nil { - statusCode = resp.StatusCode - } else if err != nil { - // Network error - treat as 5XX - statusCode = 500 - } - - RecordOutboundRequest(req.Context(), host, statusCode, duration) - - return resp, err -} - -// WrapHTTPTransport wraps an http.RoundTripper with metrics tracking. 
-func WrapHTTPTransport(transport http.RoundTripper) http.RoundTripper { - if !IsEnabled() { - return transport - } - return &HTTPTransport{Transport: transport} -} diff --git a/pkg/metrics/requests_test.go b/pkg/metrics/requests_test.go deleted file mode 100644 index 8e40d61..0000000 --- a/pkg/metrics/requests_test.go +++ /dev/null @@ -1,346 +0,0 @@ -package metrics - -import ( - "context" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestInitRequestMetrics(t *testing.T) { - tests := []struct { - name string - enabled bool - wantError bool - }{ - { - name: "metrics enabled", - enabled: true, - wantError: false, - }, - { - name: "metrics disabled", - enabled: false, - wantError: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Setup: Initialize metrics with enabled state - cfg := Config{ - Enabled: tt.enabled, - ExporterType: ExporterPrometheus, - ServiceName: "test-service", - } - err := InitMetrics(cfg) - require.NoError(t, err) - - // Test InitRequestMetrics - err = InitRequestMetrics() - if tt.wantError { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - - // Cleanup - Shutdown(context.Background()) - }) - } -} - -func TestRecordInboundRequest(t *testing.T) { - // Setup - cfg := Config{ - Enabled: true, - ExporterType: ExporterPrometheus, - ServiceName: "test-service", - } - err := InitMetrics(cfg) - require.NoError(t, err) - defer Shutdown(context.Background()) - - err = InitRequestMetrics() - require.NoError(t, err) - - ctx := context.Background() - host := "example.com" - - // Test: Record inbound request - RecordInboundRequest(ctx, host) - - // Verify: No error should occur - // Note: We can't easily verify the metric value without exporting, - // but we can verify the function doesn't panic - assert.NotPanics(t, func() { - RecordInboundRequest(ctx, host) - }) -} - -func 
TestRecordInboundSignValidation(t *testing.T) { - // Setup - cfg := Config{ - Enabled: true, - ExporterType: ExporterPrometheus, - ServiceName: "test-service", - } - err := InitMetrics(cfg) - require.NoError(t, err) - defer Shutdown(context.Background()) - - err = InitRequestMetrics() - require.NoError(t, err) - - ctx := context.Background() - host := "example.com" - - // Test: Record sign validation - RecordInboundSignValidation(ctx, host) - - // Verify: No error should occur - assert.NotPanics(t, func() { - RecordInboundSignValidation(ctx, host) - }) -} - -func TestRecordInboundSchemaValidation(t *testing.T) { - // Setup - cfg := Config{ - Enabled: true, - ExporterType: ExporterPrometheus, - ServiceName: "test-service", - } - err := InitMetrics(cfg) - require.NoError(t, err) - defer Shutdown(context.Background()) - - err = InitRequestMetrics() - require.NoError(t, err) - - ctx := context.Background() - host := "example.com" - - // Test: Record schema validation - RecordInboundSchemaValidation(ctx, host) - - // Verify: No error should occur - assert.NotPanics(t, func() { - RecordInboundSchemaValidation(ctx, host) - }) -} - -func TestRecordOutboundRequest(t *testing.T) { - // Setup - cfg := Config{ - Enabled: true, - ExporterType: ExporterPrometheus, - ServiceName: "test-service", - } - err := InitMetrics(cfg) - require.NoError(t, err) - defer Shutdown(context.Background()) - - err = InitRequestMetrics() - require.NoError(t, err) - - ctx := context.Background() - host := "example.com" - - tests := []struct { - name string - statusCode int - duration time.Duration - }{ - { - name: "2XX status code", - statusCode: 200, - duration: 100 * time.Millisecond, - }, - { - name: "4XX status code", - statusCode: 404, - duration: 50 * time.Millisecond, - }, - { - name: "5XX status code", - statusCode: 500, - duration: 200 * time.Millisecond, - }, - { - name: "3XX status code", - statusCode: 301, - duration: 75 * time.Millisecond, - }, - } - - for _, tt := range tests { - 
t.Run(tt.name, func(t *testing.T) { - // Test: Record outbound request - RecordOutboundRequest(ctx, host, tt.statusCode, tt.duration) - - // Verify: No error should occur - assert.NotPanics(t, func() { - RecordOutboundRequest(ctx, host, tt.statusCode, tt.duration) - }) - }) - } -} - -func TestHTTPTransport_RoundTrip(t *testing.T) { - // Setup - cfg := Config{ - Enabled: true, - ExporterType: ExporterPrometheus, - ServiceName: "test-service", - } - err := InitMetrics(cfg) - require.NoError(t, err) - defer Shutdown(context.Background()) - - err = InitRequestMetrics() - require.NoError(t, err) - - // Create a test server - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write([]byte("OK")) - })) - defer server.Close() - - // Create transport wrapper - transport := &HTTPTransport{ - Transport: http.DefaultTransport, - } - - // Create request - req, err := http.NewRequest("GET", server.URL, nil) - require.NoError(t, err) - req = req.WithContext(context.Background()) - - // Test: RoundTrip should track metrics - resp, err := transport.RoundTrip(req) - require.NoError(t, err) - require.NotNil(t, resp) - assert.Equal(t, http.StatusOK, resp.StatusCode) - - // Verify: Metrics should be recorded - assert.NotPanics(t, func() { - resp, err = transport.RoundTrip(req) - assert.NoError(t, err) - assert.NotNil(t, resp) - }) -} - -func TestHTTPTransport_RoundTrip_Error(t *testing.T) { - // Setup - cfg := Config{ - Enabled: true, - ExporterType: ExporterPrometheus, - ServiceName: "test-service", - } - err := InitMetrics(cfg) - require.NoError(t, err) - defer Shutdown(context.Background()) - - err = InitRequestMetrics() - require.NoError(t, err) - - // Create transport with invalid URL to cause error - transport := &HTTPTransport{ - Transport: http.DefaultTransport, - } - - // Create request with invalid URL - req, err := http.NewRequest("GET", "http://invalid-host-that-does-not-exist:9999", nil) - 
require.NoError(t, err) - req = req.WithContext(context.Background()) - - // Test: RoundTrip should handle error and still record metrics - resp, err := transport.RoundTrip(req) - assert.Error(t, err) - assert.Nil(t, resp) - - // Verify: Metrics should still be recorded (with 500 status) - assert.NotPanics(t, func() { - _, _ = transport.RoundTrip(req) - }) -} - -func TestWrapHTTPTransport_Enabled(t *testing.T) { - // Setup - cfg := Config{ - Enabled: true, - ExporterType: ExporterPrometheus, - ServiceName: "test-service", - } - err := InitMetrics(cfg) - require.NoError(t, err) - defer Shutdown(context.Background()) - - // Create a new transport - transport := http.DefaultTransport.(*http.Transport).Clone() - - // Test: Wrap transport - wrapped := WrapHTTPTransport(transport) - - // Verify: Should be wrapped - assert.NotEqual(t, transport, wrapped) - _, ok := wrapped.(*HTTPTransport) - assert.True(t, ok, "Should be wrapped with HTTPTransport") -} - -func TestWrapHTTPTransport_Disabled(t *testing.T) { - // Setup: Initialize metrics with disabled state - cfg := Config{ - Enabled: false, - ExporterType: ExporterPrometheus, - ServiceName: "test-service", - } - err := InitMetrics(cfg) - require.NoError(t, err) - defer Shutdown(context.Background()) - - // Create a new transport - transport := http.DefaultTransport.(*http.Transport).Clone() - - // Test: Wrap transport when metrics disabled - wrapped := WrapHTTPTransport(transport) - - // Verify: When metrics are disabled, IsEnabled() returns false - // So WrapHTTPTransport should return the original transport - // Note: This test verifies the behavior when IsEnabled() returns false - if !IsEnabled() { - assert.Equal(t, transport, wrapped, "Should return original transport when metrics disabled") - } else { - // If metrics are still enabled from previous test, just verify it doesn't panic - assert.NotNil(t, wrapped) - } -} - -func TestRecordInboundRequest_WhenDisabled(t *testing.T) { - // Setup: Metrics disabled - cfg := 
Config{ - Enabled: false, - ExporterType: ExporterPrometheus, - ServiceName: "test-service", - } - err := InitMetrics(cfg) - require.NoError(t, err) - defer Shutdown(context.Background()) - - ctx := context.Background() - host := "example.com" - - // Test: Should not panic when metrics are disabled - assert.NotPanics(t, func() { - RecordInboundRequest(ctx, host) - RecordInboundSignValidation(ctx, host) - RecordInboundSchemaValidation(ctx, host) - RecordOutboundRequest(ctx, host, 200, time.Second) - }) -} diff --git a/pkg/metrics/runtime.go b/pkg/metrics/runtime.go deleted file mode 100644 index c77995a..0000000 --- a/pkg/metrics/runtime.go +++ /dev/null @@ -1,27 +0,0 @@ -package metrics - -import ( - otelruntime "go.opentelemetry.io/contrib/instrumentation/runtime" -) - -// InitRuntimeMetrics initializes Go runtime metrics instrumentation. -// This includes CPU, memory, GC, and goroutine metrics. -// The runtime instrumentation automatically collects: -// - CPU usage (go_cpu_*) -// - Memory allocation and heap stats (go_memstats_*) -// - GC statistics (go_memstats_gc_*) -// - Goroutine count (go_goroutines) -func InitRuntimeMetrics() error { - if !IsEnabled() { - return nil - } - - // Start OpenTelemetry runtime metrics collection - // This automatically collects Go runtime metrics - err := otelruntime.Start(otelruntime.WithMinimumReadMemStatsInterval(0)) - if err != nil { - return err - } - - return nil -} diff --git a/pkg/metrics/runtime_test.go b/pkg/metrics/runtime_test.go deleted file mode 100644 index 5d1f7a5..0000000 --- a/pkg/metrics/runtime_test.go +++ /dev/null @@ -1,91 +0,0 @@ -package metrics - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestInitRuntimeMetrics(t *testing.T) { - tests := []struct { - name string - enabled bool - wantError bool - }{ - { - name: "metrics enabled", - enabled: true, - wantError: false, - }, - { - name: "metrics disabled", - enabled: false, - 
wantError: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Setup: Initialize metrics with enabled state - cfg := Config{ - Enabled: tt.enabled, - ExporterType: ExporterPrometheus, - ServiceName: "test-service", - } - err := InitMetrics(cfg) - require.NoError(t, err) - - // Test InitRuntimeMetrics - err = InitRuntimeMetrics() - if tt.wantError { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - - // Cleanup - Shutdown(context.Background()) - }) - } -} - -func TestInitRuntimeMetrics_MultipleCalls(t *testing.T) { - // Setup - cfg := Config{ - Enabled: true, - ExporterType: ExporterPrometheus, - ServiceName: "test-service", - } - err := InitMetrics(cfg) - require.NoError(t, err) - defer Shutdown(context.Background()) - - // Test: Multiple calls should not cause errors - err = InitRuntimeMetrics() - require.NoError(t, err) - - // Note: Second call might fail if runtime.Start is already called, - // but that's expected behavior - err = InitRuntimeMetrics() - // We don't assert on error here as it depends on internal state - _ = err -} - -func TestInitRuntimeMetrics_WhenDisabled(t *testing.T) { - // Setup: Metrics disabled - cfg := Config{ - Enabled: false, - ExporterType: ExporterPrometheus, - ServiceName: "test-service", - } - err := InitMetrics(cfg) - require.NoError(t, err) - defer Shutdown(context.Background()) - - // Test: Should return nil without error when disabled - err = InitRuntimeMetrics() - assert.NoError(t, err) -} - diff --git a/pkg/plugin/implementation/cache/cache.go b/pkg/plugin/implementation/cache/cache.go index a0f5715..2f6f6cc 100644 --- a/pkg/plugin/implementation/cache/cache.go +++ b/pkg/plugin/implementation/cache/cache.go @@ -7,7 +7,11 @@ import ( "os" "time" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/beckn-one/beckn-onix/pkg/telemetry" "github.com/redis/go-redis/extra/redisotel/v9" 
"github.com/redis/go-redis/v9" ) @@ -32,7 +36,8 @@ type Config struct { // Cache wraps a Redis client to provide basic caching operations. type Cache struct { - Client RedisClient + Client RedisClient + metrics *telemetry.Metrics } // Error variables to describe common failure modes. @@ -92,26 +97,66 @@ func New(ctx context.Context, cfg *Config) (*Cache, func() error, error) { } } + metrics, _ := telemetry.GetMetrics(ctx) + log.Infof(ctx, "Cache connection to Redis established successfully") - return &Cache{Client: client}, client.Close, nil + return &Cache{Client: client, metrics: metrics}, client.Close, nil } // Get retrieves the value for the specified key from Redis. func (c *Cache) Get(ctx context.Context, key string) (string, error) { - return c.Client.Get(ctx, key).Result() + result, err := c.Client.Get(ctx, key).Result() + if c.metrics != nil { + attrs := []attribute.KeyValue{ + telemetry.AttrOperation.String("get"), + } + switch { + case err == redis.Nil: + c.metrics.CacheMissesTotal.Add(ctx, 1, metric.WithAttributes(attrs...)) + c.metrics.CacheOperationsTotal.Add(ctx, 1, + metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("miss"))...)) + case err != nil: + c.metrics.CacheOperationsTotal.Add(ctx, 1, + metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("error"))...)) + default: + c.metrics.CacheHitsTotal.Add(ctx, 1, metric.WithAttributes(attrs...)) + c.metrics.CacheOperationsTotal.Add(ctx, 1, + metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("hit"))...)) + } + } + return result, err } // Set stores the given key-value pair in Redis with the specified TTL (time to live). func (c *Cache) Set(ctx context.Context, key, value string, ttl time.Duration) error { - return c.Client.Set(ctx, key, value, ttl).Err() + err := c.Client.Set(ctx, key, value, ttl).Err() + c.recordOperation(ctx, "set", err) + return err } // Delete removes the specified key from Redis. 
func (c *Cache) Delete(ctx context.Context, key string) error { - return c.Client.Del(ctx, key).Err() + err := c.Client.Del(ctx, key).Err() + c.recordOperation(ctx, "delete", err) + return err } // Clear removes all keys in the currently selected Redis database. func (c *Cache) Clear(ctx context.Context) error { return c.Client.FlushDB(ctx).Err() } + +func (c *Cache) recordOperation(ctx context.Context, op string, err error) { + if c.metrics == nil { + return + } + status := "success" + if err != nil { + status = "error" + } + c.metrics.CacheOperationsTotal.Add(ctx, 1, + metric.WithAttributes( + telemetry.AttrOperation.String(op), + telemetry.AttrStatus.String(status), + )) +} diff --git a/pkg/plugin/implementation/otelmetrics/cmd/plugin.go b/pkg/plugin/implementation/otelmetrics/cmd/plugin.go new file mode 100644 index 0000000..1e839a2 --- /dev/null +++ b/pkg/plugin/implementation/otelmetrics/cmd/plugin.go @@ -0,0 +1,21 @@ +package main + +import ( + "context" + "net/http" + + "github.com/beckn-one/beckn-onix/pkg/plugin/implementation/otelmetrics" +) + +type middlewareProvider struct{} + +func (middlewareProvider) New(ctx context.Context, cfg map[string]string) (func(http.Handler) http.Handler, error) { + mw, err := otelmetrics.New(ctx, cfg) + if err != nil { + return nil, err + } + return mw.Handler, nil +} + +// Provider is exported for plugin loader. 
+var Provider = middlewareProvider{}
diff --git a/pkg/plugin/implementation/otelmetrics/otelmetrics.go b/pkg/plugin/implementation/otelmetrics/otelmetrics.go
new file mode 100644
index 0000000..6c74292
--- /dev/null
+++ b/pkg/plugin/implementation/otelmetrics/otelmetrics.go
@@ -0,0 +1,134 @@
+package otelmetrics
+
+import (
+	"context"
+	"net/http"
+	"strings"
+	"time"
+
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+
+	"github.com/beckn-one/beckn-onix/pkg/log"
+	"github.com/beckn-one/beckn-onix/pkg/telemetry"
+)
+
+// Middleware instruments inbound HTTP handlers with OpenTelemetry metrics.
+type Middleware struct {
+	metrics *telemetry.Metrics
+	enabled bool
+}
+
+// New constructs middleware based on plugin configuration.
+func New(ctx context.Context, cfg map[string]string) (*Middleware, error) {
+	enabled := cfg["enabled"] != "false"
+
+	metrics, err := telemetry.GetMetrics(ctx)
+	if err != nil {
+		log.Warnf(ctx, "OpenTelemetry metrics unavailable: %v", err)
+	}
+
+	return &Middleware{
+		metrics: metrics,
+		enabled: enabled,
+	}, nil
+}
+
+// Handler returns an http.Handler middleware compatible with plugin expectations.
+func (m *Middleware) Handler(next http.Handler) http.Handler { + if !m.enabled || m.metrics == nil { + return next + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + action := extractAction(r.URL.Path) + module := r.Header.Get("X-Module-Name") + role := r.Header.Get("X-Role") + + attrs := []attribute.KeyValue{ + telemetry.AttrModule.String(module), + telemetry.AttrRole.String(role), + telemetry.AttrAction.String(action), + telemetry.AttrHTTPMethod.String(r.Method), + } + + m.metrics.HTTPRequestsInFlight.Add(ctx, 1, metric.WithAttributes(attrs...)) + defer m.metrics.HTTPRequestsInFlight.Add(ctx, -1, metric.WithAttributes(attrs...)) + + if r.ContentLength > 0 { + m.metrics.HTTPRequestSize.Record(ctx, r.ContentLength, metric.WithAttributes(attrs...)) + } + + rw := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK} + start := time.Now() + next.ServeHTTP(rw, r) + duration := time.Since(start).Seconds() + + status := "success" + if rw.statusCode >= 400 { + status = "error" + } + + statusAttrs := append(attrs, + telemetry.AttrHTTPStatus.Int(rw.statusCode), + telemetry.AttrStatus.String(status), + ) + + m.metrics.HTTPRequestsTotal.Add(ctx, 1, metric.WithAttributes(statusAttrs...)) + m.metrics.HTTPRequestDuration.Record(ctx, duration, metric.WithAttributes(statusAttrs...)) + if rw.bytesWritten > 0 { + m.metrics.HTTPResponseSize.Record(ctx, int64(rw.bytesWritten), metric.WithAttributes(statusAttrs...)) + } + + if isBecknAction(action) { + m.metrics.BecknMessagesTotal.Add(ctx, 1, + metric.WithAttributes( + telemetry.AttrAction.String(action), + telemetry.AttrRole.String(role), + telemetry.AttrStatus.String(status), + )) + } + }) +} + +type responseWriter struct { + http.ResponseWriter + statusCode int + bytesWritten int +} + +func (rw *responseWriter) WriteHeader(code int) { + rw.statusCode = code + rw.ResponseWriter.WriteHeader(code) +} + +func (rw *responseWriter) Write(b []byte) (int, error) { + n, err := 
rw.ResponseWriter.Write(b) + rw.bytesWritten += n + return n, err +} + +func extractAction(path string) string { + trimmed := strings.Trim(path, "/") + if trimmed == "" { + return "root" + } + parts := strings.Split(trimmed, "/") + return parts[len(parts)-1] +} + +func isBecknAction(action string) bool { + actions := []string{ + "discover", "select", "init", "confirm", "status", "track", + "cancel", "update", "rating", "support", + "on_discover", "on_select", "on_init", "on_confirm", "on_status", + "on_track", "on_cancel", "on_update", "on_rating", "on_support", + } + for _, a := range actions { + if a == action { + return true + } + } + return false +} diff --git a/pkg/plugin/implementation/router/router_test.go b/pkg/plugin/implementation/router/router_test.go index 9fae926..1045d0f 100644 --- a/pkg/plugin/implementation/router/router_test.go +++ b/pkg/plugin/implementation/router/router_test.go @@ -681,4 +681,4 @@ func TestExcludeActionWithNonURLTargetTypes(t *testing.T) { } }) } -} \ No newline at end of file +} diff --git a/pkg/telemetry/metrics.go b/pkg/telemetry/metrics.go new file mode 100644 index 0000000..27556c5 --- /dev/null +++ b/pkg/telemetry/metrics.go @@ -0,0 +1,222 @@ +package telemetry + +import ( + "context" + "fmt" + "sync" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// Metrics exposes strongly typed metric instruments used across the adapter. 
+type Metrics struct {
+	HTTPRequestsTotal    metric.Int64Counter
+	HTTPRequestDuration  metric.Float64Histogram
+	HTTPRequestsInFlight metric.Int64UpDownCounter
+	HTTPRequestSize      metric.Int64Histogram
+	HTTPResponseSize     metric.Int64Histogram
+
+	StepExecutionDuration metric.Float64Histogram
+	StepExecutionTotal    metric.Int64Counter
+	StepErrorsTotal       metric.Int64Counter
+
+	PluginExecutionDuration metric.Float64Histogram
+	PluginErrorsTotal       metric.Int64Counter
+
+	BecknMessagesTotal        metric.Int64Counter
+	SignatureValidationsTotal metric.Int64Counter
+	SchemaValidationsTotal    metric.Int64Counter
+	CacheOperationsTotal      metric.Int64Counter
+	CacheHitsTotal            metric.Int64Counter
+	CacheMissesTotal          metric.Int64Counter
+	RoutingDecisionsTotal     metric.Int64Counter
+}
+
+var (
+	metricsInstance *Metrics
+	metricsOnce     sync.Once
+	metricsErr      error
+)
+
+// Attribute keys shared across instruments.
+var (
+	AttrModule        = attribute.Key("module")
+	AttrSubsystem     = attribute.Key("subsystem")
+	AttrName          = attribute.Key("name")
+	AttrStep          = attribute.Key("step")
+	AttrRole          = attribute.Key("role")
+	AttrAction        = attribute.Key("action")
+	AttrHTTPMethod    = attribute.Key("http_method")
+	AttrHTTPStatus    = attribute.Key("http_status_code")
+	AttrStatus        = attribute.Key("status")
+	AttrErrorType     = attribute.Key("error_type")
+	AttrPluginID      = attribute.Key("plugin_id")
+	AttrPluginType    = attribute.Key("plugin_type")
+	AttrOperation     = attribute.Key("operation")
+	AttrRouteType     = attribute.Key("route_type")
+	AttrTargetType    = attribute.Key("target_type")
+	AttrSchemaVersion = attribute.Key("schema_version")
+)
+
+// GetMetrics lazily initializes instruments and returns a cached reference.
+func GetMetrics(ctx context.Context) (*Metrics, error) {
+	metricsOnce.Do(func() {
+		metricsInstance, metricsErr = newMetrics()
+	})
+	return metricsInstance, metricsErr
+}
+
+func newMetrics() (*Metrics, error) {
+	meter := otel.GetMeterProvider().Meter(
+		"github.com/beckn-one/beckn-onix/telemetry",
+		metric.WithInstrumentationVersion("1.0.0"),
+	)
+
+	m := &Metrics{}
+	var err error
+
+	if m.HTTPRequestsTotal, err = meter.Int64Counter(
+		"http_server_requests_total",
+		metric.WithDescription("Total number of HTTP requests processed"),
+		metric.WithUnit("{request}"),
+	); err != nil {
+		return nil, fmt.Errorf("http_server_requests_total: %w", err)
+	}
+
+	if m.HTTPRequestDuration, err = meter.Float64Histogram(
+		"http_server_request_duration_seconds",
+		metric.WithDescription("HTTP request duration in seconds"),
+		metric.WithUnit("s"),
+		metric.WithExplicitBucketBoundaries(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10),
+	); err != nil {
+		return nil, fmt.Errorf("http_server_request_duration_seconds: %w", err)
+	}
+
+	if m.HTTPRequestsInFlight, err = meter.Int64UpDownCounter(
+		"http_server_requests_in_flight",
+		metric.WithDescription("Number of HTTP requests currently being processed"),
+		metric.WithUnit("{request}"),
+	); err != nil {
+		return nil, fmt.Errorf("http_server_requests_in_flight: %w", err)
+	}
+
+	if m.HTTPRequestSize, err = meter.Int64Histogram(
+		"http_server_request_size_bytes",
+		metric.WithDescription("Size of HTTP request payloads"),
+		metric.WithUnit("By"),
+		metric.WithExplicitBucketBoundaries(100, 1000, 10000, 100000, 1000000),
+	); err != nil {
+		return nil, fmt.Errorf("http_server_request_size_bytes: %w", err)
+	}
+
+	if m.HTTPResponseSize, err = meter.Int64Histogram(
+		"http_server_response_size_bytes",
+		metric.WithDescription("Size of HTTP responses"),
+		metric.WithUnit("By"),
+		metric.WithExplicitBucketBoundaries(100, 1000, 10000, 100000, 1000000),
+	); err != nil {
+		return nil, fmt.Errorf("http_server_response_size_bytes: %w", err)
+	}
+
+	if m.StepExecutionDuration, err = meter.Float64Histogram(
+		"onix_step_execution_duration_seconds",
+		metric.WithDescription("Duration of individual processing steps"),
+		metric.WithUnit("s"),
+		metric.WithExplicitBucketBoundaries(0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5),
+	); err != nil {
+		return nil, fmt.Errorf("onix_step_execution_duration_seconds: %w", err)
+	}
+
+	if m.StepExecutionTotal, err = meter.Int64Counter(
+		"onix_step_executions_total",
+		metric.WithDescription("Total processing step executions"),
+		metric.WithUnit("{execution}"),
+	); err != nil {
+		return nil, fmt.Errorf("onix_step_executions_total: %w", err)
+	}
+
+	if m.StepErrorsTotal, err = meter.Int64Counter(
+		"onix_step_errors_total",
+		metric.WithDescription("Processing step errors"),
+		metric.WithUnit("{error}"),
+	); err != nil {
+		return nil, fmt.Errorf("onix_step_errors_total: %w", err)
+	}
+
+	if m.PluginExecutionDuration, err = meter.Float64Histogram(
+		"onix_plugin_execution_duration_seconds",
+		metric.WithDescription("Plugin execution time"),
+		metric.WithUnit("s"),
+		metric.WithExplicitBucketBoundaries(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1),
+	); err != nil {
+		return nil, fmt.Errorf("onix_plugin_execution_duration_seconds: %w", err)
+	}
+
+	if m.PluginErrorsTotal, err = meter.Int64Counter(
+		"onix_plugin_errors_total",
+		metric.WithDescription("Plugin level errors"),
+		metric.WithUnit("{error}"),
+	); err != nil {
+		return nil, fmt.Errorf("onix_plugin_errors_total: %w", err)
+	}
+
+	if m.BecknMessagesTotal, err = meter.Int64Counter(
+		"beckn_messages_total",
+		metric.WithDescription("Total Beckn protocol messages processed"),
+		metric.WithUnit("{message}"),
+	); err != nil {
+		return nil, fmt.Errorf("beckn_messages_total: %w", err)
+	}
+
+	if m.SignatureValidationsTotal, err = meter.Int64Counter(
+		"beckn_signature_validations_total",
+		metric.WithDescription("Signature validation attempts"),
+		metric.WithUnit("{validation}"),
+	); err != nil {
+		return nil, fmt.Errorf("beckn_signature_validations_total: %w", err)
+	}
+
+	if m.SchemaValidationsTotal, err = meter.Int64Counter(
+		"beckn_schema_validations_total",
+		metric.WithDescription("Schema validation attempts"),
+		metric.WithUnit("{validation}"),
+	); err != nil {
+		return nil, fmt.Errorf("beckn_schema_validations_total: %w", err)
+	}
+
+	if m.CacheOperationsTotal, err = meter.Int64Counter(
+		"onix_cache_operations_total",
+		metric.WithDescription("Redis cache operations"),
+		metric.WithUnit("{operation}"),
+	); err != nil {
+		return nil, fmt.Errorf("onix_cache_operations_total: %w", err)
+	}
+
+	if m.CacheHitsTotal, err = meter.Int64Counter(
+		"onix_cache_hits_total",
+		metric.WithDescription("Redis cache hits"),
+		metric.WithUnit("{hit}"),
+	); err != nil {
+		return nil, fmt.Errorf("onix_cache_hits_total: %w", err)
+	}
+
+	if m.CacheMissesTotal, err = meter.Int64Counter(
+		"onix_cache_misses_total",
+		metric.WithDescription("Redis cache misses"),
+		metric.WithUnit("{miss}"),
+	); err != nil {
+		return nil, fmt.Errorf("onix_cache_misses_total: %w", err)
+	}
+
+	if m.RoutingDecisionsTotal, err = meter.Int64Counter(
+		"onix_routing_decisions_total",
+		metric.WithDescription("Routing decisions taken by handler"),
+		metric.WithUnit("{decision}"),
+	); err != nil {
+		return nil, fmt.Errorf("onix_routing_decisions_total: %w", err)
+	}
+
+	return m, nil
+}
diff --git a/pkg/telemetry/metrics_test.go b/pkg/telemetry/metrics_test.go
new file mode 100644
index 0000000..ee97c7f
--- /dev/null
+++ b/pkg/telemetry/metrics_test.go
@@ -0,0 +1,33 @@
+package telemetry
+
+import (
+	"context"
+	"net/http/httptest"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestNewProviderAndMetrics(t *testing.T) {
+	ctx := context.Background()
+	provider, err := NewProvider(ctx, &Config{
+		ServiceName:    "test-service",
+		ServiceVersion: "1.0.0",
+		EnableMetrics:  true,
+		Environment:    "test",
+	})
+	require.NoError(t, err)
+	require.NotNil(t, provider)
+	require.NotNil(t, provider.MetricsHandler)
+
+	metrics, err := GetMetrics(ctx)
+	require.NoError(t, err)
+	require.NotNil(t, metrics)
+
+	rec := httptest.NewRecorder()
+	req := httptest.NewRequest("GET", "/metrics", nil)
+	provider.MetricsHandler.ServeHTTP(rec, req)
+	require.Equal(t, 200, rec.Code)
+
+	require.NoError(t, provider.Shutdown(context.Background()))
+}
diff --git a/pkg/telemetry/step_instrumentor.go b/pkg/telemetry/step_instrumentor.go
new file mode 100644
index 0000000..f4f19d3
--- /dev/null
+++ b/pkg/telemetry/step_instrumentor.go
@@ -0,0 +1,78 @@
+package telemetry
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+
+	"github.com/beckn-one/beckn-onix/pkg/log"
+	"github.com/beckn-one/beckn-onix/pkg/model"
+	"github.com/beckn-one/beckn-onix/pkg/plugin/definition"
+)
+
+// InstrumentedStep wraps a processing step with telemetry instrumentation.
+type InstrumentedStep struct {
+	step       definition.Step
+	stepName   string
+	moduleName string
+	metrics    *Metrics
+}
+
+// NewInstrumentedStep returns a telemetry enabled wrapper around a definition.Step.
+func NewInstrumentedStep(step definition.Step, stepName, moduleName string) (*InstrumentedStep, error) {
+	metrics, err := GetMetrics(context.Background())
+	if err != nil {
+		return nil, err
+	}
+
+	return &InstrumentedStep{
+		step:       step,
+		stepName:   stepName,
+		moduleName: moduleName,
+		metrics:    metrics,
+	}, nil
+}
+
+type becknError interface {
+	BecknError() *model.Error
+}
+
+// Run executes the underlying step and records RED style metrics.
+func (is *InstrumentedStep) Run(ctx *model.StepContext) error {
+	if is.metrics == nil {
+		return is.step.Run(ctx)
+	}
+
+	start := time.Now()
+	err := is.step.Run(ctx)
+	duration := time.Since(start).Seconds()
+
+	attrs := []attribute.KeyValue{
+		AttrModule.String(is.moduleName),
+		AttrStep.String(is.stepName),
+		AttrRole.String(string(ctx.Role)),
+	}
+
+	is.metrics.StepExecutionTotal.Add(ctx.Context, 1, metric.WithAttributes(attrs...))
+	is.metrics.StepExecutionDuration.Record(ctx.Context, duration, metric.WithAttributes(attrs...))
+
+	if err != nil {
+		errorType := fmt.Sprintf("%T", err)
+		var becknErr becknError
+		if errors.As(err, &becknErr) {
+			if be := becknErr.BecknError(); be != nil && be.Code != "" {
+				errorType = be.Code
+			}
+		}
+
+		errorAttrs := append(attrs, AttrErrorType.String(errorType))
+		is.metrics.StepErrorsTotal.Add(ctx.Context, 1, metric.WithAttributes(errorAttrs...))
+		log.Errorf(ctx.Context, err, "Step %s failed", is.stepName)
+	}
+
+	return err
+}
diff --git a/pkg/telemetry/step_instrumentor_test.go b/pkg/telemetry/step_instrumentor_test.go
new file mode 100644
index 0000000..ea67934
--- /dev/null
+++ b/pkg/telemetry/step_instrumentor_test.go
@@ -0,0 +1,60 @@
+package telemetry
+
+import (
+	"context"
+	"errors"
+	"testing"
+
+	"github.com/beckn-one/beckn-onix/pkg/model"
+	"github.com/stretchr/testify/require"
+)
+
+type stubStep struct {
+	err error
+}
+
+func (s stubStep) Run(ctx *model.StepContext) error {
+	return s.err
+}
+
+func TestInstrumentedStepSuccess(t *testing.T) {
+	ctx := context.Background()
+	provider, err := NewProvider(ctx, &Config{
+		ServiceName:    "test-service",
+		ServiceVersion: "1.0.0",
+		EnableMetrics:  true,
+		Environment:    "test",
+	})
+	require.NoError(t, err)
+	defer provider.Shutdown(context.Background())
+
+	step, err := NewInstrumentedStep(stubStep{}, "test-step", "test-module")
+	require.NoError(t, err)
+
+	stepCtx := &model.StepContext{
+		Context: context.Background(),
+		Role:    model.RoleBAP,
+	}
+	require.NoError(t, step.Run(stepCtx))
+}
+
+func TestInstrumentedStepError(t *testing.T) {
+	ctx := context.Background()
+	provider, err := NewProvider(ctx, &Config{
+		ServiceName:    "test-service",
+		ServiceVersion: "1.0.0",
+		EnableMetrics:  true,
+		Environment:    "test",
+	})
+	require.NoError(t, err)
+	defer provider.Shutdown(context.Background())
+
+	step, err := NewInstrumentedStep(stubStep{err: errors.New("boom")}, "test-step", "test-module")
+	require.NoError(t, err)
+
+	stepCtx := &model.StepContext{
+		Context: context.Background(),
+		Role:    model.RoleBAP,
+	}
+	require.Error(t, step.Run(stepCtx))
+}
diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go
new file mode 100644
index 0000000..00f48b2
--- /dev/null
+++ b/pkg/telemetry/telemetry.go
@@ -0,0 +1,110 @@
+package telemetry
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+
+	clientprom "github.com/prometheus/client_golang/prometheus"
+	clientpromhttp "github.com/prometheus/client_golang/prometheus/promhttp"
+	"go.opentelemetry.io/contrib/instrumentation/runtime"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	otelprom "go.opentelemetry.io/otel/exporters/prometheus"
+	"go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/resource"
+
+	"github.com/beckn-one/beckn-onix/pkg/log"
+)
+
+// Config represents OpenTelemetry related configuration.
+type Config struct {
+	ServiceName    string `yaml:"serviceName"`
+	ServiceVersion string `yaml:"serviceVersion"`
+	EnableMetrics  bool   `yaml:"enableMetrics"`
+	Environment    string `yaml:"environment"`
+}
+
+// Provider holds references to telemetry components that need coordinated shutdown.
+type Provider struct {
+	MeterProvider  *metric.MeterProvider
+	MetricsHandler http.Handler
+	Shutdown       func(context.Context) error
+}
+
+// DefaultConfig returns sensible defaults for telemetry configuration.
+func DefaultConfig() *Config {
+	return &Config{
+		ServiceName:    "beckn-onix",
+		ServiceVersion: "dev",
+		EnableMetrics:  true,
+		Environment:    "development",
+	}
+}
+
+// NewProvider wires OpenTelemetry with a Prometheus exporter and exposes /metrics handler.
+func NewProvider(ctx context.Context, cfg *Config) (*Provider, error) {
+	if cfg == nil {
+		cfg = DefaultConfig()
+	}
+	if cfg.ServiceName == "" {
+		cfg.ServiceName = DefaultConfig().ServiceName
+	}
+	if cfg.ServiceVersion == "" {
+		cfg.ServiceVersion = DefaultConfig().ServiceVersion
+	}
+	if cfg.Environment == "" {
+		cfg.Environment = DefaultConfig().Environment
+	}
+
+	if !cfg.EnableMetrics {
+		log.Info(ctx, "OpenTelemetry metrics disabled")
+		return &Provider{
+			Shutdown: func(context.Context) error { return nil },
+		}, nil
+	}
+
+	res, err := resource.New(
+		ctx,
+		resource.WithAttributes(
+			attribute.String("service.name", cfg.ServiceName),
+			attribute.String("service.version", cfg.ServiceVersion),
+			attribute.String("deployment.environment", cfg.Environment),
+		),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create telemetry resource: %w", err)
+	}
+
+	registry := clientprom.NewRegistry()
+
+	exporter, err := otelprom.New(
+		otelprom.WithRegisterer(registry),
+		otelprom.WithoutUnits(),
+		otelprom.WithoutScopeInfo(),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create prometheus exporter: %w", err)
+	}
+
+	meterProvider := metric.NewMeterProvider(
+		metric.WithReader(exporter),
+		metric.WithResource(res),
+	)
+
+	otel.SetMeterProvider(meterProvider)
+	log.Infof(ctx, "OpenTelemetry metrics initialized for service=%s version=%s env=%s",
+		cfg.ServiceName, cfg.ServiceVersion, cfg.Environment)
+
+	if err := runtime.Start(runtime.WithMinimumReadMemStatsInterval(0)); err != nil {
+		log.Warnf(ctx, "Failed to start Go runtime instrumentation: %v", err)
+	}
+
+	return &Provider{
+		MeterProvider:  meterProvider,
+		MetricsHandler: clientpromhttp.HandlerFor(registry, clientpromhttp.HandlerOpts{}),
+		Shutdown: func(ctx context.Context) error {
+			return meterProvider.Shutdown(ctx)
+		},
+	}, nil
+}