diff --git a/CONFIG.md b/CONFIG.md index d5d1a4f..e4b72d8 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -6,13 +6,14 @@ 3. [Top-Level Configuration](#top-level-configuration) 4. [HTTP Configuration](#http-configuration) 5. [Logging Configuration](#logging-configuration) -6. [Plugin Manager Configuration](#plugin-manager-configuration) -7. [Module Configuration](#module-configuration) -8. [Handler Configuration](#handler-configuration) -9. [Plugin Configuration](#plugin-configuration) -10. [Routing Configuration](#routing-configuration) -11. [Deployment Scenarios](#deployment-scenarios) -12. [Configuration Examples](#configuration-examples) +6. [Metrics Configuration](#metrics-configuration) +7. [Plugin Manager Configuration](#plugin-manager-configuration) +8. [Module Configuration](#module-configuration) +9. [Handler Configuration](#handler-configuration) +10. [Plugin Configuration](#plugin-configuration) +11. [Routing Configuration](#routing-configuration) +12. [Deployment Scenarios](#deployment-scenarios) +13. [Configuration Examples](#configuration-examples) --- @@ -70,6 +71,7 @@ The main configuration file follows this structure: ```yaml appName: "onix-local" log: {...} +metrics: {...} http: {...} pluginManager: {...} modules: [...] @@ -187,6 +189,120 @@ log: --- +## Application-Level Plugins Configuration + +### `plugins` +**Type**: `object` +**Required**: No +**Description**: Application-level plugin configurations. These plugins apply to the entire application and are shared across all modules. + +#### `plugins.otelsetup` +**Type**: `object` +**Required**: No +**Description**: OpenTelemetry configuration controlling whether the Prometheus exporter is enabled. + +**Important**: This block is optional—omit it to run without telemetry. When present, the `/metrics` endpoint is exposed on a separate port (configurable via `metricsPort`) only if `enableMetrics: true`. + +##### Parameters: + +###### `id` +**Type**: `string` +**Required**: Yes +**Description**: Plugin identifier. Must be `"otelsetup"`. + +###### `config` +**Type**: `object` +**Required**: Yes +**Description**: Plugin configuration parameters. + +###### `config.enableMetrics` +**Type**: `string` (boolean) +**Required**: No +**Default**: `"true"` +**Description**: Enables metrics collection and the `/metrics` endpoint. Must be `"true"` or `"false"` as a string. + +###### `config.serviceName` +**Type**: `string` +**Required**: No +**Default**: `"beckn-onix"` +**Description**: Sets the `service.name` resource attribute. + +###### `config.serviceVersion` +**Type**: `string` +**Required**: No +**Description**: Sets the `service.version` resource attribute. + +###### `config.environment` +**Type**: `string` +**Required**: No +**Default**: `"development"` +**Description**: Sets the `deployment.environment` attribute (e.g., `development`, `staging`, `production`). + +###### `config.metricsPort` +**Type**: `string` +**Required**: No +**Default**: `"9090"` +**Description**: Port on which the metrics HTTP server will listen. The metrics endpoint is hosted on a separate server from the main application. + +**Example - Enable Metrics** (matches `config/local-simple.yaml`): +```yaml +plugins: + otelsetup: + id: otelsetup + config: + serviceName: "beckn-onix" + serviceVersion: "1.0.0" + enableMetrics: "true" + environment: "development" + metricsPort: "9090" +``` + +### Accessing Metrics + +When `plugins.otelsetup.config.enableMetrics: "true"`, the metrics endpoint is hosted on a separate HTTP server. Scrape metrics at: + +``` +http://your-server:9090/metrics +``` + +**Note**: The metrics server runs on the port specified by `config.metricsPort` (default: `9090`), which is separate from the main application port configured in `http.port`. + +### Metrics Collected + +Metrics are organized by module for better maintainability and encapsulation: + +#### OTel Setup (from `otelsetup` plugin) +- Prometheus exporter & `/metrics` endpoint on separate HTTP server +- Go runtime instrumentation (`go_*`), resource attributes, and meter provider wiring + +#### Step Execution Metrics (from `telemetry` package) +- `onix_step_executions_total`, `onix_step_execution_duration_seconds`, `onix_step_errors_total` + +#### Handler Metrics (from `handler` module) +- `beckn_signature_validations_total` - Signature validation attempts +- `beckn_schema_validations_total` - Schema validation attempts +- `onix_routing_decisions_total` - Routing decisions taken by handler + +#### Cache Metrics (from `cache` plugin) +- `onix_cache_operations_total`, `onix_cache_hits_total`, `onix_cache_misses_total` + +#### Plugin Metrics (from `telemetry` package) +- `onix_plugin_execution_duration_seconds`, `onix_plugin_errors_total` + +#### Runtime Metrics +- Go runtime metrics (`go_*`) and Redis instrumentation via `redisotel` + +Each metric includes consistent labels such as `module`, `role`, `action`, `status`, `step`, `plugin_id`, and `schema_version` to enable low-cardinality dashboards. + +**Note**: Metric definitions are now located in their respective modules: +- OTel setup: `pkg/plugin/implementation/otelsetup` +- Step metrics: `core/module/handler/step_metrics.go` +- Handler metrics: `core/module/handler/handlerMetrics.go` +- Cache metrics: `pkg/plugin/implementation/cache/cache_metrics.go` +- Plugin metrics: `pkg/telemetry/pluginMetrics.go` + +--- + ## Plugin Manager Configuration ### `pluginManager` @@ -1045,6 +1161,7 @@ routingRules: - Embedded Ed25519 keys - Local Redis - Simplified routing +- Optional metrics collection (available on separate port when enabled) **Use Case**: Quick local development and testing @@ -1052,6 +1169,10 @@ routingRules: appName: "onix-local" log: level: debug +metrics: + enabled: true + exporterType: prometheus + serviceName: onix-local http: port: 8081 modules: @@ -1063,6 +1184,8 @@ modules: config: {} ``` +**Metrics Access**: When enabled, access metrics at `http://localhost:9090/metrics` (default metrics port, configurable via `plugins.otelsetup.config.metricsPort`) + ### 2. Local Development (Vault Mode) **File**: `config/local-dev.yaml` @@ -1096,10 +1219,21 @@ modules: - Production Redis - Remote plugin loading - Pub/Sub integration +- OpenTelemetry metrics enabled (available on separate port, default: 9090) **Use Case**: Single deployment serving both roles ```yaml +appName: "onix-production" +log: + level: info + destinations: + - type: stdout +metrics: + enabled: true + exporterType: prometheus + serviceName: beckn-onix + serviceVersion: "1.0.0" pluginManager: root: /app/plugins remoteRoot: /mnt/gcs/plugins/plugins_bundle.zip @@ -1122,6 +1256,9 @@ modules: topic: bapNetworkReciever ``` +**Metrics Access**: +- Prometheus scraping: `http://your-server:9090/metrics` (default metrics port, configurable via `plugins.otelsetup.config.metricsPort`) + ### 4. Production BAP-Only Mode **File**: `config/onix-bap/adapter.yaml` diff --git a/README.md b/README.md index cbe0684..21f05a6 100644 --- a/README.md +++ b/README.md @@ -64,9 +64,45 @@ The **Beckn Protocol** is an open protocol that enables location-aware, local co ### 📊 **Observability** - **Structured Logging**: JSON-formatted logs with contextual information - **Transaction Tracking**: End-to-end request tracing with unique IDs -- **Metrics Support**: Performance and business metrics collection +- **OpenTelemetry Metrics**: Pull-based metrics exposed via `/metrics` + - RED metrics for every module and action (rate, errors, duration) + - Per-step histograms with error attribution + - Cache, routing, plugin, and business KPIs (signature/schema validations, Beckn messages) + - Native Prometheus exporter with Grafana dashboards & alert rules (`monitoring/`) + - Opt-in: add a `plugins.otelsetup` block in your config to wire the `otelsetup` plugin; omit it to run without metrics. Example: + + ```yaml + plugins: + otelsetup: + id: otelsetup + config: + serviceName: "beckn-onix" + serviceVersion: "1.0.0" + enableMetrics: "true" + environment: "development" + ``` + - **Modular Metrics Architecture**: Metrics are organized by module for better maintainability: + - OTel SDK wiring via `otelsetup` plugin + - Step execution metrics in `telemetry` package + - Handler metrics (signature, schema, routing) in `handler` module + - Cache metrics in `cache` plugin +- **Runtime Instrumentation**: Go runtime + Redis client metrics baked in - **Health Checks**: Liveness and readiness probes for Kubernetes +#### Monitoring Quick Start +```bash +./install/build-plugins.sh +go build -o beckn-adapter ./cmd/adapter +./beckn-adapter --config=config/local-simple.yaml +cd monitoring && docker-compose -f docker-compose-monitoring.yml up -d +open http://localhost:3000 # Grafana (admin/admin) +``` +Resources: +- `monitoring/prometheus.yml` – scrape config +- `monitoring/prometheus-alerts.yml` – alert rules (RED, cache, step, plugin) +- `monitoring/grafana/dashboards/beckn-onix-overview.json` – curated dashboard +- `docs/METRICS_RUNBOOK.md` – runbook with PromQL recipes & troubleshooting + ### 🌐 **Multi-Domain Support** - **Retail & E-commerce**: Product search, order management, fulfillment tracking - **Mobility Services**: Ride-hailing, public transport, vehicle rentals @@ -356,6 +392,15 @@ modules: | POST | `/bpp/receiver/*` | Receives all BAP requests | | POST | `/bpp/caller/on_*` | Sends responses back to BAP | +### Observability Endpoints + +| Method | Endpoint | Description | +|--------|----------|-------------| +| GET | `/health` | Health check endpoint | +| GET | `/metrics` | Prometheus metrics endpoint (when telemetry is enabled) | + +**Note**: The `/metrics` endpoint is available when `telemetry.enableMetrics: true` in the configuration file. It returns metrics in Prometheus format. + ## Documentation - **[Setup Guide](SETUP.md)**: Complete installation, configuration, and deployment instructions diff --git a/cmd/adapter/main.go b/cmd/adapter/main.go index 5962314..2d1d8c7 100644 --- a/cmd/adapter/main.go +++ b/cmd/adapter/main.go @@ -17,12 +17,19 @@ import ( "github.com/beckn-one/beckn-onix/core/module/handler" "github.com/beckn-one/beckn-onix/pkg/log" "github.com/beckn-one/beckn-onix/pkg/plugin" + "github.com/beckn-one/beckn-onix/pkg/telemetry" ) +// ApplicationPlugins holds application-level plugin configurations. +type ApplicationPlugins struct { + OtelSetup *plugin.Config `yaml:"otelsetup,omitempty"` +} + // Config struct holds all configurations. type Config struct { AppName string `yaml:"appName"` Log log.Config `yaml:"log"` + Plugins ApplicationPlugins `yaml:"plugins,omitempty"` PluginManager *plugin.ManagerConfig `yaml:"pluginManager"` Modules []module.Config `yaml:"modules"` HTTP httpConfig `yaml:"http"` @@ -91,11 +98,39 @@ func validateConfig(cfg *Config) error { return nil } +// loadAppPlugin is a generic function to load and validate application-level plugins. +func loadAppPlugin[T any](ctx context.Context, name string, cfg *plugin.Config, mgrFunc func(context.Context, *plugin.Config) (T, error)) error { + if cfg == nil { + log.Debugf(ctx, "Skipping %s plugin: not configured", name) + return nil + } + + _, err := mgrFunc(ctx, cfg) + if err != nil { + return fmt.Errorf("failed to load %s plugin (%s): %w", name, cfg.ID, err) + } + + log.Debugf(ctx, "Loaded %s plugin: %s", name, cfg.ID) + return nil +} + +// initAppPlugins initializes application-level plugins including telemetry. +// This function is designed to be extensible for future plugin types. +func initAppPlugins(ctx context.Context, mgr *plugin.Manager, cfg ApplicationPlugins) error { + if err := loadAppPlugin(ctx, "OtelSetup", cfg.OtelSetup, func(ctx context.Context, cfg *plugin.Config) (*telemetry.Provider, error) { + return mgr.OtelSetup(ctx, cfg) + }); err != nil { + return fmt.Errorf("failed to initialize application plugins: %w", err) + } + + return nil +} + // newServer creates and initializes the HTTP server. func newServer(ctx context.Context, mgr handler.PluginManager, cfg *Config) (http.Handler, error) { mux := http.NewServeMux() - err := module.Register(ctx, cfg.Modules, mux, mgr) - if err != nil { + + if err := module.Register(ctx, cfg.Modules, mux, mgr); err != nil { return nil, fmt.Errorf("failed to register modules: %w", err) } return mux, nil @@ -126,6 +161,11 @@ func run(ctx context.Context, configPath string) error { closers = append(closers, closer) log.Debug(ctx, "Plugin manager loaded.") + // Initialize plugins including telemetry. + if err := initAppPlugins(ctx, mgr, cfg.Plugins); err != nil { + return fmt.Errorf("failed to initialize plugins: %w", err) + } + // Initialize HTTP server. log.Infof(ctx, "Initializing HTTP server") srv, err := newServerFunc(ctx, mgr, cfg) diff --git a/cmd/adapter/main_test.go b/cmd/adapter/main_test.go index dff0447..4961a3a 100644 --- a/cmd/adapter/main_test.go +++ b/cmd/adapter/main_test.go @@ -73,6 +73,11 @@ func (m *MockPluginManager) KeyManager(ctx context.Context, cache definition.Cac return nil, nil } +// TransportWrapper returns a mock implementation of the TransportWrapper interface. +func (m *MockPluginManager) TransportWrapper(ctx context.Context, cfg *plugin.Config) (definition.TransportWrapper, error) { + return nil, nil +} + // SchemaValidator returns a mock implementation of the SchemaValidator interface. func (m *MockPluginManager) SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) { return nil, nil @@ -170,14 +175,19 @@ func TestRunFailure(t *testing.T) { // Mock dependencies originalNewManager := newManagerFunc - // newManagerFunc = func(ctx context.Context, cfg *plugin.ManagerConfig) (*plugin.Manager, func(), error) { - // return tt.mockMgr() - // } - newManagerFunc = nil + // Ensure newManagerFunc is never nil to avoid panic if invoked. + newManagerFunc = func(ctx context.Context, cfg *plugin.ManagerConfig) (*plugin.Manager, func(), error) { + _, closer, err := tt.mockMgr() + if err != nil { + return nil, closer, err + } + // Return a deterministic error so the code path exits cleanly if reached. + return nil, closer, errors.New("mock manager error") + } defer func() { newManagerFunc = originalNewManager }() - originalNewServer := newServerFunc - newServerFunc = func(ctx context.Context, mgr handler.PluginManager, cfg *Config) (http.Handler, error) { + originalNewServer := newServerFunc + newServerFunc = func(ctx context.Context, mgr handler.PluginManager, cfg *Config) (http.Handler, error) { return tt.mockServer(ctx, mgr, cfg) } defer func() { newServerFunc = originalNewServer }() diff --git a/cmd/adapter/metrics_integration_test.go b/cmd/adapter/metrics_integration_test.go new file mode 100644 index 0000000..ea03195 --- /dev/null +++ b/cmd/adapter/metrics_integration_test.go @@ -0,0 +1,25 @@ +package main + +import ( + "context" + "testing" + + "github.com/beckn-one/beckn-onix/pkg/plugin/implementation/otelsetup" + "github.com/stretchr/testify/require" +) + +func TestMetricsEndpointExposesPrometheus(t *testing.T) { + ctx := context.Background() + setup := otelsetup.Setup{} + provider, err := setup.New(ctx, &otelsetup.Config{ + ServiceName: "test-onix", + ServiceVersion: "1.0.0", + EnableMetrics: true, + Environment: "test", + }) + require.NoError(t, err) + defer provider.Shutdown(context.Background()) + + // Metrics are served by the plugin’s own HTTP server; just ensure provider is initialized. + require.NotNil(t, provider.MeterProvider) +} diff --git a/config/local-simple.yaml b/config/local-simple.yaml index 548f4c5..b8127e0 100644 --- a/config/local-simple.yaml +++ b/config/local-simple.yaml @@ -8,6 +8,14 @@ log: - message_id - subscriber_id - module_id +plugins: + otelsetup: + id: otelsetup + config: + serviceName: "beckn-onix" + serviceVersion: "1.0.0" + enableMetrics: "true" + environment: "development" http: port: 8081 timeout: diff --git a/core/module/handler/handlerMetrics.go b/core/module/handler/handlerMetrics.go new file mode 100644 index 0000000..ccc5932 --- /dev/null +++ b/core/module/handler/handlerMetrics.go @@ -0,0 +1,68 @@ +package handler + +import ( + "context" + "fmt" + "sync" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" +) + +// HandlerMetrics exposes handler-related metric instruments. +type HandlerMetrics struct { + SignatureValidationsTotal metric.Int64Counter + SchemaValidationsTotal metric.Int64Counter + RoutingDecisionsTotal metric.Int64Counter +} + +var ( + handlerMetricsInstance *HandlerMetrics + handlerMetricsOnce sync.Once + handlerMetricsErr error +) + +// GetHandlerMetrics lazily initializes handler metric instruments and returns a cached reference. +func GetHandlerMetrics(ctx context.Context) (*HandlerMetrics, error) { + handlerMetricsOnce.Do(func() { + handlerMetricsInstance, handlerMetricsErr = newHandlerMetrics() + }) + return handlerMetricsInstance, handlerMetricsErr +} + +func newHandlerMetrics() (*HandlerMetrics, error) { + meter := otel.GetMeterProvider().Meter( + "github.com/beckn-one/beckn-onix/handler", + metric.WithInstrumentationVersion("1.0.0"), + ) + + m := &HandlerMetrics{} + var err error + + if m.SignatureValidationsTotal, err = meter.Int64Counter( + "beckn_signature_validations_total", + metric.WithDescription("Signature validation attempts"), + metric.WithUnit("{validation}"), + ); err != nil { + return nil, fmt.Errorf("beckn_signature_validations_total: %w", err) + } + + if m.SchemaValidationsTotal, err = meter.Int64Counter( + "beckn_schema_validations_total", + metric.WithDescription("Schema validation attempts"), + metric.WithUnit("{validation}"), + ); err != nil { + return nil, fmt.Errorf("beckn_schema_validations_total: %w", err) + } + + if m.RoutingDecisionsTotal, err = meter.Int64Counter( + "onix_routing_decisions_total", + metric.WithDescription("Routing decisions taken by handler"), + metric.WithUnit("{decision}"), + ); err != nil { + return nil, fmt.Errorf("onix_routing_decisions_total: %w", err) + } + + return m, nil +} + diff --git a/core/module/handler/stdHandler.go b/core/module/handler/stdHandler.go index 9af7044..9e9fefc 100644 --- a/core/module/handler/stdHandler.go +++ b/core/module/handler/stdHandler.go @@ -30,6 +30,7 @@ type stdHandler struct { SubscriberID string role model.Role httpClient *http.Client + moduleName string } // newHTTPClient creates a new HTTP client with a custom transport configuration. @@ -51,6 +52,7 @@ func newHTTPClient(cfg *HttpClientConfig, wrapper definition.TransportWrapper) * if cfg.ResponseHeaderTimeout > 0 { transport.ResponseHeaderTimeout = cfg.ResponseHeaderTimeout } + var finalTransport http.RoundTripper = transport if wrapper != nil { log.Debugf(context.Background(), "Applying custom transport wrapper") @@ -60,11 +62,12 @@ func newHTTPClient(cfg *HttpClientConfig, wrapper definition.TransportWrapper) * } // NewStdHandler initializes a new processor with plugins and steps. -func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Handler, error) { +func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config, moduleName string) (http.Handler, error) { h := &stdHandler{ steps: []definition.Step{}, SubscriberID: cfg.SubscriberID, role: cfg.Role, + moduleName: moduleName, } // Initialize plugins. if err := h.initPlugins(ctx, mgr, &cfg.Plugins); err != nil { @@ -81,6 +84,16 @@ func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Ha // ServeHTTP processes an incoming HTTP request and executes defined processing steps. func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + r.Header.Set("X-Module-Name", h.moduleName) + r.Header.Set("X-Role", string(h.role)) + + // These headers are only needed for internal instrumentation; avoid leaking them downstream. + // Use defer to ensure cleanup regardless of return path. + defer func() { + r.Header.Del("X-Module-Name") + r.Header.Del("X-Role") + }() + ctx, err := h.stepCtx(r, w.Header()) if err != nil { log.Errorf(r.Context(), err, "stepCtx(r):%v", err) @@ -297,7 +310,13 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf if err != nil { return err } - h.steps = append(h.steps, s) + instrumentedStep, wrapErr := NewInstrumentedStep(s, step, h.moduleName) + if wrapErr != nil { + log.Warnf(ctx, "Failed to instrument step %s: %v", step, wrapErr) + h.steps = append(h.steps, s) + continue + } + h.steps = append(h.steps, instrumentedStep) } log.Infof(ctx, "Processor steps initialized: %v", cfg.Steps) return nil diff --git a/core/module/handler/step.go b/core/module/handler/step.go index 2464daf..f985031 100644 --- a/core/module/handler/step.go +++ b/core/module/handler/step.go @@ -2,13 +2,17 @@ package handler import ( "context" + "encoding/json" "fmt" "strings" "time" + "go.opentelemetry.io/otel/metric" + "github.com/beckn-one/beckn-onix/pkg/log" "github.com/beckn-one/beckn-onix/pkg/model" "github.com/beckn-one/beckn-onix/pkg/plugin/definition" + "github.com/beckn-one/beckn-onix/pkg/telemetry" ) // signStep represents the signing step in the processing pipeline. @@ -68,6 +72,7 @@ func (s *signStep) generateAuthHeader(subID, keyID string, createdAt, validTill type validateSignStep struct { validator definition.SignValidator km definition.KeyManager + metrics *HandlerMetrics } // newValidateSignStep initializes and returns a new validate sign step. @@ -78,11 +83,22 @@ func newValidateSignStep(signValidator definition.SignValidator, km definition.K if km == nil { return nil, fmt.Errorf("invalid config: KeyManager plugin not configured") } - return &validateSignStep{validator: signValidator, km: km}, nil + metrics, _ := GetHandlerMetrics(context.Background()) + return &validateSignStep{ + validator: signValidator, + km: km, + metrics: metrics, + }, nil } // Run executes the validation step. func (s *validateSignStep) Run(ctx *model.StepContext) error { + err := s.validateHeaders(ctx) + s.recordMetrics(ctx, err) + return err +} + +func (s *validateSignStep) validateHeaders(ctx *model.StepContext) error { unauthHeader := fmt.Sprintf("Signature realm=\"%s\",headers=\"(created) (expires) digest\"", ctx.SubID) headerValue := ctx.Request.Header.Get(model.AuthHeaderGateway) if len(headerValue) != 0 { @@ -123,6 +139,18 @@ func (s *validateSignStep) validate(ctx *model.StepContext, value string) error return nil } +func (s *validateSignStep) recordMetrics(ctx *model.StepContext, err error) { + if s.metrics == nil { + return + } + status := "success" + if err != nil { + status = "failed" + } + s.metrics.SignatureValidationsTotal.Add(ctx.Context, 1, + metric.WithAttributes(telemetry.AttrStatus.String(status))) +} + // ParsedKeyID holds the components from the parsed Authorization header's keyId. type authHeader struct { SubscriberID string @@ -165,6 +193,7 @@ func parseHeader(header string) (*authHeader, error) { // validateSchemaStep represents the schema validation step. type validateSchemaStep struct { validator definition.SchemaValidator + metrics *HandlerMetrics } // newValidateSchemaStep creates and returns the validateSchema step after validation. @@ -173,20 +202,43 @@ func newValidateSchemaStep(schemaValidator definition.SchemaValidator) (definiti return nil, fmt.Errorf("invalid config: SchemaValidator plugin not configured") } log.Debug(context.Background(), "adding schema validator") - return &validateSchemaStep{validator: schemaValidator}, nil + metrics, _ := GetHandlerMetrics(context.Background()) + return &validateSchemaStep{ + validator: schemaValidator, + metrics: metrics, + }, nil } // Run executes the schema validation step. func (s *validateSchemaStep) Run(ctx *model.StepContext) error { - if err := s.validator.Validate(ctx, ctx.Request.URL, ctx.Body); err != nil { - return fmt.Errorf("schema validation failed: %w", err) + err := s.validator.Validate(ctx, ctx.Request.URL, ctx.Body) + if err != nil { + err = fmt.Errorf("schema validation failed: %w", err) } - return nil + s.recordMetrics(ctx, err) + return err +} + +func (s *validateSchemaStep) recordMetrics(ctx *model.StepContext, err error) { + if s.metrics == nil { + return + } + status := "success" + if err != nil { + status = "failed" + } + version := extractSchemaVersion(ctx.Body) + s.metrics.SchemaValidationsTotal.Add(ctx.Context, 1, + metric.WithAttributes( + telemetry.AttrSchemaVersion.String(version), + telemetry.AttrStatus.String(status), + )) } // addRouteStep represents the route determination step. type addRouteStep struct { - router definition.Router + router definition.Router + metrics *HandlerMetrics } // newAddRouteStep creates and returns the addRoute step after validation. @@ -194,7 +246,11 @@ func newAddRouteStep(router definition.Router) (definition.Step, error) { if router == nil { return nil, fmt.Errorf("invalid config: Router plugin not configured") } - return &addRouteStep{router: router}, nil + metrics, _ := GetHandlerMetrics(context.Background()) + return &addRouteStep{ + router: router, + metrics: metrics, + }, nil } // Run executes the routing step. @@ -208,5 +264,26 @@ func (s *addRouteStep) Run(ctx *model.StepContext) error { PublisherID: route.PublisherID, URL: route.URL, } + if s.metrics != nil && ctx.Route != nil { + s.metrics.RoutingDecisionsTotal.Add(ctx.Context, 1, + metric.WithAttributes( + telemetry.AttrTargetType.String(ctx.Route.TargetType), + )) + } return nil -} \ No newline at end of file +} + +func extractSchemaVersion(body []byte) string { + type contextEnvelope struct { + Context struct { + Version string `json:"version"` + } `json:"context"` + } + var payload contextEnvelope + if err := json.Unmarshal(body, &payload); err == nil { + if payload.Context.Version != "" { + return payload.Context.Version + } + } + return "unknown" +} diff --git a/core/module/handler/step_instrumentor.go b/core/module/handler/step_instrumentor.go new file mode 100644 index 0000000..0869304 --- /dev/null +++ b/core/module/handler/step_instrumentor.go @@ -0,0 +1,84 @@ +package handler + +import ( + "context" + "errors" + "fmt" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/beckn-one/beckn-onix/pkg/model" + "github.com/beckn-one/beckn-onix/pkg/telemetry" +) + +// StepRunner represents the minimal contract required for step instrumentation. +type StepRunner interface { + Run(*model.StepContext) error +} + +// InstrumentedStep wraps a processing step with telemetry instrumentation. +type InstrumentedStep struct { + step StepRunner + stepName string + moduleName string + metrics *StepMetrics +} + +// NewInstrumentedStep returns a telemetry enabled wrapper around a definition.Step. +func NewInstrumentedStep(step StepRunner, stepName, moduleName string) (*InstrumentedStep, error) { + metrics, err := GetStepMetrics(context.Background()) + if err != nil { + return nil, err + } + + return &InstrumentedStep{ + step: step, + stepName: stepName, + moduleName: moduleName, + metrics: metrics, + }, nil +} + +type becknError interface { + BecknError() *model.Error +} + +// Run executes the underlying step and records RED style metrics. +func (is *InstrumentedStep) Run(ctx *model.StepContext) error { + if is.metrics == nil { + return is.step.Run(ctx) + } + + start := time.Now() + err := is.step.Run(ctx) + duration := time.Since(start).Seconds() + + attrs := []attribute.KeyValue{ + telemetry.AttrModule.String(is.moduleName), + telemetry.AttrStep.String(is.stepName), + telemetry.AttrRole.String(string(ctx.Role)), + } + + is.metrics.StepExecutionTotal.Add(ctx.Context, 1, metric.WithAttributes(attrs...)) + is.metrics.StepExecutionDuration.Record(ctx.Context, duration, metric.WithAttributes(attrs...)) + + if err != nil { + errorType := fmt.Sprintf("%T", err) + var becknErr becknError + if errors.As(err, &becknErr) { + if be := becknErr.BecknError(); be != nil && be.Code != "" { + errorType = be.Code + } + } + + errorAttrs := append(attrs, telemetry.AttrErrorType.String(errorType)) + is.metrics.StepErrorsTotal.Add(ctx.Context, 1, metric.WithAttributes(errorAttrs...)) + log.Errorf(ctx.Context, err, "Step %s failed", is.stepName) + } + + return err +} + diff --git a/core/module/handler/step_instrumentor_test.go b/core/module/handler/step_instrumentor_test.go new file mode 100644 index 0000000..281aa7e --- /dev/null +++ b/core/module/handler/step_instrumentor_test.go @@ -0,0 +1,52 @@ +package handler + +import ( + "context" + "errors" + "testing" + + "github.com/beckn-one/beckn-onix/pkg/model" + "github.com/beckn-one/beckn-onix/pkg/telemetry" + "github.com/stretchr/testify/require" +) + +type stubStep struct { + err error +} + +func (s stubStep) Run(ctx *model.StepContext) error { + return s.err +} + +func TestInstrumentedStepSuccess(t *testing.T) { + ctx := context.Background() + provider, err := telemetry.NewTestProvider(ctx) + require.NoError(t, err) + defer provider.Shutdown(context.Background()) + + step, err := NewInstrumentedStep(stubStep{}, "test-step", "test-module") + require.NoError(t, err) + + stepCtx := &model.StepContext{ + Context: context.Background(), + Role: model.RoleBAP, + } + require.NoError(t, step.Run(stepCtx)) +} + +func TestInstrumentedStepError(t *testing.T) { + ctx := context.Background() + provider, err := telemetry.NewTestProvider(ctx) + require.NoError(t, err) + defer provider.Shutdown(context.Background()) + + step, err := NewInstrumentedStep(stubStep{err: errors.New("boom")}, "test-step", "test-module") + require.NoError(t, err) + + stepCtx := &model.StepContext{ + Context: context.Background(), + Role: model.RoleBAP, + } + require.Error(t, step.Run(stepCtx)) +} + diff --git a/core/module/handler/step_metrics.go b/core/module/handler/step_metrics.go new file mode 100644 index 0000000..e3fc418 --- /dev/null +++ b/core/module/handler/step_metrics.go @@ -0,0 +1,69 @@ +package handler + +import ( + "context" + "fmt" + "sync" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" +) + +// StepMetrics exposes step execution metric instruments. +type StepMetrics struct { + StepExecutionDuration metric.Float64Histogram + StepExecutionTotal metric.Int64Counter + StepErrorsTotal metric.Int64Counter +} + +var ( + stepMetricsInstance *StepMetrics + stepMetricsOnce sync.Once + stepMetricsErr error +) + +// GetStepMetrics lazily initializes step metric instruments and returns a cached reference. +func GetStepMetrics(ctx context.Context) (*StepMetrics, error) { + stepMetricsOnce.Do(func() { + stepMetricsInstance, stepMetricsErr = newStepMetrics() + }) + return stepMetricsInstance, stepMetricsErr +} + +func newStepMetrics() (*StepMetrics, error) { + meter := otel.GetMeterProvider().Meter( + "github.com/beckn-one/beckn-onix/telemetry", + metric.WithInstrumentationVersion("1.0.0"), + ) + + m := &StepMetrics{} + var err error + + if m.StepExecutionDuration, err = meter.Float64Histogram( + "onix_step_execution_duration_seconds", + metric.WithDescription("Duration of individual processing steps"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5), + ); err != nil { + return nil, fmt.Errorf("onix_step_execution_duration_seconds: %w", err) + } + + if m.StepExecutionTotal, err = meter.Int64Counter( + "onix_step_executions_total", + metric.WithDescription("Total processing step executions"), + metric.WithUnit("{execution}"), + ); err != nil { + return nil, fmt.Errorf("onix_step_executions_total: %w", err) + } + + if m.StepErrorsTotal, err = meter.Int64Counter( + "onix_step_errors_total", + metric.WithDescription("Processing step errors"), + metric.WithUnit("{error}"), + ); err != nil { + return nil, fmt.Errorf("onix_step_errors_total: %w", err) + } + + return m, nil +} + diff --git a/core/module/handler/step_metrics_test.go b/core/module/handler/step_metrics_test.go new file mode 100644 index 0000000..777821b --- /dev/null +++ b/core/module/handler/step_metrics_test.go @@ -0,0 +1,131 @@ +package handler + +import ( + "context" + "net/http/httptest" + "testing" + + "go.opentelemetry.io/otel/metric" + + "github.com/beckn-one/beckn-onix/pkg/telemetry" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGetStepMetrics_Success(t *testing.T) { + ctx := context.Background() + + // Initialize telemetry provider first + provider, err := telemetry.NewTestProvider(ctx) + require.NoError(t, err) + defer provider.Shutdown(context.Background()) + + // Test getting step metrics + metrics, err := GetStepMetrics(ctx) + require.NoError(t, err, "GetStepMetrics() should not return error") + require.NotNil(t, metrics, "GetStepMetrics() should return non-nil metrics") + + // Verify all metric instruments are initialized + assert.NotNil(t, metrics.StepExecutionDuration, "StepExecutionDuration should be initialized") + assert.NotNil(t, metrics.StepExecutionTotal, "StepExecutionTotal should be initialized") + assert.NotNil(t, metrics.StepErrorsTotal, "StepErrorsTotal should be initialized") +} + +func TestGetStepMetrics_ConcurrentAccess(t *testing.T) { + ctx := context.Background() + + // Initialize telemetry provider first + provider, err := telemetry.NewTestProvider(ctx) + require.NoError(t, err) + defer provider.Shutdown(context.Background()) + + // Test that GetStepMetrics is safe for concurrent access + // and returns the same instance (singleton pattern) + metrics1, err1 := GetStepMetrics(ctx) + require.NoError(t, err1) + require.NotNil(t, metrics1) + + metrics2, err2 := GetStepMetrics(ctx) + require.NoError(t, err2) + require.NotNil(t, metrics2) + + // Should return the same instance + assert.Equal(t, metrics1, metrics2, "GetStepMetrics should return the same instance") +} + +func TestGetStepMetrics_WithoutProvider(t *testing.T) { + ctx := context.Background() + + // Test getting step metrics without initializing provider + // This should still work but may not have a valid meter provider + metrics, err := GetStepMetrics(ctx) + // Note: This might succeed or fail depending on OTel's default behavior + // We're just checking it doesn't panic + if err != nil { + t.Logf("GetStepMetrics returned error (expected if no provider): %v", err) + } else { + assert.NotNil(t, metrics, "Metrics should be returned even without explicit provider") + } +} + +func TestStepMetrics_Instruments(t *testing.T) { + ctx := context.Background() + + // Initialize telemetry provider + provider, err := telemetry.NewTestProvider(ctx) + require.NoError(t, err) + defer provider.Shutdown(context.Background()) + + // Get step metrics + metrics, err := GetStepMetrics(ctx) + require.NoError(t, err) + require.NotNil(t, metrics) + + // Test that we can record metrics (this tests the instruments are functional) + // Note: We can't easily verify the metrics were recorded without querying the exporter, + // but we can verify the instruments are not nil and can be called without panicking + + // Test StepExecutionDuration + require.NotPanics(t, func() { + metrics.StepExecutionDuration.Record(ctx, 0.5, + metric.WithAttributes(telemetry.AttrStep.String("test-step"), telemetry.AttrModule.String("test-module"))) + }, "StepExecutionDuration.Record should not panic") + + // Test StepExecutionTotal + require.NotPanics(t, func() { + metrics.StepExecutionTotal.Add(ctx, 1, + metric.WithAttributes(telemetry.AttrStep.String("test-step"), telemetry.AttrModule.String("test-module"))) + }, "StepExecutionTotal.Add should not panic") + + // Test StepErrorsTotal + require.NotPanics(t, func() { + metrics.StepErrorsTotal.Add(ctx, 1, + metric.WithAttributes(telemetry.AttrStep.String("test-step"), telemetry.AttrModule.String("test-module"))) + }, "StepErrorsTotal.Add should not panic") + + // Verify metrics are exposed via HTTP handler + rec := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/metrics", nil) + provider.MetricsHandler.ServeHTTP(rec, req) + assert.Equal(t, 200, rec.Code, "Metrics endpoint should return 200") +} + +func TestStepMetrics_MultipleCalls(t *testing.T) { + ctx := context.Background() + + // Initialize telemetry provider + provider, err := telemetry.NewTestProvider(ctx) + require.NoError(t, err) + defer provider.Shutdown(context.Background()) + + // Call GetStepMetrics multiple times + for i := 0; i < 10; i++ { + metrics, err := GetStepMetrics(ctx) + require.NoError(t, err, "GetStepMetrics should succeed on call %d", i) + require.NotNil(t, metrics, "GetStepMetrics should return non-nil on call %d", i) + assert.NotNil(t, metrics.StepExecutionDuration, "StepExecutionDuration should be initialized") + assert.NotNil(t, metrics.StepExecutionTotal, "StepExecutionTotal should be initialized") + assert.NotNil(t, metrics.StepErrorsTotal, "StepErrorsTotal should be initialized") + } +} + diff --git a/core/module/module.go b/core/module/module.go index a15030f..f41be23 100644 --- a/core/module/module.go +++ b/core/module/module.go @@ -18,7 +18,7 @@ type Config struct { } // Provider represents a function that initializes an HTTP handler using a PluginManager. -type Provider func(ctx context.Context, mgr handler.PluginManager, cfg *handler.Config) (http.Handler, error) +type Provider func(ctx context.Context, mgr handler.PluginManager, cfg *handler.Config, moduleName string) (http.Handler, error) // handlerProviders maintains a mapping of handler types to their respective providers. var handlerProviders = map[handler.Type]Provider{ @@ -29,6 +29,7 @@ var handlerProviders = map[handler.Type]Provider{ // It iterates over the module configurations, retrieves appropriate handler providers, // and registers the handlers with the HTTP multiplexer. func Register(ctx context.Context, mCfgs []Config, mux *http.ServeMux, mgr handler.PluginManager) error { + mux.Handle("/health", http.HandlerFunc(handler.HealthHandler)) log.Debugf(ctx, "Registering modules with config: %#v", mCfgs) @@ -38,7 +39,7 @@ func Register(ctx context.Context, mCfgs []Config, mux *http.ServeMux, mgr handl if !ok { return fmt.Errorf("invalid module : %s", c.Name) } - h, err := rmp(ctx, mgr, &c.Handler) + h, err := rmp(ctx, mgr, &c.Handler, c.Name) if err != nil { return fmt.Errorf("%s : %w", c.Name, err) } @@ -81,4 +82,4 @@ func moduleCtxMiddleware(moduleName string, next http.Handler) http.Handler { ctx := context.WithValue(r.Context(), model.ContextKeyModuleID, moduleName) next.ServeHTTP(w, r.WithContext(ctx)) }) -} \ No newline at end of file +} diff --git a/core/module/module_test.go b/core/module/module_test.go index 0c810ae..f1a0caa 100644 --- a/core/module/module_test.go +++ b/core/module/module_test.go @@ -128,6 +128,7 @@ func TestRegisterSuccess(t *testing.T) { if capturedModuleName != "test-module" { t.Errorf("expected module_id in context to be 'test-module', got %v", capturedModuleName) } + // Verifying /health endpoint registration reqHealth := httptest.NewRequest(http.MethodGet, "/health", nil) recHealth := httptest.NewRecorder() diff --git a/go.mod b/go.mod index d85ebc5..090e5f1 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,11 @@ module github.com/beckn-one/beckn-onix go 1.24.0 require ( - github.com/rogpeppe/go-internal v1.13.1 // indirect github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 golang.org/x/crypto v0.36.0 ) -require github.com/stretchr/testify v1.10.0 +require github.com/stretchr/testify v1.11.1 require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -23,13 +22,15 @@ require github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03 require golang.org/x/text v0.26.0 // indirect require ( + github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/go-jose/go-jose/v4 v4.0.1 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect - github.com/google/go-cmp v0.6.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect @@ -41,17 +42,25 @@ require ( github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect github.com/oasdiff/yaml v0.0.0-20250309154309-f31be36b4037 // indirect github.com/oasdiff/yaml3 v0.0.0-20250309153720-d2182401db90 // indirect github.com/perimeterx/marshmallow v1.1.5 // indirect + github.com/prometheus/client_model v0.6.0 // indirect + github.com/prometheus/common v0.45.0 // indirect + github.com/prometheus/procfs v0.12.0 // indirect + github.com/redis/go-redis/extra/rediscmd/v9 v9.16.0 // indirect github.com/ryanuber/go-glob v1.0.0 // indirect github.com/woodsbury/decimal128 v1.3.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/trace v1.38.0 // indirect golang.org/x/net v0.38.0 // indirect golang.org/x/sys v0.38.0 // indirect golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect + google.golang.org/protobuf v1.32.0 // indirect ) require ( @@ -60,9 +69,17 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.7 github.com/hashicorp/vault/api v1.16.0 github.com/jsonata-go/jsonata v0.0.0-20250709164031-599f35f32e5f + github.com/prometheus/client_golang v1.18.0 github.com/rabbitmq/amqp091-go v1.10.0 - github.com/redis/go-redis/v9 v9.8.0 + github.com/redis/go-redis/extra/redisotel/v9 v9.16.0 + github.com/redis/go-redis/v9 v9.16.0 github.com/rs/zerolog v1.34.0 + go.opentelemetry.io/contrib/instrumentation/runtime v0.63.0 + go.opentelemetry.io/otel v1.38.0 + go.opentelemetry.io/otel/exporters/prometheus v0.46.0 + go.opentelemetry.io/otel/metric v1.38.0 + go.opentelemetry.io/otel/sdk v1.38.0 + go.opentelemetry.io/otel/sdk/metric v1.38.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index 4bc3411..74f27a8 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,8 @@ github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= @@ -8,11 +12,15 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3 github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI= @@ -24,6 +32,11 @@ github.com/getkin/kin-openapi v0.133.0 h1:pJdmNohVIJ97r4AUFtEXRXwESr8b0bD721u/Tz github.com/getkin/kin-openapi v0.133.0/go.mod h1:boAciF6cXk5FhPqe/NQeBTeenbjqU4LhWBf09ILVvWE= github.com/go-jose/go-jose/v4 v4.0.1 h1:QVEPDE3OluqXBQZDcnNvQrInro2h0e4eqNbnZSWqS6U= github.com/go-jose/go-jose/v4 v4.0.1/go.mod h1:WVf9LFMHh/QVrmqrOfqun0C45tMe3RoiKJMPvgWwLfY= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE= @@ -31,8 +44,8 @@ github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM= github.com/go-test/deep v1.0.8/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -78,6 +91,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= +github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= @@ -98,10 +113,22 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= +github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= +github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= +github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos= +github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8= +github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= +github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= -github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI= -github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/redis/go-redis/extra/rediscmd/v9 v9.16.0 h1:zAFQyFxJ3QDwpPUY/CKn22LI5+B8m/lUyffzq2+8ENs= +github.com/redis/go-redis/extra/rediscmd/v9 v9.16.0/go.mod h1:ouOc8ujB2wdUG6o0RrqaPl2tI6cenExC0KkJQ+PHXmw= +github.com/redis/go-redis/extra/redisotel/v9 v9.16.0 h1:+a9h9qxFXdf3gX0FXnDcz7X44ZBFUPq58Gblq7aMU4s= +github.com/redis/go-redis/extra/redisotel/v9 v9.16.0/go.mod h1:EtTTC7vnKWgznfG6kBgl9ySLqd7NckRCFUBzVXdeHeI= +github.com/redis/go-redis/v9 v9.16.0 h1:OotgqgLSRCmzfqChbQyG1PHC3tLNR89DG4jdOERSEP4= +github.com/redis/go-redis/v9 v9.16.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= @@ -116,14 +143,30 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0= github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY= github.com/woodsbury/decimal128 v1.3.0 h1:8pffMNWIlC0O5vbyHWFZAt5yWvWcrHA+3ovIIjVWss0= github.com/woodsbury/decimal128 v1.3.0/go.mod h1:C5UTmyTjW3JftjUFzOVhC20BEQa2a4ZKOB5I6Zjb+ds= github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03 h1:m1h+vudopHsI67FPT9MOncyndWhTcdUoBtI1R1uajGY= github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03/go.mod h1:8sheVFH84v3PCyFY/O02mIgSQY9I6wMYPWsq7mDnEZY= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/contrib/instrumentation/runtime v0.63.0 h1:PeBoRj6af6xMI7qCupwFvTbbnd49V7n5YpG6pg8iDYQ= +go.opentelemetry.io/contrib/instrumentation/runtime v0.63.0/go.mod h1:ingqBCtMCe8I4vpz/UVzCW6sxoqgZB37nao91mLQ3Bw= +go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= +go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/otel/exporters/prometheus v0.46.0 h1:I8WIFXR351FoLJYuloU4EgXbtNX2URfU/85pUPheIEQ= +go.opentelemetry.io/otel/exporters/prometheus v0.46.0/go.mod h1:ztwVUHe5DTR/1v7PeuGRnU5Bbd4QKYwApWmuutKsJSs= +go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= +go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= +go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= +go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg= +go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= +go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= +go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= +go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= @@ -142,6 +185,8 @@ golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= +google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= @@ -152,3 +197,4 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= + diff --git a/install/build-plugins.sh b/install/build-plugins.sh index b790023..389f235 100755 --- a/install/build-plugins.sh +++ b/install/build-plugins.sh @@ -16,6 +16,7 @@ plugins=( "registry" "dediregistry" "reqpreprocessor" + "otelsetup" "reqmapper" "router" "schemavalidator" diff --git a/pkg/plugin/definition/metrics.go b/pkg/plugin/definition/metrics.go new file mode 100644 index 0000000..1850bb6 --- /dev/null +++ b/pkg/plugin/definition/metrics.go @@ -0,0 +1,15 @@ +package definition + +import ( + "context" + + "github.com/beckn-one/beckn-onix/pkg/telemetry" +) + +// OtelSetupMetricsProvider encapsulates initialization of OpenTelemetry metrics +// providers. Implementations wire exporters and return a Provider that the core +// application can manage. +type OtelSetupMetricsProvider interface { + // New initializes a new telemetry provider instance with the given configuration. + New(ctx context.Context, config map[string]string) (*telemetry.Provider, func() error, error) +} diff --git a/pkg/plugin/implementation/cache/cache.go b/pkg/plugin/implementation/cache/cache.go index 375eba1..1e38f96 100644 --- a/pkg/plugin/implementation/cache/cache.go +++ b/pkg/plugin/implementation/cache/cache.go @@ -7,7 +7,12 @@ import ( "os" "time" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/beckn-one/beckn-onix/pkg/telemetry" + "github.com/redis/go-redis/extra/redisotel/v9" "github.com/redis/go-redis/v9" ) @@ -31,7 +36,8 @@ type Config struct { // Cache wraps a Redis client to provide basic caching operations. type Cache struct { - Client RedisClient + Client RedisClient + metrics *CacheMetrics } // Error variables to describe common failure modes. @@ -77,26 +83,80 @@ func New(ctx context.Context, cfg *Config) (*Cache, func() error, error) { return nil, nil, fmt.Errorf("%w: %v", ErrConnectionFail, err) } + // Enable OpenTelemetry instrumentation for tracing and metrics + // This will automatically collect Redis operation metrics and expose them via /metrics endpoint + if redisClient, ok := client.(*redis.Client); ok { + if err := redisotel.InstrumentTracing(redisClient); err != nil { + // Log error but don't fail - instrumentation is optional + log.Debugf(ctx, "Failed to instrument Redis tracing: %v", err) + } + + if err := redisotel.InstrumentMetrics(redisClient); err != nil { + // Log error but don't fail - instrumentation is optional + log.Debugf(ctx, "Failed to instrument Redis metrics: %v", err) + } + } + + metrics, _ := GetCacheMetrics(ctx) + log.Infof(ctx, "Cache connection to Redis established successfully") - return &Cache{Client: client}, client.Close, nil + return &Cache{Client: client, metrics: metrics}, client.Close, nil } // Get retrieves the value for the specified key from Redis. func (c *Cache) Get(ctx context.Context, key string) (string, error) { - return c.Client.Get(ctx, key).Result() + result, err := c.Client.Get(ctx, key).Result() + if c.metrics != nil { + attrs := []attribute.KeyValue{ + telemetry.AttrOperation.String("get"), + } + switch { + case err == redis.Nil: + c.metrics.CacheMissesTotal.Add(ctx, 1, metric.WithAttributes(attrs...)) + c.metrics.CacheOperationsTotal.Add(ctx, 1, + metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("miss"))...)) + case err != nil: + c.metrics.CacheOperationsTotal.Add(ctx, 1, + metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("error"))...)) + default: + c.metrics.CacheHitsTotal.Add(ctx, 1, metric.WithAttributes(attrs...)) + c.metrics.CacheOperationsTotal.Add(ctx, 1, + metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("hit"))...)) + } + } + return result, err } // Set stores the given key-value pair in Redis with the specified TTL (time to live). func (c *Cache) Set(ctx context.Context, key, value string, ttl time.Duration) error { - return c.Client.Set(ctx, key, value, ttl).Err() + err := c.Client.Set(ctx, key, value, ttl).Err() + c.recordOperation(ctx, "set", err) + return err } // Delete removes the specified key from Redis. func (c *Cache) Delete(ctx context.Context, key string) error { - return c.Client.Del(ctx, key).Err() + err := c.Client.Del(ctx, key).Err() + c.recordOperation(ctx, "delete", err) + return err } // Clear removes all keys in the currently selected Redis database. func (c *Cache) Clear(ctx context.Context) error { return c.Client.FlushDB(ctx).Err() } + +func (c *Cache) recordOperation(ctx context.Context, op string, err error) { + if c.metrics == nil { + return + } + status := "success" + if err != nil { + status = "error" + } + c.metrics.CacheOperationsTotal.Add(ctx, 1, + metric.WithAttributes( + telemetry.AttrOperation.String(op), + telemetry.AttrStatus.String(status), + )) +} diff --git a/pkg/plugin/implementation/cache/cache_metrics.go b/pkg/plugin/implementation/cache/cache_metrics.go new file mode 100644 index 0000000..b15f298 --- /dev/null +++ b/pkg/plugin/implementation/cache/cache_metrics.go @@ -0,0 +1,69 @@ +package cache + +import ( + "context" + "fmt" + "sync" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" +) + +// CacheMetrics exposes cache-related metric instruments. +type CacheMetrics struct { + CacheOperationsTotal metric.Int64Counter + CacheHitsTotal metric.Int64Counter + CacheMissesTotal metric.Int64Counter +} + +var ( + cacheMetricsInstance *CacheMetrics + cacheMetricsOnce sync.Once + cacheMetricsErr error +) + +// GetCacheMetrics lazily initializes cache metric instruments and returns a cached reference. +func GetCacheMetrics(ctx context.Context) (*CacheMetrics, error) { + cacheMetricsOnce.Do(func() { + cacheMetricsInstance, cacheMetricsErr = newCacheMetrics() + }) + return cacheMetricsInstance, cacheMetricsErr +} + +func newCacheMetrics() (*CacheMetrics, error) { + meter := otel.GetMeterProvider().Meter( + "github.com/beckn-one/beckn-onix/cache", + metric.WithInstrumentationVersion("1.0.0"), + ) + + m := &CacheMetrics{} + var err error + + if m.CacheOperationsTotal, err = meter.Int64Counter( + "onix_cache_operations_total", + metric.WithDescription("Redis cache operations"), + metric.WithUnit("{operation}"), + ); err != nil { + return nil, fmt.Errorf("onix_cache_operations_total: %w", err) + } + + if m.CacheHitsTotal, err = meter.Int64Counter( + "onix_cache_hits_total", + metric.WithDescription("Redis cache hits"), + metric.WithUnit("{hit}"), + ); err != nil { + return nil, fmt.Errorf("onix_cache_hits_total: %w", err) + } + + if m.CacheMissesTotal, err = meter.Int64Counter( + "onix_cache_misses_total", + metric.WithDescription("Redis cache misses"), + metric.WithUnit("{miss}"), + ); err != nil { + return nil, fmt.Errorf("onix_cache_misses_total: %w", err) + } + + return m, nil +} + + diff --git a/pkg/plugin/implementation/otelsetup/cmd/plugin.go b/pkg/plugin/implementation/otelsetup/cmd/plugin.go new file mode 100644 index 0000000..260231e --- /dev/null +++ b/pkg/plugin/implementation/otelsetup/cmd/plugin.go @@ -0,0 +1,79 @@ +package main + +import ( + "context" + "errors" + "strconv" + "time" + + "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/beckn-one/beckn-onix/pkg/plugin/implementation/otelsetup" + "github.com/beckn-one/beckn-onix/pkg/telemetry" +) + +// metricsProvider implements the OtelSetupMetricsProvider interface for the otelsetup plugin. +type metricsProvider struct { + impl otelsetup.Setup +} + +// New creates a new telemetry provider instance. +func (m metricsProvider) New(ctx context.Context, config map[string]string) (*telemetry.Provider, func() error, error) { + if ctx == nil { + return nil, nil, errors.New("context cannot be nil") + } + + // Convert map[string]string to otelsetup.Config + telemetryConfig := &otelsetup.Config{ + ServiceName: config["serviceName"], + ServiceVersion: config["serviceVersion"], + Environment: config["environment"], + MetricsPort: config["metricsPort"], + } + + // Parse enableMetrics as boolean + if enableMetricsStr, ok := config["enableMetrics"]; ok && enableMetricsStr != "" { + enableMetrics, err := strconv.ParseBool(enableMetricsStr) + if err != nil { + log.Warnf(ctx, "Invalid enableMetrics value '%s', defaulting to true: %v", enableMetricsStr, err) + telemetryConfig.EnableMetrics = true + } else { + telemetryConfig.EnableMetrics = enableMetrics + } + } else { + telemetryConfig.EnableMetrics = true // Default to true if not specified or empty + } + + // Apply defaults if fields are empty + if telemetryConfig.ServiceName == "" { + telemetryConfig.ServiceName = otelsetup.DefaultConfig().ServiceName + } + if telemetryConfig.ServiceVersion == "" { + telemetryConfig.ServiceVersion = otelsetup.DefaultConfig().ServiceVersion + } + if telemetryConfig.Environment == "" { + telemetryConfig.Environment = otelsetup.DefaultConfig().Environment + } + + log.Debugf(ctx, "Telemetry config mapped: %+v", telemetryConfig) + provider, err := m.impl.New(ctx, telemetryConfig) + if err != nil { + log.Errorf(ctx, err, "Failed to create telemetry provider instance") + return nil, nil, err + } + + // Wrap the Shutdown function to match the closer signature + var closer func() error + if provider != nil && provider.Shutdown != nil { + closer = func() error { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return provider.Shutdown(shutdownCtx) + } + } + + log.Infof(ctx, "Telemetry provider instance created successfully") + return provider, closer, nil +} + +// Provider is the exported plugin instance +var Provider = metricsProvider{} diff --git a/pkg/plugin/implementation/otelsetup/cmd/plugin_test.go b/pkg/plugin/implementation/otelsetup/cmd/plugin_test.go new file mode 100644 index 0000000..79a2783 --- /dev/null +++ b/pkg/plugin/implementation/otelsetup/cmd/plugin_test.go @@ -0,0 +1,296 @@ +package main + +import ( + "context" + "testing" + + "github.com/beckn-one/beckn-onix/pkg/plugin/implementation/otelsetup" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMetricsProviderNew_Success(t *testing.T) { + provider := metricsProvider{} + + tests := []struct { + name string + ctx context.Context + config map[string]string + }{ + { + name: "Valid config with all fields", + ctx: context.Background(), + config: map[string]string{ + "serviceName": "test-service", + "serviceVersion": "1.0.0", + "enableMetrics": "true", + "environment": "test", + }, + }, + { + name: "Valid config with minimal fields (uses defaults)", + ctx: context.Background(), + config: map[string]string{}, + }, + { + name: "Valid config with enableMetrics false", + ctx: context.Background(), + config: map[string]string{ + "enableMetrics": "false", + }, + }, + { + name: "Valid config with partial fields", + ctx: context.Background(), + config: map[string]string{ + "serviceName": "custom-service", + "serviceVersion": "2.0.0", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + telemetryProvider, cleanup, err := provider.New(tt.ctx, tt.config) + + require.NoError(t, err, "New() should not return error") + require.NotNil(t, telemetryProvider, "New() should return non-nil provider") + + // Metrics server is started inside provider when enabled; MetricsHandler is not exposed. + if cleanup != nil { + err := cleanup() + assert.NoError(t, err, "cleanup() should not return error") + } + }) + } +} + +func TestMetricsProviderNew_Failure(t *testing.T) { + provider := metricsProvider{} + + tests := []struct { + name string + ctx context.Context + config map[string]string + wantErr bool + }{ + { + name: "Nil context", + ctx: nil, + config: map[string]string{}, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + telemetryProvider, cleanup, err := provider.New(tt.ctx, tt.config) + + if tt.wantErr { + assert.Error(t, err, "New() should return error for nil context") + assert.Nil(t, telemetryProvider, "New() should return nil provider on error") + assert.Nil(t, cleanup, "New() should return nil cleanup on error") + } else { + assert.NoError(t, err, "New() should not return error") + assert.NotNil(t, telemetryProvider, "New() should return non-nil provider") + } + }) + } +} + +func TestMetricsProviderNew_ConfigConversion(t *testing.T) { + provider := metricsProvider{} + ctx := context.Background() + + tests := []struct { + name string + config map[string]string + expectedConfig *otelsetup.Config + }{ + { + name: "All fields provided", + config: map[string]string{ + "serviceName": "my-service", + "serviceVersion": "3.0.0", + "enableMetrics": "true", + "environment": "production", + }, + expectedConfig: &otelsetup.Config{ + ServiceName: "my-service", + ServiceVersion: "3.0.0", + EnableMetrics: true, + Environment: "production", + }, + }, + { + name: "Empty config uses defaults", + config: map[string]string{}, + expectedConfig: &otelsetup.Config{ + ServiceName: otelsetup.DefaultConfig().ServiceName, + ServiceVersion: otelsetup.DefaultConfig().ServiceVersion, + EnableMetrics: true, // Default when not specified + Environment: otelsetup.DefaultConfig().Environment, + }, + }, + { + name: "EnableMetrics false", + config: map[string]string{ + "enableMetrics": "false", + }, + expectedConfig: &otelsetup.Config{ + ServiceName: otelsetup.DefaultConfig().ServiceName, + ServiceVersion: otelsetup.DefaultConfig().ServiceVersion, + EnableMetrics: false, + Environment: otelsetup.DefaultConfig().Environment, + }, + }, + { + name: "Invalid enableMetrics defaults to true", + config: map[string]string{ + "enableMetrics": "invalid", + }, + expectedConfig: &otelsetup.Config{ + ServiceName: otelsetup.DefaultConfig().ServiceName, + ServiceVersion: otelsetup.DefaultConfig().ServiceVersion, + EnableMetrics: true, // Defaults to true on parse error + Environment: otelsetup.DefaultConfig().Environment, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + telemetryProvider, cleanup, err := provider.New(ctx, tt.config) + + require.NoError(t, err, "New() should not return error") + require.NotNil(t, telemetryProvider, "New() should return non-nil provider") + + if cleanup != nil { + err := cleanup() + assert.NoError(t, err, "cleanup() should not return error") + } + }) + } +} + +func TestMetricsProviderNew_BooleanParsing(t *testing.T) { + provider := metricsProvider{} + ctx := context.Background() + + tests := []struct { + name string + enableMetrics string + expected bool + }{ + { + name: "True string", + enableMetrics: "true", + expected: true, + }, + { + name: "False string", + enableMetrics: "false", + expected: false, + }, + { + name: "True uppercase", + enableMetrics: "TRUE", + expected: true, + }, + { + name: "False uppercase", + enableMetrics: "FALSE", + expected: false, + }, + { + name: "Invalid value defaults to true", + enableMetrics: "invalid", + expected: true, // Defaults to true on parse error + }, + { + name: "Empty string defaults to true", + enableMetrics: "", + expected: true, // Defaults to true when not specified + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config := map[string]string{ + "enableMetrics": tt.enableMetrics, + } + + telemetryProvider, cleanup, err := provider.New(ctx, config) + + require.NoError(t, err, "New() should not return error") + require.NotNil(t, telemetryProvider, "New() should return non-nil provider") + + if cleanup != nil { + err := cleanup() + assert.NoError(t, err, "cleanup() should not return error") + } + }) + } +} + +func TestMetricsProviderNew_CleanupFunction(t *testing.T) { + provider := metricsProvider{} + ctx := context.Background() + + config := map[string]string{ + "serviceName": "test-service", + "serviceVersion": "1.0.0", + "enableMetrics": "true", + "environment": "test", + } + + telemetryProvider, cleanup, err := provider.New(ctx, config) + + require.NoError(t, err, "New() should not return error") + require.NotNil(t, telemetryProvider, "New() should return non-nil provider") + require.NotNil(t, cleanup, "New() should return non-nil cleanup function") + + // Test that cleanup can be called successfully + err = cleanup() + assert.NoError(t, err, "cleanup() should not return error") +} + +func TestProviderVariable(t *testing.T) { + assert.NotNil(t, Provider, "Provider should not be nil") + + // Verify Provider implements the interface correctly + ctx := context.Background() + config := map[string]string{ + "serviceName": "test", + "serviceVersion": "1.0.0", + "enableMetrics": "true", + } + + telemetryProvider, cleanup, err := Provider.New(ctx, config) + + require.NoError(t, err, "Provider.New() should not return error") + require.NotNil(t, telemetryProvider, "Provider.New() should return non-nil provider") + + if cleanup != nil { + err := cleanup() + assert.NoError(t, err, "cleanup() should not return error") + } +} + +func TestMetricsProviderNew_DefaultValues(t *testing.T) { + provider := metricsProvider{} + ctx := context.Background() + + // Test with completely empty config + config := map[string]string{} + + telemetryProvider, cleanup, err := provider.New(ctx, config) + + require.NoError(t, err, "New() should not return error with empty config") + require.NotNil(t, telemetryProvider, "New() should return non-nil provider") + + if cleanup != nil { + err := cleanup() + assert.NoError(t, err, "cleanup() should not return error") + } +} diff --git a/pkg/plugin/implementation/otelsetup/otelsetup.go b/pkg/plugin/implementation/otelsetup/otelsetup.go new file mode 100644 index 0000000..4b52d78 --- /dev/null +++ b/pkg/plugin/implementation/otelsetup/otelsetup.go @@ -0,0 +1,169 @@ +package otelsetup + +import ( + "context" + "fmt" + "net" + "net/http" + "sync" + "time" + + clientprom "github.com/prometheus/client_golang/prometheus" + clientpromhttp "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/contrib/instrumentation/runtime" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + otelprom "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + + "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/beckn-one/beckn-onix/pkg/plugin" + "github.com/beckn-one/beckn-onix/pkg/telemetry" +) + +// Setup wires the telemetry provider. This is the concrete implementation +// behind the OtelSetupMetricsProvider interface. +type Setup struct{} + +// Config represents OpenTelemetry related configuration. +type Config struct { + ServiceName string `yaml:"serviceName"` + ServiceVersion string `yaml:"serviceVersion"` + EnableMetrics bool `yaml:"enableMetrics"` + Environment string `yaml:"environment"` + MetricsPort string `yaml:"metricsPort"` +} + +// DefaultConfig returns sensible defaults for telemetry configuration. +func DefaultConfig() *Config { + return &Config{ + ServiceName: "beckn-onix", + ServiceVersion: "dev", + EnableMetrics: true, + Environment: "development", + MetricsPort: "9090", + } +} + +// ToPluginConfig converts Config to plugin.Config format. +func ToPluginConfig(cfg *Config) *plugin.Config { + return &plugin.Config{ + ID: "otelsetup", + Config: map[string]string{ + "serviceName": cfg.ServiceName, + "serviceVersion": cfg.ServiceVersion, + "enableMetrics": fmt.Sprintf("%t", cfg.EnableMetrics), + "environment": cfg.Environment, + "metricsPort": cfg.MetricsPort, + }, + } +} + +// New initializes the underlying telemetry provider. The returned provider +// exposes the HTTP handler and shutdown hooks that the core application can +// manage directly. +func (Setup) New(ctx context.Context, cfg *Config) (*telemetry.Provider, error) { + if cfg == nil { + return nil, fmt.Errorf("telemetry config cannot be nil") + } + + // Apply defaults if fields are empty + if cfg.ServiceName == "" { + cfg.ServiceName = DefaultConfig().ServiceName + } + if cfg.ServiceVersion == "" { + cfg.ServiceVersion = DefaultConfig().ServiceVersion + } + if cfg.Environment == "" { + cfg.Environment = DefaultConfig().Environment + } + if cfg.MetricsPort == "" { + cfg.MetricsPort = DefaultConfig().MetricsPort + } + + if !cfg.EnableMetrics { + log.Info(ctx, "OpenTelemetry metrics disabled") + return &telemetry.Provider{ + Shutdown: func(context.Context) error { return nil }, + }, nil + } + + res, err := resource.New( + ctx, + resource.WithAttributes( + attribute.String("service.name", cfg.ServiceName), + attribute.String("service.version", cfg.ServiceVersion), + attribute.String("deployment.environment", cfg.Environment), + ), + ) + if err != nil { + return nil, fmt.Errorf("failed to create telemetry resource: %w", err) + } + + registry := clientprom.NewRegistry() + + exporter, err := otelprom.New( + otelprom.WithRegisterer(registry), + otelprom.WithoutUnits(), + otelprom.WithoutScopeInfo(), + ) + if err != nil { + return nil, fmt.Errorf("failed to create prometheus exporter: %w", err) + } + + meterProvider := metric.NewMeterProvider( + metric.WithReader(exporter), + metric.WithResource(res), + ) + + otel.SetMeterProvider(meterProvider) + log.Infof(ctx, "OpenTelemetry metrics initialized for service=%s version=%s env=%s", + cfg.ServiceName, cfg.ServiceVersion, cfg.Environment) + + if err := runtime.Start(runtime.WithMinimumReadMemStatsInterval(0)); err != nil { + log.Warnf(ctx, "Failed to start Go runtime instrumentation: %v", err) + } + + // Create metrics handler + metricsHandler := clientpromhttp.HandlerFor(registry, clientpromhttp.HandlerOpts{}) + + // Create and start metrics HTTP server + metricsMux := http.NewServeMux() + metricsMux.Handle("/metrics", metricsHandler) + + metricsServer := &http.Server{ + Addr: net.JoinHostPort("", cfg.MetricsPort), + Handler: metricsMux, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 30 * time.Second, + } + + var serverWg sync.WaitGroup + serverWg.Add(1) + go func() { + defer serverWg.Done() + log.Infof(ctx, "Metrics server listening on %s", metricsServer.Addr) + if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Errorf(ctx, fmt.Errorf("metrics server ListenAndServe: %w", err), "error listening and serving metrics") + } + }() + + return &telemetry.Provider{ + MeterProvider: meterProvider, + MetricsHandler: metricsHandler, + Shutdown: func(shutdownCtx context.Context) error { + log.Infof(ctx, "Shutting down metrics server...") + // Shutdown the metrics server + serverShutdownCtx, cancel := context.WithTimeout(shutdownCtx, 10*time.Second) + defer cancel() + if err := metricsServer.Shutdown(serverShutdownCtx); err != nil { + log.Errorf(ctx, fmt.Errorf("metrics server shutdown: %w", err), "error shutting down metrics server") + } + serverWg.Wait() + // Shutdown the meter provider + return meterProvider.Shutdown(shutdownCtx) + }, + }, nil +} diff --git a/pkg/plugin/implementation/otelsetup/otelsetup_test.go b/pkg/plugin/implementation/otelsetup/otelsetup_test.go new file mode 100644 index 0000000..916b632 --- /dev/null +++ b/pkg/plugin/implementation/otelsetup/otelsetup_test.go @@ -0,0 +1,259 @@ +package otelsetup + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSetup_New_Success(t *testing.T) { + setup := Setup{} + ctx := context.Background() + + tests := []struct { + name string + cfg *Config + }{ + { + name: "Valid config with all fields", + cfg: &Config{ + ServiceName: "test-service", + ServiceVersion: "1.0.0", + EnableMetrics: true, + Environment: "test", + }, + }, + { + name: "Valid config with metrics disabled", + cfg: &Config{ + ServiceName: "test-service", + ServiceVersion: "1.0.0", + EnableMetrics: false, + Environment: "test", + }, + }, + { + name: "Config with empty fields uses defaults", + cfg: &Config{ + ServiceName: "", + ServiceVersion: "", + EnableMetrics: true, + Environment: "", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + provider, err := setup.New(ctx, tt.cfg) + + require.NoError(t, err, "New() should not return error") + require.NotNil(t, provider, "New() should return non-nil provider") + require.NotNil(t, provider.Shutdown, "Provider should have shutdown function") + + if tt.cfg.EnableMetrics { + assert.NotNil(t, provider.MeterProvider, "MeterProvider should be set when metrics enabled") + } + + // Test shutdown + err = provider.Shutdown(ctx) + assert.NoError(t, err, "Shutdown should not return error") + }) + } +} + +func TestSetup_New_Failure(t *testing.T) { + setup := Setup{} + ctx := context.Background() + + tests := []struct { + name string + cfg *Config + wantErr bool + }{ + { + name: "Nil config", + cfg: nil, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + provider, err := setup.New(ctx, tt.cfg) + + if tt.wantErr { + assert.Error(t, err, "New() should return error") + assert.Nil(t, provider, "New() should return nil provider on error") + } else { + assert.NoError(t, err, "New() should not return error") + assert.NotNil(t, provider, "New() should return non-nil provider") + } + }) + } +} + +func TestSetup_New_DefaultValues(t *testing.T) { + setup := Setup{} + ctx := context.Background() + + // Test with empty fields - should use defaults + cfg := &Config{ + ServiceName: "", + ServiceVersion: "", + EnableMetrics: true, + Environment: "", + } + + provider, err := setup.New(ctx, cfg) + require.NoError(t, err) + require.NotNil(t, provider) + + // Verify defaults are applied by checking that provider is functional + assert.NotNil(t, provider.MeterProvider, "MeterProvider should be set with defaults") + + // Cleanup + err = provider.Shutdown(ctx) + assert.NoError(t, err) +} + +func TestSetup_New_MetricsDisabled(t *testing.T) { + setup := Setup{} + ctx := context.Background() + + cfg := &Config{ + ServiceName: "test-service", + ServiceVersion: "1.0.0", + EnableMetrics: false, + Environment: "test", + } + + provider, err := setup.New(ctx, cfg) + require.NoError(t, err) + require.NotNil(t, provider) + + // When metrics are disabled, MetricsHandler should be nil and MeterProvider should be nil + assert.Nil(t, provider.MeterProvider, "MeterProvider should be nil when metrics disabled") + + // Shutdown should still work + err = provider.Shutdown(ctx) + assert.NoError(t, err, "Shutdown should work even when metrics disabled") +} + +func TestToPluginConfig_Success(t *testing.T) { + tests := []struct { + name string + cfg *Config + expectedID string + expectedConfig map[string]string + }{ + { + name: "Valid config with all fields", + cfg: &Config{ + ServiceName: "test-service", + ServiceVersion: "1.0.0", + EnableMetrics: true, + Environment: "test", + }, + expectedID: "otelsetup", + expectedConfig: map[string]string{ + "serviceName": "test-service", + "serviceVersion": "1.0.0", + "enableMetrics": "true", + "environment": "test", + "metricsPort": "", + }, + }, + { + name: "Config with enableMetrics false", + cfg: &Config{ + ServiceName: "my-service", + ServiceVersion: "2.0.0", + EnableMetrics: false, + Environment: "production", + }, + expectedID: "otelsetup", + expectedConfig: map[string]string{ + "serviceName": "my-service", + "serviceVersion": "2.0.0", + "enableMetrics": "false", + "environment": "production", + "metricsPort": "", + }, + }, + { + name: "Config with empty fields", + cfg: &Config{ + ServiceName: "", + ServiceVersion: "", + EnableMetrics: true, + Environment: "", + }, + expectedID: "otelsetup", + expectedConfig: map[string]string{ + "serviceName": "", + "serviceVersion": "", + "enableMetrics": "true", + "environment": "", + "metricsPort": "", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := ToPluginConfig(tt.cfg) + + require.NotNil(t, result, "ToPluginConfig should return non-nil config") + assert.Equal(t, tt.expectedID, result.ID, "Plugin ID should be 'otelsetup'") + assert.Equal(t, tt.expectedConfig, result.Config, "Config map should match expected values") + }) + } +} + +func TestToPluginConfig_NilConfig(t *testing.T) { + // Test that ToPluginConfig handles nil config + // Note: This will panic if nil is passed, which is acceptable behavior + // as the function expects a valid config. In practice, callers should check for nil. + assert.Panics(t, func() { + ToPluginConfig(nil) + }, "ToPluginConfig should panic when given nil config") +} + +func TestToPluginConfig_BooleanConversion(t *testing.T) { + tests := []struct { + name string + enableMetrics bool + expected string + }{ + { + name: "EnableMetrics true", + enableMetrics: true, + expected: "true", + }, + { + name: "EnableMetrics false", + enableMetrics: false, + expected: "false", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := &Config{ + ServiceName: "test", + ServiceVersion: "1.0.0", + EnableMetrics: tt.enableMetrics, + Environment: "test", + MetricsPort: "", + } + + result := ToPluginConfig(cfg) + require.NotNil(t, result) + assert.Equal(t, tt.expected, result.Config["enableMetrics"], "enableMetrics should be converted to string correctly") + assert.Equal(t, "", result.Config["metricsPort"], "metricsPort should be included even when empty") + }) + } +} diff --git a/pkg/plugin/implementation/router/router_test.go b/pkg/plugin/implementation/router/router_test.go index c0a7356..8642e61 100644 --- a/pkg/plugin/implementation/router/router_test.go +++ b/pkg/plugin/implementation/router/router_test.go @@ -683,7 +683,6 @@ func TestExcludeActionWithNonURLTargetTypes(t *testing.T) { } } - // TestV2RouteSuccess tests v2 routing with domain-agnostic behavior func TestV2RouteSuccess(t *testing.T) { ctx := context.Background() diff --git a/pkg/plugin/manager.go b/pkg/plugin/manager.go index a06ade2..ebc4316 100644 --- a/pkg/plugin/manager.go +++ b/pkg/plugin/manager.go @@ -15,6 +15,7 @@ import ( "github.com/beckn-one/beckn-onix/pkg/log" "github.com/beckn-one/beckn-onix/pkg/plugin/definition" + "github.com/beckn-one/beckn-onix/pkg/telemetry" ) type onixPlugin interface { @@ -196,6 +197,33 @@ func (m *Manager) Middleware(ctx context.Context, cfg *Config) (func(http.Handle return mwp.New(ctx, cfg.Config) } +// OtelSetup initializes OpenTelemetry via a dedicated plugin. The plugin is +// expected to return a telemetry Provider that the core application can use for +// instrumentation. +func (m *Manager) OtelSetup(ctx context.Context, cfg *Config) (*telemetry.Provider, error) { + if cfg == nil { + log.Info(ctx, "Telemetry config not provided; skipping OpenTelemetry setup") + return nil, nil + } + + otp, err := provider[definition.OtelSetupMetricsProvider](m.plugins, cfg.ID) + if err != nil { + return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) + } + provider, closer, err := otp.New(ctx, cfg.Config) + if err != nil { + return nil, err + } + if closer != nil { + m.closers = append(m.closers, func() { + if err := closer(); err != nil { + log.Errorf(context.Background(), err, "Failed to shutdown telemetry provider") + } + }) + } + return provider, nil +} + // TransportWrapper returns a TransportWrapper instance based on the provided configuration. func (m *Manager) TransportWrapper(ctx context.Context, cfg *Config) (definition.TransportWrapper, error) { twp, err := provider[definition.TransportWrapperProvider](m.plugins, cfg.ID) diff --git a/pkg/telemetry/metrics_test.go b/pkg/telemetry/metrics_test.go new file mode 100644 index 0000000..1c3663a --- /dev/null +++ b/pkg/telemetry/metrics_test.go @@ -0,0 +1,28 @@ +package telemetry + +import ( + "context" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNewProviderAndMetrics(t *testing.T) { + ctx := context.Background() + provider, err := NewTestProvider(ctx) + require.NoError(t, err) + require.NotNil(t, provider) + require.NotNil(t, provider.MetricsHandler) + + metrics, err := GetMetrics(ctx) + require.NoError(t, err) + require.NotNil(t, metrics) + + rec := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/metrics", nil) + provider.MetricsHandler.ServeHTTP(rec, req) + require.Equal(t, 200, rec.Code) + + require.NoError(t, provider.Shutdown(context.Background())) +} diff --git a/pkg/telemetry/pluginMetrics.go b/pkg/telemetry/pluginMetrics.go new file mode 100644 index 0000000..c6d83ce --- /dev/null +++ b/pkg/telemetry/pluginMetrics.go @@ -0,0 +1,86 @@ +package telemetry + +import ( + "context" + "fmt" + "sync" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// Metrics exposes strongly typed metric instruments used across the adapter. +// Note: Most metrics have been moved to their respective modules. Only plugin-level +// metrics remain here. See: +// - OTel setup: pkg/plugin/implementation/otelsetup +// - Step metrics: core/module/handler/step_metrics.go +// - Cache metrics: pkg/plugin/implementation/cache/cache_metrics.go +// - Handler metrics: core/module/handler/handlerMetrics.go +type Metrics struct { + PluginExecutionDuration metric.Float64Histogram + PluginErrorsTotal metric.Int64Counter +} + +var ( + metricsInstance *Metrics + metricsOnce sync.Once + metricsErr error +) + +// Attribute keys shared across instruments. +var ( + AttrModule = attribute.Key("module") + AttrSubsystem = attribute.Key("subsystem") + AttrName = attribute.Key("name") + AttrStep = attribute.Key("step") + AttrRole = attribute.Key("role") + AttrAction = attribute.Key("action") + AttrHTTPMethod = attribute.Key("http_method") + AttrHTTPStatus = attribute.Key("http_status_code") + AttrStatus = attribute.Key("status") + AttrErrorType = attribute.Key("error_type") + AttrPluginID = attribute.Key("plugin_id") + AttrPluginType = attribute.Key("plugin_type") + AttrOperation = attribute.Key("operation") + AttrRouteType = attribute.Key("route_type") + AttrTargetType = attribute.Key("target_type") + AttrSchemaVersion = attribute.Key("schema_version") +) + +// GetMetrics lazily initializes instruments and returns a cached reference. +func GetMetrics(ctx context.Context) (*Metrics, error) { + metricsOnce.Do(func() { + metricsInstance, metricsErr = newMetrics() + }) + return metricsInstance, metricsErr +} + +func newMetrics() (*Metrics, error) { + meter := otel.GetMeterProvider().Meter( + "github.com/beckn-one/beckn-onix/telemetry", + metric.WithInstrumentationVersion("1.0.0"), + ) + + m := &Metrics{} + var err error + + if m.PluginExecutionDuration, err = meter.Float64Histogram( + "onix_plugin_execution_duration_seconds", + metric.WithDescription("Plugin execution time"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1), + ); err != nil { + return nil, fmt.Errorf("onix_plugin_execution_duration_seconds: %w", err) + } + + if m.PluginErrorsTotal, err = meter.Int64Counter( + "onix_plugin_errors_total", + metric.WithDescription("Plugin level errors"), + metric.WithUnit("{error}"), + ); err != nil { + return nil, fmt.Errorf("onix_plugin_errors_total: %w", err) + } + + return m, nil +} diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go new file mode 100644 index 0000000..c5b70df --- /dev/null +++ b/pkg/telemetry/telemetry.go @@ -0,0 +1,15 @@ +package telemetry + +import ( + "context" + "net/http" + + "go.opentelemetry.io/otel/sdk/metric" +) + +// Provider holds references to telemetry components that need coordinated shutdown. +type Provider struct { + MeterProvider *metric.MeterProvider + MetricsHandler http.Handler + Shutdown func(context.Context) error +} diff --git a/pkg/telemetry/test_helper.go b/pkg/telemetry/test_helper.go new file mode 100644 index 0000000..627965b --- /dev/null +++ b/pkg/telemetry/test_helper.go @@ -0,0 +1,54 @@ +package telemetry + +import ( + "context" + + clientprom "github.com/prometheus/client_golang/prometheus" + clientpromhttp "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + otelprom "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" +) + +// NewTestProvider creates a minimal telemetry provider for testing purposes. +// This avoids import cycles by not depending on the otelsetup package. +func NewTestProvider(ctx context.Context) (*Provider, error) { + res, err := resource.New( + ctx, + resource.WithAttributes( + attribute.String("service.name", "test-service"), + attribute.String("service.version", "test"), + attribute.String("deployment.environment", "test"), + ), + ) + if err != nil { + return nil, err + } + + registry := clientprom.NewRegistry() + exporter, err := otelprom.New( + otelprom.WithRegisterer(registry), + otelprom.WithoutUnits(), + otelprom.WithoutScopeInfo(), + ) + if err != nil { + return nil, err + } + + meterProvider := metric.NewMeterProvider( + metric.WithReader(exporter), + metric.WithResource(res), + ) + + otel.SetMeterProvider(meterProvider) + + return &Provider{ + MeterProvider: meterProvider, + MetricsHandler: clientpromhttp.HandlerFor(registry, clientpromhttp.HandlerOpts{}), + Shutdown: func(ctx context.Context) error { + return meterProvider.Shutdown(ctx) + }, + }, nil +}