Merge pull request #568 from Beckn-One/feat/observability
Feat/observability
This commit is contained in:
151
CONFIG.md
151
CONFIG.md
@@ -6,13 +6,14 @@
|
||||
3. [Top-Level Configuration](#top-level-configuration)
|
||||
4. [HTTP Configuration](#http-configuration)
|
||||
5. [Logging Configuration](#logging-configuration)
|
||||
6. [Plugin Manager Configuration](#plugin-manager-configuration)
|
||||
7. [Module Configuration](#module-configuration)
|
||||
8. [Handler Configuration](#handler-configuration)
|
||||
9. [Plugin Configuration](#plugin-configuration)
|
||||
10. [Routing Configuration](#routing-configuration)
|
||||
11. [Deployment Scenarios](#deployment-scenarios)
|
||||
12. [Configuration Examples](#configuration-examples)
|
||||
6. [Metrics Configuration](#metrics-configuration)
|
||||
7. [Plugin Manager Configuration](#plugin-manager-configuration)
|
||||
8. [Module Configuration](#module-configuration)
|
||||
9. [Handler Configuration](#handler-configuration)
|
||||
10. [Plugin Configuration](#plugin-configuration)
|
||||
11. [Routing Configuration](#routing-configuration)
|
||||
12. [Deployment Scenarios](#deployment-scenarios)
|
||||
13. [Configuration Examples](#configuration-examples)
|
||||
|
||||
---
|
||||
|
||||
@@ -70,6 +71,7 @@ The main configuration file follows this structure:
|
||||
```yaml
|
||||
appName: "onix-local"
|
||||
log: {...}
|
||||
metrics: {...}
|
||||
http: {...}
|
||||
pluginManager: {...}
|
||||
modules: [...]
|
||||
@@ -187,6 +189,120 @@ log:
|
||||
|
||||
---
|
||||
|
||||
## Application-Level Plugins Configuration
|
||||
|
||||
### `plugins`
|
||||
**Type**: `object`
|
||||
**Required**: No
|
||||
**Description**: Application-level plugin configurations. These plugins apply to the entire application and are shared across all modules.
|
||||
|
||||
#### `plugins.otelsetup`
|
||||
**Type**: `object`
|
||||
**Required**: No
|
||||
**Description**: OpenTelemetry configuration controlling whether the Prometheus exporter is enabled.
|
||||
|
||||
**Important**: This block is optional—omit it to run without telemetry. When present, the `/metrics` endpoint is exposed on a separate port (configurable via `metricsPort`) only if `enableMetrics: true`.
|
||||
|
||||
##### Parameters:
|
||||
|
||||
###### `id`
|
||||
**Type**: `string`
|
||||
**Required**: Yes
|
||||
**Description**: Plugin identifier. Must be `"otelsetup"`.
|
||||
|
||||
###### `config`
|
||||
**Type**: `object`
|
||||
**Required**: Yes
|
||||
**Description**: Plugin configuration parameters.
|
||||
|
||||
###### `config.enableMetrics`
|
||||
**Type**: `string` (boolean)
|
||||
**Required**: No
|
||||
**Default**: `"true"`
|
||||
**Description**: Enables metrics collection and the `/metrics` endpoint. Must be `"true"` or `"false"` as a string.
|
||||
|
||||
###### `config.serviceName`
|
||||
**Type**: `string`
|
||||
**Required**: No
|
||||
**Default**: `"beckn-onix"`
|
||||
**Description**: Sets the `service.name` resource attribute.
|
||||
|
||||
###### `config.serviceVersion`
|
||||
**Type**: `string`
|
||||
**Required**: No
|
||||
**Description**: Sets the `service.version` resource attribute.
|
||||
|
||||
###### `config.environment`
|
||||
**Type**: `string`
|
||||
**Required**: No
|
||||
**Default**: `"development"`
|
||||
**Description**: Sets the `deployment.environment` attribute (e.g., `development`, `staging`, `production`).
|
||||
|
||||
###### `config.metricsPort`
|
||||
**Type**: `string`
|
||||
**Required**: No
|
||||
**Default**: `"9090"`
|
||||
**Description**: Port on which the metrics HTTP server will listen. The metrics endpoint is hosted on a separate server from the main application.
|
||||
|
||||
**Example - Enable Metrics** (matches `config/local-simple.yaml`):
|
||||
```yaml
|
||||
plugins:
|
||||
otelsetup:
|
||||
id: otelsetup
|
||||
config:
|
||||
serviceName: "beckn-onix"
|
||||
serviceVersion: "1.0.0"
|
||||
enableMetrics: "true"
|
||||
environment: "development"
|
||||
metricsPort: "9090"
|
||||
```
|
||||
|
||||
### Accessing Metrics
|
||||
|
||||
When `plugins.otelsetup.config.enableMetrics: "true"`, the metrics endpoint is hosted on a separate HTTP server. Scrape metrics at:
|
||||
|
||||
```
|
||||
http://your-server:9090/metrics
|
||||
```
|
||||
|
||||
**Note**: The metrics server runs on the port specified by `config.metricsPort` (default: `9090`), which is separate from the main application port configured in `http.port`.
|
||||
|
||||
### Metrics Collected
|
||||
|
||||
Metrics are organized by module for better maintainability and encapsulation:
|
||||
|
||||
#### OTel Setup (from `otelsetup` plugin)
|
||||
- Prometheus exporter & `/metrics` endpoint on separate HTTP server
|
||||
- Go runtime instrumentation (`go_*`), resource attributes, and meter provider wiring
|
||||
|
||||
#### Step Execution Metrics (from `telemetry` package)
|
||||
- `onix_step_executions_total`, `onix_step_execution_duration_seconds`, `onix_step_errors_total`
|
||||
|
||||
#### Handler Metrics (from `handler` module)
|
||||
- `beckn_signature_validations_total` - Signature validation attempts
|
||||
- `beckn_schema_validations_total` - Schema validation attempts
|
||||
- `onix_routing_decisions_total` - Routing decisions taken by handler
|
||||
|
||||
#### Cache Metrics (from `cache` plugin)
|
||||
- `onix_cache_operations_total`, `onix_cache_hits_total`, `onix_cache_misses_total`
|
||||
|
||||
#### Plugin Metrics (from `telemetry` package)
|
||||
- `onix_plugin_execution_duration_seconds`, `onix_plugin_errors_total`
|
||||
|
||||
#### Runtime Metrics
|
||||
- Go runtime metrics (`go_*`) and Redis instrumentation via `redisotel`
|
||||
|
||||
Each metric includes consistent labels such as `module`, `role`, `action`, `status`, `step`, `plugin_id`, and `schema_version` to enable low-cardinality dashboards.
|
||||
|
||||
**Note**: Metric definitions are now located in their respective modules:
|
||||
- OTel setup: `pkg/plugin/implementation/otelsetup`
|
||||
- Step metrics: `core/module/handler/step_metrics.go`
|
||||
- Handler metrics: `core/module/handler/handlerMetrics.go`
|
||||
- Cache metrics: `pkg/plugin/implementation/cache/cache_metrics.go`
|
||||
- Plugin metrics: `pkg/telemetry/pluginMetrics.go`
|
||||
|
||||
---
|
||||
|
||||
## Plugin Manager Configuration
|
||||
|
||||
### `pluginManager`
|
||||
@@ -1045,6 +1161,7 @@ routingRules:
|
||||
- Embedded Ed25519 keys
|
||||
- Local Redis
|
||||
- Simplified routing
|
||||
- Optional metrics collection (available on separate port when enabled)
|
||||
|
||||
**Use Case**: Quick local development and testing
|
||||
|
||||
@@ -1052,6 +1169,10 @@ routingRules:
|
||||
appName: "onix-local"
|
||||
log:
|
||||
level: debug
|
||||
metrics:
|
||||
enabled: true
|
||||
exporterType: prometheus
|
||||
serviceName: onix-local
|
||||
http:
|
||||
port: 8081
|
||||
modules:
|
||||
@@ -1063,6 +1184,8 @@ modules:
|
||||
config: {}
|
||||
```
|
||||
|
||||
**Metrics Access**: When enabled, access metrics at `http://localhost:9090/metrics` (default metrics port, configurable via `plugins.otelsetup.config.metricsPort`)
|
||||
|
||||
### 2. Local Development (Vault Mode)
|
||||
|
||||
**File**: `config/local-dev.yaml`
|
||||
@@ -1096,10 +1219,21 @@ modules:
|
||||
- Production Redis
|
||||
- Remote plugin loading
|
||||
- Pub/Sub integration
|
||||
- OpenTelemetry metrics enabled (available on separate port, default: 9090)
|
||||
|
||||
**Use Case**: Single deployment serving both roles
|
||||
|
||||
```yaml
|
||||
appName: "onix-production"
|
||||
log:
|
||||
level: info
|
||||
destinations:
|
||||
- type: stdout
|
||||
metrics:
|
||||
enabled: true
|
||||
exporterType: prometheus
|
||||
serviceName: beckn-onix
|
||||
serviceVersion: "1.0.0"
|
||||
pluginManager:
|
||||
root: /app/plugins
|
||||
remoteRoot: /mnt/gcs/plugins/plugins_bundle.zip
|
||||
@@ -1122,6 +1256,9 @@ modules:
|
||||
topic: bapNetworkReciever
|
||||
```
|
||||
|
||||
**Metrics Access**:
|
||||
- Prometheus scraping: `http://your-server:9090/metrics` (default metrics port, configurable via `plugins.otelsetup.config.metricsPort`)
|
||||
|
||||
### 4. Production BAP-Only Mode
|
||||
|
||||
**File**: `config/onix-bap/adapter.yaml`
|
||||
|
||||
47
README.md
47
README.md
@@ -64,9 +64,45 @@ The **Beckn Protocol** is an open protocol that enables location-aware, local co
|
||||
### 📊 **Observability**
|
||||
- **Structured Logging**: JSON-formatted logs with contextual information
|
||||
- **Transaction Tracking**: End-to-end request tracing with unique IDs
|
||||
- **Metrics Support**: Performance and business metrics collection
|
||||
- **OpenTelemetry Metrics**: Pull-based metrics exposed via `/metrics`
|
||||
- RED metrics for every module and action (rate, errors, duration)
|
||||
- Per-step histograms with error attribution
|
||||
- Cache, routing, plugin, and business KPIs (signature/schema validations, Beckn messages)
|
||||
- Native Prometheus exporter with Grafana dashboards & alert rules (`monitoring/`)
|
||||
- Opt-in: add a `plugins.otelsetup` block in your config to wire the `otelsetup` plugin; omit it to run without metrics. Example:
|
||||
|
||||
```yaml
|
||||
plugins:
|
||||
otelsetup:
|
||||
id: otelsetup
|
||||
config:
|
||||
serviceName: "beckn-onix"
|
||||
serviceVersion: "1.0.0"
|
||||
enableMetrics: "true"
|
||||
environment: "development"
|
||||
```
|
||||
- **Modular Metrics Architecture**: Metrics are organized by module for better maintainability:
|
||||
- OTel SDK wiring via `otelsetup` plugin
|
||||
- Step execution metrics in `telemetry` package
|
||||
- Handler metrics (signature, schema, routing) in `handler` module
|
||||
- Cache metrics in `cache` plugin
|
||||
- **Runtime Instrumentation**: Go runtime + Redis client metrics baked in
|
||||
- **Health Checks**: Liveness and readiness probes for Kubernetes
|
||||
|
||||
#### Monitoring Quick Start
|
||||
```bash
|
||||
./install/build-plugins.sh
|
||||
go build -o beckn-adapter ./cmd/adapter
|
||||
./beckn-adapter --config=config/local-simple.yaml
|
||||
cd monitoring && docker-compose -f docker-compose-monitoring.yml up -d
|
||||
open http://localhost:3000 # Grafana (admin/admin)
|
||||
```
|
||||
Resources:
|
||||
- `monitoring/prometheus.yml` – scrape config
|
||||
- `monitoring/prometheus-alerts.yml` – alert rules (RED, cache, step, plugin)
|
||||
- `monitoring/grafana/dashboards/beckn-onix-overview.json` – curated dashboard
|
||||
- `docs/METRICS_RUNBOOK.md` – runbook with PromQL recipes & troubleshooting
|
||||
|
||||
### 🌐 **Multi-Domain Support**
|
||||
- **Retail & E-commerce**: Product search, order management, fulfillment tracking
|
||||
- **Mobility Services**: Ride-hailing, public transport, vehicle rentals
|
||||
@@ -356,6 +392,15 @@ modules:
|
||||
| POST | `/bpp/receiver/*` | Receives all BAP requests |
|
||||
| POST | `/bpp/caller/on_*` | Sends responses back to BAP |
|
||||
|
||||
### Observability Endpoints
|
||||
|
||||
| Method | Endpoint | Description |
|
||||
|--------|----------|-------------|
|
||||
| GET | `/health` | Health check endpoint |
|
||||
| GET | `/metrics` | Prometheus metrics endpoint (when telemetry is enabled) |
|
||||
|
||||
**Note**: The `/metrics` endpoint is available when `telemetry.enableMetrics: true` in the configuration file. It returns metrics in Prometheus format.
|
||||
|
||||
## Documentation
|
||||
|
||||
- **[Setup Guide](SETUP.md)**: Complete installation, configuration, and deployment instructions
|
||||
|
||||
@@ -17,12 +17,19 @@ import (
|
||||
"github.com/beckn-one/beckn-onix/core/module/handler"
|
||||
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||
"github.com/beckn-one/beckn-onix/pkg/plugin"
|
||||
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
||||
)
|
||||
|
||||
// ApplicationPlugins holds application-level plugin configurations.
|
||||
type ApplicationPlugins struct {
|
||||
OtelSetup *plugin.Config `yaml:"otelsetup,omitempty"`
|
||||
}
|
||||
|
||||
// Config struct holds all configurations.
|
||||
type Config struct {
|
||||
AppName string `yaml:"appName"`
|
||||
Log log.Config `yaml:"log"`
|
||||
Plugins ApplicationPlugins `yaml:"plugins,omitempty"`
|
||||
PluginManager *plugin.ManagerConfig `yaml:"pluginManager"`
|
||||
Modules []module.Config `yaml:"modules"`
|
||||
HTTP httpConfig `yaml:"http"`
|
||||
@@ -91,11 +98,39 @@ func validateConfig(cfg *Config) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadAppPlugin is a generic function to load and validate application-level plugins.
|
||||
func loadAppPlugin[T any](ctx context.Context, name string, cfg *plugin.Config, mgrFunc func(context.Context, *plugin.Config) (T, error)) error {
|
||||
if cfg == nil {
|
||||
log.Debugf(ctx, "Skipping %s plugin: not configured", name)
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err := mgrFunc(ctx, cfg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load %s plugin (%s): %w", name, cfg.ID, err)
|
||||
}
|
||||
|
||||
log.Debugf(ctx, "Loaded %s plugin: %s", name, cfg.ID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// initAppPlugins initializes application-level plugins including telemetry.
|
||||
// This function is designed to be extensible for future plugin types.
|
||||
func initAppPlugins(ctx context.Context, mgr *plugin.Manager, cfg ApplicationPlugins) error {
|
||||
if err := loadAppPlugin(ctx, "OtelSetup", cfg.OtelSetup, func(ctx context.Context, cfg *plugin.Config) (*telemetry.Provider, error) {
|
||||
return mgr.OtelSetup(ctx, cfg)
|
||||
}); err != nil {
|
||||
return fmt.Errorf("failed to initialize application plugins: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// newServer creates and initializes the HTTP server.
|
||||
func newServer(ctx context.Context, mgr handler.PluginManager, cfg *Config) (http.Handler, error) {
|
||||
mux := http.NewServeMux()
|
||||
err := module.Register(ctx, cfg.Modules, mux, mgr)
|
||||
if err != nil {
|
||||
|
||||
if err := module.Register(ctx, cfg.Modules, mux, mgr); err != nil {
|
||||
return nil, fmt.Errorf("failed to register modules: %w", err)
|
||||
}
|
||||
return mux, nil
|
||||
@@ -126,6 +161,11 @@ func run(ctx context.Context, configPath string) error {
|
||||
closers = append(closers, closer)
|
||||
log.Debug(ctx, "Plugin manager loaded.")
|
||||
|
||||
// Initialize plugins including telemetry.
|
||||
if err := initAppPlugins(ctx, mgr, cfg.Plugins); err != nil {
|
||||
return fmt.Errorf("failed to initialize plugins: %w", err)
|
||||
}
|
||||
|
||||
// Initialize HTTP server.
|
||||
log.Infof(ctx, "Initializing HTTP server")
|
||||
srv, err := newServerFunc(ctx, mgr, cfg)
|
||||
|
||||
@@ -73,6 +73,11 @@ func (m *MockPluginManager) KeyManager(ctx context.Context, cache definition.Cac
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// TransportWrapper returns a mock implementation of the TransportWrapper interface.
|
||||
func (m *MockPluginManager) TransportWrapper(ctx context.Context, cfg *plugin.Config) (definition.TransportWrapper, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// SchemaValidator returns a mock implementation of the SchemaValidator interface.
|
||||
func (m *MockPluginManager) SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) {
|
||||
return nil, nil
|
||||
@@ -170,14 +175,19 @@ func TestRunFailure(t *testing.T) {
|
||||
|
||||
// Mock dependencies
|
||||
originalNewManager := newManagerFunc
|
||||
// newManagerFunc = func(ctx context.Context, cfg *plugin.ManagerConfig) (*plugin.Manager, func(), error) {
|
||||
// return tt.mockMgr()
|
||||
// }
|
||||
newManagerFunc = nil
|
||||
// Ensure newManagerFunc is never nil to avoid panic if invoked.
|
||||
newManagerFunc = func(ctx context.Context, cfg *plugin.ManagerConfig) (*plugin.Manager, func(), error) {
|
||||
_, closer, err := tt.mockMgr()
|
||||
if err != nil {
|
||||
return nil, closer, err
|
||||
}
|
||||
// Return a deterministic error so the code path exits cleanly if reached.
|
||||
return nil, closer, errors.New("mock manager error")
|
||||
}
|
||||
defer func() { newManagerFunc = originalNewManager }()
|
||||
|
||||
originalNewServer := newServerFunc
|
||||
newServerFunc = func(ctx context.Context, mgr handler.PluginManager, cfg *Config) (http.Handler, error) {
|
||||
originalNewServer := newServerFunc
|
||||
newServerFunc = func(ctx context.Context, mgr handler.PluginManager, cfg *Config) (http.Handler, error) {
|
||||
return tt.mockServer(ctx, mgr, cfg)
|
||||
}
|
||||
defer func() { newServerFunc = originalNewServer }()
|
||||
|
||||
25
cmd/adapter/metrics_integration_test.go
Normal file
25
cmd/adapter/metrics_integration_test.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/plugin/implementation/otelsetup"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMetricsEndpointExposesPrometheus(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
setup := otelsetup.Setup{}
|
||||
provider, err := setup.New(ctx, &otelsetup.Config{
|
||||
ServiceName: "test-onix",
|
||||
ServiceVersion: "1.0.0",
|
||||
EnableMetrics: true,
|
||||
Environment: "test",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
defer provider.Shutdown(context.Background())
|
||||
|
||||
// Metrics are served by the plugin’s own HTTP server; just ensure provider is initialized.
|
||||
require.NotNil(t, provider.MeterProvider)
|
||||
}
|
||||
@@ -8,6 +8,14 @@ log:
|
||||
- message_id
|
||||
- subscriber_id
|
||||
- module_id
|
||||
plugins:
|
||||
otelsetup:
|
||||
id: otelsetup
|
||||
config:
|
||||
serviceName: "beckn-onix"
|
||||
serviceVersion: "1.0.0"
|
||||
enableMetrics: "true"
|
||||
environment: "development"
|
||||
http:
|
||||
port: 8081
|
||||
timeout:
|
||||
|
||||
68
core/module/handler/handlerMetrics.go
Normal file
68
core/module/handler/handlerMetrics.go
Normal file
@@ -0,0 +1,68 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
// HandlerMetrics exposes handler-related metric instruments.
|
||||
type HandlerMetrics struct {
|
||||
SignatureValidationsTotal metric.Int64Counter
|
||||
SchemaValidationsTotal metric.Int64Counter
|
||||
RoutingDecisionsTotal metric.Int64Counter
|
||||
}
|
||||
|
||||
var (
|
||||
handlerMetricsInstance *HandlerMetrics
|
||||
handlerMetricsOnce sync.Once
|
||||
handlerMetricsErr error
|
||||
)
|
||||
|
||||
// GetHandlerMetrics lazily initializes handler metric instruments and returns a cached reference.
|
||||
func GetHandlerMetrics(ctx context.Context) (*HandlerMetrics, error) {
|
||||
handlerMetricsOnce.Do(func() {
|
||||
handlerMetricsInstance, handlerMetricsErr = newHandlerMetrics()
|
||||
})
|
||||
return handlerMetricsInstance, handlerMetricsErr
|
||||
}
|
||||
|
||||
func newHandlerMetrics() (*HandlerMetrics, error) {
|
||||
meter := otel.GetMeterProvider().Meter(
|
||||
"github.com/beckn-one/beckn-onix/handler",
|
||||
metric.WithInstrumentationVersion("1.0.0"),
|
||||
)
|
||||
|
||||
m := &HandlerMetrics{}
|
||||
var err error
|
||||
|
||||
if m.SignatureValidationsTotal, err = meter.Int64Counter(
|
||||
"beckn_signature_validations_total",
|
||||
metric.WithDescription("Signature validation attempts"),
|
||||
metric.WithUnit("{validation}"),
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("beckn_signature_validations_total: %w", err)
|
||||
}
|
||||
|
||||
if m.SchemaValidationsTotal, err = meter.Int64Counter(
|
||||
"beckn_schema_validations_total",
|
||||
metric.WithDescription("Schema validation attempts"),
|
||||
metric.WithUnit("{validation}"),
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("beckn_schema_validations_total: %w", err)
|
||||
}
|
||||
|
||||
if m.RoutingDecisionsTotal, err = meter.Int64Counter(
|
||||
"onix_routing_decisions_total",
|
||||
metric.WithDescription("Routing decisions taken by handler"),
|
||||
metric.WithUnit("{decision}"),
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("onix_routing_decisions_total: %w", err)
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ type stdHandler struct {
|
||||
SubscriberID string
|
||||
role model.Role
|
||||
httpClient *http.Client
|
||||
moduleName string
|
||||
}
|
||||
|
||||
// newHTTPClient creates a new HTTP client with a custom transport configuration.
|
||||
@@ -51,6 +52,7 @@ func newHTTPClient(cfg *HttpClientConfig, wrapper definition.TransportWrapper) *
|
||||
if cfg.ResponseHeaderTimeout > 0 {
|
||||
transport.ResponseHeaderTimeout = cfg.ResponseHeaderTimeout
|
||||
}
|
||||
|
||||
var finalTransport http.RoundTripper = transport
|
||||
if wrapper != nil {
|
||||
log.Debugf(context.Background(), "Applying custom transport wrapper")
|
||||
@@ -60,11 +62,12 @@ func newHTTPClient(cfg *HttpClientConfig, wrapper definition.TransportWrapper) *
|
||||
}
|
||||
|
||||
// NewStdHandler initializes a new processor with plugins and steps.
|
||||
func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Handler, error) {
|
||||
func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config, moduleName string) (http.Handler, error) {
|
||||
h := &stdHandler{
|
||||
steps: []definition.Step{},
|
||||
SubscriberID: cfg.SubscriberID,
|
||||
role: cfg.Role,
|
||||
moduleName: moduleName,
|
||||
}
|
||||
// Initialize plugins.
|
||||
if err := h.initPlugins(ctx, mgr, &cfg.Plugins); err != nil {
|
||||
@@ -81,6 +84,16 @@ func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Ha
|
||||
|
||||
// ServeHTTP processes an incoming HTTP request and executes defined processing steps.
|
||||
func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
r.Header.Set("X-Module-Name", h.moduleName)
|
||||
r.Header.Set("X-Role", string(h.role))
|
||||
|
||||
// These headers are only needed for internal instrumentation; avoid leaking them downstream.
|
||||
// Use defer to ensure cleanup regardless of return path.
|
||||
defer func() {
|
||||
r.Header.Del("X-Module-Name")
|
||||
r.Header.Del("X-Role")
|
||||
}()
|
||||
|
||||
ctx, err := h.stepCtx(r, w.Header())
|
||||
if err != nil {
|
||||
log.Errorf(r.Context(), err, "stepCtx(r):%v", err)
|
||||
@@ -297,7 +310,13 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
h.steps = append(h.steps, s)
|
||||
instrumentedStep, wrapErr := NewInstrumentedStep(s, step, h.moduleName)
|
||||
if wrapErr != nil {
|
||||
log.Warnf(ctx, "Failed to instrument step %s: %v", step, wrapErr)
|
||||
h.steps = append(h.steps, s)
|
||||
continue
|
||||
}
|
||||
h.steps = append(h.steps, instrumentedStep)
|
||||
}
|
||||
log.Infof(ctx, "Processor steps initialized: %v", cfg.Steps)
|
||||
return nil
|
||||
|
||||
@@ -2,13 +2,17 @@ package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||
"github.com/beckn-one/beckn-onix/pkg/model"
|
||||
"github.com/beckn-one/beckn-onix/pkg/plugin/definition"
|
||||
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
||||
)
|
||||
|
||||
// signStep represents the signing step in the processing pipeline.
|
||||
@@ -68,6 +72,7 @@ func (s *signStep) generateAuthHeader(subID, keyID string, createdAt, validTill
|
||||
type validateSignStep struct {
|
||||
validator definition.SignValidator
|
||||
km definition.KeyManager
|
||||
metrics *HandlerMetrics
|
||||
}
|
||||
|
||||
// newValidateSignStep initializes and returns a new validate sign step.
|
||||
@@ -78,11 +83,22 @@ func newValidateSignStep(signValidator definition.SignValidator, km definition.K
|
||||
if km == nil {
|
||||
return nil, fmt.Errorf("invalid config: KeyManager plugin not configured")
|
||||
}
|
||||
return &validateSignStep{validator: signValidator, km: km}, nil
|
||||
metrics, _ := GetHandlerMetrics(context.Background())
|
||||
return &validateSignStep{
|
||||
validator: signValidator,
|
||||
km: km,
|
||||
metrics: metrics,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run executes the validation step.
|
||||
func (s *validateSignStep) Run(ctx *model.StepContext) error {
|
||||
err := s.validateHeaders(ctx)
|
||||
s.recordMetrics(ctx, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *validateSignStep) validateHeaders(ctx *model.StepContext) error {
|
||||
unauthHeader := fmt.Sprintf("Signature realm=\"%s\",headers=\"(created) (expires) digest\"", ctx.SubID)
|
||||
headerValue := ctx.Request.Header.Get(model.AuthHeaderGateway)
|
||||
if len(headerValue) != 0 {
|
||||
@@ -123,6 +139,18 @@ func (s *validateSignStep) validate(ctx *model.StepContext, value string) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *validateSignStep) recordMetrics(ctx *model.StepContext, err error) {
|
||||
if s.metrics == nil {
|
||||
return
|
||||
}
|
||||
status := "success"
|
||||
if err != nil {
|
||||
status = "failed"
|
||||
}
|
||||
s.metrics.SignatureValidationsTotal.Add(ctx.Context, 1,
|
||||
metric.WithAttributes(telemetry.AttrStatus.String(status)))
|
||||
}
|
||||
|
||||
// ParsedKeyID holds the components from the parsed Authorization header's keyId.
|
||||
type authHeader struct {
|
||||
SubscriberID string
|
||||
@@ -165,6 +193,7 @@ func parseHeader(header string) (*authHeader, error) {
|
||||
// validateSchemaStep represents the schema validation step.
|
||||
type validateSchemaStep struct {
|
||||
validator definition.SchemaValidator
|
||||
metrics *HandlerMetrics
|
||||
}
|
||||
|
||||
// newValidateSchemaStep creates and returns the validateSchema step after validation.
|
||||
@@ -173,20 +202,43 @@ func newValidateSchemaStep(schemaValidator definition.SchemaValidator) (definiti
|
||||
return nil, fmt.Errorf("invalid config: SchemaValidator plugin not configured")
|
||||
}
|
||||
log.Debug(context.Background(), "adding schema validator")
|
||||
return &validateSchemaStep{validator: schemaValidator}, nil
|
||||
metrics, _ := GetHandlerMetrics(context.Background())
|
||||
return &validateSchemaStep{
|
||||
validator: schemaValidator,
|
||||
metrics: metrics,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run executes the schema validation step.
|
||||
func (s *validateSchemaStep) Run(ctx *model.StepContext) error {
|
||||
if err := s.validator.Validate(ctx, ctx.Request.URL, ctx.Body); err != nil {
|
||||
return fmt.Errorf("schema validation failed: %w", err)
|
||||
err := s.validator.Validate(ctx, ctx.Request.URL, ctx.Body)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("schema validation failed: %w", err)
|
||||
}
|
||||
return nil
|
||||
s.recordMetrics(ctx, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *validateSchemaStep) recordMetrics(ctx *model.StepContext, err error) {
|
||||
if s.metrics == nil {
|
||||
return
|
||||
}
|
||||
status := "success"
|
||||
if err != nil {
|
||||
status = "failed"
|
||||
}
|
||||
version := extractSchemaVersion(ctx.Body)
|
||||
s.metrics.SchemaValidationsTotal.Add(ctx.Context, 1,
|
||||
metric.WithAttributes(
|
||||
telemetry.AttrSchemaVersion.String(version),
|
||||
telemetry.AttrStatus.String(status),
|
||||
))
|
||||
}
|
||||
|
||||
// addRouteStep represents the route determination step.
|
||||
type addRouteStep struct {
|
||||
router definition.Router
|
||||
router definition.Router
|
||||
metrics *HandlerMetrics
|
||||
}
|
||||
|
||||
// newAddRouteStep creates and returns the addRoute step after validation.
|
||||
@@ -194,7 +246,11 @@ func newAddRouteStep(router definition.Router) (definition.Step, error) {
|
||||
if router == nil {
|
||||
return nil, fmt.Errorf("invalid config: Router plugin not configured")
|
||||
}
|
||||
return &addRouteStep{router: router}, nil
|
||||
metrics, _ := GetHandlerMetrics(context.Background())
|
||||
return &addRouteStep{
|
||||
router: router,
|
||||
metrics: metrics,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run executes the routing step.
|
||||
@@ -208,5 +264,26 @@ func (s *addRouteStep) Run(ctx *model.StepContext) error {
|
||||
PublisherID: route.PublisherID,
|
||||
URL: route.URL,
|
||||
}
|
||||
if s.metrics != nil && ctx.Route != nil {
|
||||
s.metrics.RoutingDecisionsTotal.Add(ctx.Context, 1,
|
||||
metric.WithAttributes(
|
||||
telemetry.AttrTargetType.String(ctx.Route.TargetType),
|
||||
))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func extractSchemaVersion(body []byte) string {
|
||||
type contextEnvelope struct {
|
||||
Context struct {
|
||||
Version string `json:"version"`
|
||||
} `json:"context"`
|
||||
}
|
||||
var payload contextEnvelope
|
||||
if err := json.Unmarshal(body, &payload); err == nil {
|
||||
if payload.Context.Version != "" {
|
||||
return payload.Context.Version
|
||||
}
|
||||
}
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
84
core/module/handler/step_instrumentor.go
Normal file
84
core/module/handler/step_instrumentor.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||
"github.com/beckn-one/beckn-onix/pkg/model"
|
||||
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
||||
)
|
||||
|
||||
// StepRunner represents the minimal contract required for step instrumentation.
|
||||
type StepRunner interface {
|
||||
Run(*model.StepContext) error
|
||||
}
|
||||
|
||||
// InstrumentedStep wraps a processing step with telemetry instrumentation.
|
||||
type InstrumentedStep struct {
|
||||
step StepRunner
|
||||
stepName string
|
||||
moduleName string
|
||||
metrics *StepMetrics
|
||||
}
|
||||
|
||||
// NewInstrumentedStep returns a telemetry enabled wrapper around a definition.Step.
|
||||
func NewInstrumentedStep(step StepRunner, stepName, moduleName string) (*InstrumentedStep, error) {
|
||||
metrics, err := GetStepMetrics(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &InstrumentedStep{
|
||||
step: step,
|
||||
stepName: stepName,
|
||||
moduleName: moduleName,
|
||||
metrics: metrics,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type becknError interface {
|
||||
BecknError() *model.Error
|
||||
}
|
||||
|
||||
// Run executes the underlying step and records RED style metrics.
|
||||
func (is *InstrumentedStep) Run(ctx *model.StepContext) error {
|
||||
if is.metrics == nil {
|
||||
return is.step.Run(ctx)
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
err := is.step.Run(ctx)
|
||||
duration := time.Since(start).Seconds()
|
||||
|
||||
attrs := []attribute.KeyValue{
|
||||
telemetry.AttrModule.String(is.moduleName),
|
||||
telemetry.AttrStep.String(is.stepName),
|
||||
telemetry.AttrRole.String(string(ctx.Role)),
|
||||
}
|
||||
|
||||
is.metrics.StepExecutionTotal.Add(ctx.Context, 1, metric.WithAttributes(attrs...))
|
||||
is.metrics.StepExecutionDuration.Record(ctx.Context, duration, metric.WithAttributes(attrs...))
|
||||
|
||||
if err != nil {
|
||||
errorType := fmt.Sprintf("%T", err)
|
||||
var becknErr becknError
|
||||
if errors.As(err, &becknErr) {
|
||||
if be := becknErr.BecknError(); be != nil && be.Code != "" {
|
||||
errorType = be.Code
|
||||
}
|
||||
}
|
||||
|
||||
errorAttrs := append(attrs, telemetry.AttrErrorType.String(errorType))
|
||||
is.metrics.StepErrorsTotal.Add(ctx.Context, 1, metric.WithAttributes(errorAttrs...))
|
||||
log.Errorf(ctx.Context, err, "Step %s failed", is.stepName)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
52
core/module/handler/step_instrumentor_test.go
Normal file
52
core/module/handler/step_instrumentor_test.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/model"
|
||||
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type stubStep struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (s stubStep) Run(ctx *model.StepContext) error {
|
||||
return s.err
|
||||
}
|
||||
|
||||
func TestInstrumentedStepSuccess(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
provider, err := telemetry.NewTestProvider(ctx)
|
||||
require.NoError(t, err)
|
||||
defer provider.Shutdown(context.Background())
|
||||
|
||||
step, err := NewInstrumentedStep(stubStep{}, "test-step", "test-module")
|
||||
require.NoError(t, err)
|
||||
|
||||
stepCtx := &model.StepContext{
|
||||
Context: context.Background(),
|
||||
Role: model.RoleBAP,
|
||||
}
|
||||
require.NoError(t, step.Run(stepCtx))
|
||||
}
|
||||
|
||||
func TestInstrumentedStepError(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
provider, err := telemetry.NewTestProvider(ctx)
|
||||
require.NoError(t, err)
|
||||
defer provider.Shutdown(context.Background())
|
||||
|
||||
step, err := NewInstrumentedStep(stubStep{err: errors.New("boom")}, "test-step", "test-module")
|
||||
require.NoError(t, err)
|
||||
|
||||
stepCtx := &model.StepContext{
|
||||
Context: context.Background(),
|
||||
Role: model.RoleBAP,
|
||||
}
|
||||
require.Error(t, step.Run(stepCtx))
|
||||
}
|
||||
|
||||
69
core/module/handler/step_metrics.go
Normal file
69
core/module/handler/step_metrics.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
// StepMetrics exposes step execution metric instruments.
|
||||
type StepMetrics struct {
|
||||
StepExecutionDuration metric.Float64Histogram
|
||||
StepExecutionTotal metric.Int64Counter
|
||||
StepErrorsTotal metric.Int64Counter
|
||||
}
|
||||
|
||||
var (
|
||||
stepMetricsInstance *StepMetrics
|
||||
stepMetricsOnce sync.Once
|
||||
stepMetricsErr error
|
||||
)
|
||||
|
||||
// GetStepMetrics lazily initializes step metric instruments and returns a cached reference.
|
||||
func GetStepMetrics(ctx context.Context) (*StepMetrics, error) {
|
||||
stepMetricsOnce.Do(func() {
|
||||
stepMetricsInstance, stepMetricsErr = newStepMetrics()
|
||||
})
|
||||
return stepMetricsInstance, stepMetricsErr
|
||||
}
|
||||
|
||||
func newStepMetrics() (*StepMetrics, error) {
|
||||
meter := otel.GetMeterProvider().Meter(
|
||||
"github.com/beckn-one/beckn-onix/telemetry",
|
||||
metric.WithInstrumentationVersion("1.0.0"),
|
||||
)
|
||||
|
||||
m := &StepMetrics{}
|
||||
var err error
|
||||
|
||||
if m.StepExecutionDuration, err = meter.Float64Histogram(
|
||||
"onix_step_execution_duration_seconds",
|
||||
metric.WithDescription("Duration of individual processing steps"),
|
||||
metric.WithUnit("s"),
|
||||
metric.WithExplicitBucketBoundaries(0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5),
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("onix_step_execution_duration_seconds: %w", err)
|
||||
}
|
||||
|
||||
if m.StepExecutionTotal, err = meter.Int64Counter(
|
||||
"onix_step_executions_total",
|
||||
metric.WithDescription("Total processing step executions"),
|
||||
metric.WithUnit("{execution}"),
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("onix_step_executions_total: %w", err)
|
||||
}
|
||||
|
||||
if m.StepErrorsTotal, err = meter.Int64Counter(
|
||||
"onix_step_errors_total",
|
||||
metric.WithDescription("Processing step errors"),
|
||||
metric.WithUnit("{error}"),
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("onix_step_errors_total: %w", err)
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
131
core/module/handler/step_metrics_test.go
Normal file
131
core/module/handler/step_metrics_test.go
Normal file
@@ -0,0 +1,131 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestGetStepMetrics_Success(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Initialize telemetry provider first
|
||||
provider, err := telemetry.NewTestProvider(ctx)
|
||||
require.NoError(t, err)
|
||||
defer provider.Shutdown(context.Background())
|
||||
|
||||
// Test getting step metrics
|
||||
metrics, err := GetStepMetrics(ctx)
|
||||
require.NoError(t, err, "GetStepMetrics() should not return error")
|
||||
require.NotNil(t, metrics, "GetStepMetrics() should return non-nil metrics")
|
||||
|
||||
// Verify all metric instruments are initialized
|
||||
assert.NotNil(t, metrics.StepExecutionDuration, "StepExecutionDuration should be initialized")
|
||||
assert.NotNil(t, metrics.StepExecutionTotal, "StepExecutionTotal should be initialized")
|
||||
assert.NotNil(t, metrics.StepErrorsTotal, "StepErrorsTotal should be initialized")
|
||||
}
|
||||
|
||||
func TestGetStepMetrics_ConcurrentAccess(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Initialize telemetry provider first
|
||||
provider, err := telemetry.NewTestProvider(ctx)
|
||||
require.NoError(t, err)
|
||||
defer provider.Shutdown(context.Background())
|
||||
|
||||
// Test that GetStepMetrics is safe for concurrent access
|
||||
// and returns the same instance (singleton pattern)
|
||||
metrics1, err1 := GetStepMetrics(ctx)
|
||||
require.NoError(t, err1)
|
||||
require.NotNil(t, metrics1)
|
||||
|
||||
metrics2, err2 := GetStepMetrics(ctx)
|
||||
require.NoError(t, err2)
|
||||
require.NotNil(t, metrics2)
|
||||
|
||||
// Should return the same instance
|
||||
assert.Equal(t, metrics1, metrics2, "GetStepMetrics should return the same instance")
|
||||
}
|
||||
|
||||
func TestGetStepMetrics_WithoutProvider(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Test getting step metrics without initializing provider
|
||||
// This should still work but may not have a valid meter provider
|
||||
metrics, err := GetStepMetrics(ctx)
|
||||
// Note: This might succeed or fail depending on OTel's default behavior
|
||||
// We're just checking it doesn't panic
|
||||
if err != nil {
|
||||
t.Logf("GetStepMetrics returned error (expected if no provider): %v", err)
|
||||
} else {
|
||||
assert.NotNil(t, metrics, "Metrics should be returned even without explicit provider")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStepMetrics_Instruments(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Initialize telemetry provider
|
||||
provider, err := telemetry.NewTestProvider(ctx)
|
||||
require.NoError(t, err)
|
||||
defer provider.Shutdown(context.Background())
|
||||
|
||||
// Get step metrics
|
||||
metrics, err := GetStepMetrics(ctx)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, metrics)
|
||||
|
||||
// Test that we can record metrics (this tests the instruments are functional)
|
||||
// Note: We can't easily verify the metrics were recorded without querying the exporter,
|
||||
// but we can verify the instruments are not nil and can be called without panicking
|
||||
|
||||
// Test StepExecutionDuration
|
||||
require.NotPanics(t, func() {
|
||||
metrics.StepExecutionDuration.Record(ctx, 0.5,
|
||||
metric.WithAttributes(telemetry.AttrStep.String("test-step"), telemetry.AttrModule.String("test-module")))
|
||||
}, "StepExecutionDuration.Record should not panic")
|
||||
|
||||
// Test StepExecutionTotal
|
||||
require.NotPanics(t, func() {
|
||||
metrics.StepExecutionTotal.Add(ctx, 1,
|
||||
metric.WithAttributes(telemetry.AttrStep.String("test-step"), telemetry.AttrModule.String("test-module")))
|
||||
}, "StepExecutionTotal.Add should not panic")
|
||||
|
||||
// Test StepErrorsTotal
|
||||
require.NotPanics(t, func() {
|
||||
metrics.StepErrorsTotal.Add(ctx, 1,
|
||||
metric.WithAttributes(telemetry.AttrStep.String("test-step"), telemetry.AttrModule.String("test-module")))
|
||||
}, "StepErrorsTotal.Add should not panic")
|
||||
|
||||
// Verify metrics are exposed via HTTP handler
|
||||
rec := httptest.NewRecorder()
|
||||
req := httptest.NewRequest("GET", "/metrics", nil)
|
||||
provider.MetricsHandler.ServeHTTP(rec, req)
|
||||
assert.Equal(t, 200, rec.Code, "Metrics endpoint should return 200")
|
||||
}
|
||||
|
||||
func TestStepMetrics_MultipleCalls(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Initialize telemetry provider
|
||||
provider, err := telemetry.NewTestProvider(ctx)
|
||||
require.NoError(t, err)
|
||||
defer provider.Shutdown(context.Background())
|
||||
|
||||
// Call GetStepMetrics multiple times
|
||||
for i := 0; i < 10; i++ {
|
||||
metrics, err := GetStepMetrics(ctx)
|
||||
require.NoError(t, err, "GetStepMetrics should succeed on call %d", i)
|
||||
require.NotNil(t, metrics, "GetStepMetrics should return non-nil on call %d", i)
|
||||
assert.NotNil(t, metrics.StepExecutionDuration, "StepExecutionDuration should be initialized")
|
||||
assert.NotNil(t, metrics.StepExecutionTotal, "StepExecutionTotal should be initialized")
|
||||
assert.NotNil(t, metrics.StepErrorsTotal, "StepErrorsTotal should be initialized")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ type Config struct {
|
||||
}
|
||||
|
||||
// Provider represents a function that initializes an HTTP handler using a PluginManager.
|
||||
type Provider func(ctx context.Context, mgr handler.PluginManager, cfg *handler.Config) (http.Handler, error)
|
||||
type Provider func(ctx context.Context, mgr handler.PluginManager, cfg *handler.Config, moduleName string) (http.Handler, error)
|
||||
|
||||
// handlerProviders maintains a mapping of handler types to their respective providers.
|
||||
var handlerProviders = map[handler.Type]Provider{
|
||||
@@ -29,6 +29,7 @@ var handlerProviders = map[handler.Type]Provider{
|
||||
// It iterates over the module configurations, retrieves appropriate handler providers,
|
||||
// and registers the handlers with the HTTP multiplexer.
|
||||
func Register(ctx context.Context, mCfgs []Config, mux *http.ServeMux, mgr handler.PluginManager) error {
|
||||
|
||||
mux.Handle("/health", http.HandlerFunc(handler.HealthHandler))
|
||||
|
||||
log.Debugf(ctx, "Registering modules with config: %#v", mCfgs)
|
||||
@@ -38,7 +39,7 @@ func Register(ctx context.Context, mCfgs []Config, mux *http.ServeMux, mgr handl
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid module : %s", c.Name)
|
||||
}
|
||||
h, err := rmp(ctx, mgr, &c.Handler)
|
||||
h, err := rmp(ctx, mgr, &c.Handler, c.Name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s : %w", c.Name, err)
|
||||
}
|
||||
@@ -81,4 +82,4 @@ func moduleCtxMiddleware(moduleName string, next http.Handler) http.Handler {
|
||||
ctx := context.WithValue(r.Context(), model.ContextKeyModuleID, moduleName)
|
||||
next.ServeHTTP(w, r.WithContext(ctx))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -128,6 +128,7 @@ func TestRegisterSuccess(t *testing.T) {
|
||||
if capturedModuleName != "test-module" {
|
||||
t.Errorf("expected module_id in context to be 'test-module', got %v", capturedModuleName)
|
||||
}
|
||||
|
||||
// Verifying /health endpoint registration
|
||||
reqHealth := httptest.NewRequest(http.MethodGet, "/health", nil)
|
||||
recHealth := httptest.NewRecorder()
|
||||
|
||||
25
go.mod
25
go.mod
@@ -3,12 +3,11 @@ module github.com/beckn-one/beckn-onix
|
||||
go 1.24.0
|
||||
|
||||
require (
|
||||
github.com/rogpeppe/go-internal v1.13.1 // indirect
|
||||
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1
|
||||
golang.org/x/crypto v0.36.0
|
||||
)
|
||||
|
||||
require github.com/stretchr/testify v1.10.0
|
||||
require github.com/stretchr/testify v1.11.1
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
@@ -23,13 +22,15 @@ require github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03
|
||||
require golang.org/x/text v0.26.0 // indirect
|
||||
|
||||
require (
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/go-jose/go-jose/v4 v4.0.1 // indirect
|
||||
github.com/go-logr/logr v1.4.3 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.21.0 // indirect
|
||||
github.com/go-openapi/swag v0.23.0 // indirect
|
||||
github.com/google/go-cmp v0.6.0 // indirect
|
||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
|
||||
@@ -41,17 +42,25 @@ require (
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
|
||||
github.com/mitchellh/go-homedir v1.1.0 // indirect
|
||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
|
||||
github.com/oasdiff/yaml v0.0.0-20250309154309-f31be36b4037 // indirect
|
||||
github.com/oasdiff/yaml3 v0.0.0-20250309153720-d2182401db90 // indirect
|
||||
github.com/perimeterx/marshmallow v1.1.5 // indirect
|
||||
github.com/prometheus/client_model v0.6.0 // indirect
|
||||
github.com/prometheus/common v0.45.0 // indirect
|
||||
github.com/prometheus/procfs v0.12.0 // indirect
|
||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.16.0 // indirect
|
||||
github.com/ryanuber/go-glob v1.0.0 // indirect
|
||||
github.com/woodsbury/decimal128 v1.3.0 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.38.0 // indirect
|
||||
golang.org/x/net v0.38.0 // indirect
|
||||
golang.org/x/sys v0.38.0 // indirect
|
||||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect
|
||||
google.golang.org/protobuf v1.32.0 // indirect
|
||||
)
|
||||
|
||||
require (
|
||||
@@ -60,9 +69,17 @@ require (
|
||||
github.com/hashicorp/go-retryablehttp v0.7.7
|
||||
github.com/hashicorp/vault/api v1.16.0
|
||||
github.com/jsonata-go/jsonata v0.0.0-20250709164031-599f35f32e5f
|
||||
github.com/prometheus/client_golang v1.18.0
|
||||
github.com/rabbitmq/amqp091-go v1.10.0
|
||||
github.com/redis/go-redis/v9 v9.8.0
|
||||
github.com/redis/go-redis/extra/redisotel/v9 v9.16.0
|
||||
github.com/redis/go-redis/v9 v9.16.0
|
||||
github.com/rs/zerolog v1.34.0
|
||||
go.opentelemetry.io/contrib/instrumentation/runtime v0.63.0
|
||||
go.opentelemetry.io/otel v1.38.0
|
||||
go.opentelemetry.io/otel/exporters/prometheus v0.46.0
|
||||
go.opentelemetry.io/otel/metric v1.38.0
|
||||
go.opentelemetry.io/otel/sdk v1.38.0
|
||||
go.opentelemetry.io/otel/sdk/metric v1.38.0
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
||||
gopkg.in/yaml.v2 v2.4.0
|
||||
)
|
||||
|
||||
58
go.sum
58
go.sum
@@ -1,4 +1,8 @@
|
||||
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
|
||||
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
||||
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
|
||||
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||
@@ -8,11 +12,15 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3
|
||||
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||
github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI=
|
||||
@@ -24,6 +32,11 @@ github.com/getkin/kin-openapi v0.133.0 h1:pJdmNohVIJ97r4AUFtEXRXwESr8b0bD721u/Tz
|
||||
github.com/getkin/kin-openapi v0.133.0/go.mod h1:boAciF6cXk5FhPqe/NQeBTeenbjqU4LhWBf09ILVvWE=
|
||||
github.com/go-jose/go-jose/v4 v4.0.1 h1:QVEPDE3OluqXBQZDcnNvQrInro2h0e4eqNbnZSWqS6U=
|
||||
github.com/go-jose/go-jose/v4 v4.0.1/go.mod h1:WVf9LFMHh/QVrmqrOfqun0C45tMe3RoiKJMPvgWwLfY=
|
||||
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
|
||||
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||
github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ=
|
||||
github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY=
|
||||
github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE=
|
||||
@@ -31,8 +44,8 @@ github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ
|
||||
github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM=
|
||||
github.com/go-test/deep v1.0.8/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
|
||||
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||
@@ -78,6 +91,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/
|
||||
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
|
||||
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
|
||||
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
|
||||
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
|
||||
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
|
||||
@@ -98,10 +113,22 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
|
||||
github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
|
||||
github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA=
|
||||
github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos=
|
||||
github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8=
|
||||
github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM=
|
||||
github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
|
||||
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
|
||||
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI=
|
||||
github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
|
||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.16.0 h1:zAFQyFxJ3QDwpPUY/CKn22LI5+B8m/lUyffzq2+8ENs=
|
||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.16.0/go.mod h1:ouOc8ujB2wdUG6o0RrqaPl2tI6cenExC0KkJQ+PHXmw=
|
||||
github.com/redis/go-redis/extra/redisotel/v9 v9.16.0 h1:+a9h9qxFXdf3gX0FXnDcz7X44ZBFUPq58Gblq7aMU4s=
|
||||
github.com/redis/go-redis/extra/redisotel/v9 v9.16.0/go.mod h1:EtTTC7vnKWgznfG6kBgl9ySLqd7NckRCFUBzVXdeHeI=
|
||||
github.com/redis/go-redis/v9 v9.16.0 h1:OotgqgLSRCmzfqChbQyG1PHC3tLNR89DG4jdOERSEP4=
|
||||
github.com/redis/go-redis/v9 v9.16.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
|
||||
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
||||
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
||||
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
|
||||
@@ -116,14 +143,30 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
|
||||
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
|
||||
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0=
|
||||
github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY=
|
||||
github.com/woodsbury/decimal128 v1.3.0 h1:8pffMNWIlC0O5vbyHWFZAt5yWvWcrHA+3ovIIjVWss0=
|
||||
github.com/woodsbury/decimal128 v1.3.0/go.mod h1:C5UTmyTjW3JftjUFzOVhC20BEQa2a4ZKOB5I6Zjb+ds=
|
||||
github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03 h1:m1h+vudopHsI67FPT9MOncyndWhTcdUoBtI1R1uajGY=
|
||||
github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03/go.mod h1:8sheVFH84v3PCyFY/O02mIgSQY9I6wMYPWsq7mDnEZY=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
|
||||
go.opentelemetry.io/contrib/instrumentation/runtime v0.63.0 h1:PeBoRj6af6xMI7qCupwFvTbbnd49V7n5YpG6pg8iDYQ=
|
||||
go.opentelemetry.io/contrib/instrumentation/runtime v0.63.0/go.mod h1:ingqBCtMCe8I4vpz/UVzCW6sxoqgZB37nao91mLQ3Bw=
|
||||
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
|
||||
go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM=
|
||||
go.opentelemetry.io/otel/exporters/prometheus v0.46.0 h1:I8WIFXR351FoLJYuloU4EgXbtNX2URfU/85pUPheIEQ=
|
||||
go.opentelemetry.io/otel/exporters/prometheus v0.46.0/go.mod h1:ztwVUHe5DTR/1v7PeuGRnU5Bbd4QKYwApWmuutKsJSs=
|
||||
go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA=
|
||||
go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI=
|
||||
go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E=
|
||||
go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA=
|
||||
go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE=
|
||||
go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
|
||||
@@ -142,6 +185,8 @@ golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
|
||||
golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
|
||||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
|
||||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
|
||||
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
@@ -152,3 +197,4 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ plugins=(
|
||||
"registry"
|
||||
"dediregistry"
|
||||
"reqpreprocessor"
|
||||
"otelsetup"
|
||||
"reqmapper"
|
||||
"router"
|
||||
"schemavalidator"
|
||||
|
||||
15
pkg/plugin/definition/metrics.go
Normal file
15
pkg/plugin/definition/metrics.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package definition
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
||||
)
|
||||
|
||||
// OtelSetupMetricsProvider encapsulates initialization of OpenTelemetry metrics
|
||||
// providers. Implementations wire exporters and return a Provider that the core
|
||||
// application can manage.
|
||||
type OtelSetupMetricsProvider interface {
|
||||
// New initializes a new telemetry provider instance with the given configuration.
|
||||
New(ctx context.Context, config map[string]string) (*telemetry.Provider, func() error, error)
|
||||
}
|
||||
70
pkg/plugin/implementation/cache/cache.go
vendored
70
pkg/plugin/implementation/cache/cache.go
vendored
@@ -7,7 +7,12 @@ import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
||||
"github.com/redis/go-redis/extra/redisotel/v9"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
@@ -31,7 +36,8 @@ type Config struct {
|
||||
|
||||
// Cache wraps a Redis client to provide basic caching operations.
|
||||
type Cache struct {
|
||||
Client RedisClient
|
||||
Client RedisClient
|
||||
metrics *CacheMetrics
|
||||
}
|
||||
|
||||
// Error variables to describe common failure modes.
|
||||
@@ -77,26 +83,80 @@ func New(ctx context.Context, cfg *Config) (*Cache, func() error, error) {
|
||||
return nil, nil, fmt.Errorf("%w: %v", ErrConnectionFail, err)
|
||||
}
|
||||
|
||||
// Enable OpenTelemetry instrumentation for tracing and metrics
|
||||
// This will automatically collect Redis operation metrics and expose them via /metrics endpoint
|
||||
if redisClient, ok := client.(*redis.Client); ok {
|
||||
if err := redisotel.InstrumentTracing(redisClient); err != nil {
|
||||
// Log error but don't fail - instrumentation is optional
|
||||
log.Debugf(ctx, "Failed to instrument Redis tracing: %v", err)
|
||||
}
|
||||
|
||||
if err := redisotel.InstrumentMetrics(redisClient); err != nil {
|
||||
// Log error but don't fail - instrumentation is optional
|
||||
log.Debugf(ctx, "Failed to instrument Redis metrics: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
metrics, _ := GetCacheMetrics(ctx)
|
||||
|
||||
log.Infof(ctx, "Cache connection to Redis established successfully")
|
||||
return &Cache{Client: client}, client.Close, nil
|
||||
return &Cache{Client: client, metrics: metrics}, client.Close, nil
|
||||
}
|
||||
|
||||
// Get retrieves the value for the specified key from Redis.
|
||||
func (c *Cache) Get(ctx context.Context, key string) (string, error) {
|
||||
return c.Client.Get(ctx, key).Result()
|
||||
result, err := c.Client.Get(ctx, key).Result()
|
||||
if c.metrics != nil {
|
||||
attrs := []attribute.KeyValue{
|
||||
telemetry.AttrOperation.String("get"),
|
||||
}
|
||||
switch {
|
||||
case err == redis.Nil:
|
||||
c.metrics.CacheMissesTotal.Add(ctx, 1, metric.WithAttributes(attrs...))
|
||||
c.metrics.CacheOperationsTotal.Add(ctx, 1,
|
||||
metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("miss"))...))
|
||||
case err != nil:
|
||||
c.metrics.CacheOperationsTotal.Add(ctx, 1,
|
||||
metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("error"))...))
|
||||
default:
|
||||
c.metrics.CacheHitsTotal.Add(ctx, 1, metric.WithAttributes(attrs...))
|
||||
c.metrics.CacheOperationsTotal.Add(ctx, 1,
|
||||
metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("hit"))...))
|
||||
}
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
|
||||
// Set stores the given key-value pair in Redis with the specified TTL (time to live).
|
||||
func (c *Cache) Set(ctx context.Context, key, value string, ttl time.Duration) error {
|
||||
return c.Client.Set(ctx, key, value, ttl).Err()
|
||||
err := c.Client.Set(ctx, key, value, ttl).Err()
|
||||
c.recordOperation(ctx, "set", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete removes the specified key from Redis.
|
||||
func (c *Cache) Delete(ctx context.Context, key string) error {
|
||||
return c.Client.Del(ctx, key).Err()
|
||||
err := c.Client.Del(ctx, key).Err()
|
||||
c.recordOperation(ctx, "delete", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Clear removes all keys in the currently selected Redis database.
|
||||
func (c *Cache) Clear(ctx context.Context) error {
|
||||
return c.Client.FlushDB(ctx).Err()
|
||||
}
|
||||
|
||||
func (c *Cache) recordOperation(ctx context.Context, op string, err error) {
|
||||
if c.metrics == nil {
|
||||
return
|
||||
}
|
||||
status := "success"
|
||||
if err != nil {
|
||||
status = "error"
|
||||
}
|
||||
c.metrics.CacheOperationsTotal.Add(ctx, 1,
|
||||
metric.WithAttributes(
|
||||
telemetry.AttrOperation.String(op),
|
||||
telemetry.AttrStatus.String(status),
|
||||
))
|
||||
}
|
||||
|
||||
69
pkg/plugin/implementation/cache/cache_metrics.go
vendored
Normal file
69
pkg/plugin/implementation/cache/cache_metrics.go
vendored
Normal file
@@ -0,0 +1,69 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
// CacheMetrics exposes cache-related metric instruments.
|
||||
type CacheMetrics struct {
|
||||
CacheOperationsTotal metric.Int64Counter
|
||||
CacheHitsTotal metric.Int64Counter
|
||||
CacheMissesTotal metric.Int64Counter
|
||||
}
|
||||
|
||||
var (
|
||||
cacheMetricsInstance *CacheMetrics
|
||||
cacheMetricsOnce sync.Once
|
||||
cacheMetricsErr error
|
||||
)
|
||||
|
||||
// GetCacheMetrics lazily initializes cache metric instruments and returns a cached reference.
|
||||
func GetCacheMetrics(ctx context.Context) (*CacheMetrics, error) {
|
||||
cacheMetricsOnce.Do(func() {
|
||||
cacheMetricsInstance, cacheMetricsErr = newCacheMetrics()
|
||||
})
|
||||
return cacheMetricsInstance, cacheMetricsErr
|
||||
}
|
||||
|
||||
func newCacheMetrics() (*CacheMetrics, error) {
|
||||
meter := otel.GetMeterProvider().Meter(
|
||||
"github.com/beckn-one/beckn-onix/cache",
|
||||
metric.WithInstrumentationVersion("1.0.0"),
|
||||
)
|
||||
|
||||
m := &CacheMetrics{}
|
||||
var err error
|
||||
|
||||
if m.CacheOperationsTotal, err = meter.Int64Counter(
|
||||
"onix_cache_operations_total",
|
||||
metric.WithDescription("Redis cache operations"),
|
||||
metric.WithUnit("{operation}"),
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("onix_cache_operations_total: %w", err)
|
||||
}
|
||||
|
||||
if m.CacheHitsTotal, err = meter.Int64Counter(
|
||||
"onix_cache_hits_total",
|
||||
metric.WithDescription("Redis cache hits"),
|
||||
metric.WithUnit("{hit}"),
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("onix_cache_hits_total: %w", err)
|
||||
}
|
||||
|
||||
if m.CacheMissesTotal, err = meter.Int64Counter(
|
||||
"onix_cache_misses_total",
|
||||
metric.WithDescription("Redis cache misses"),
|
||||
metric.WithUnit("{miss}"),
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("onix_cache_misses_total: %w", err)
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
|
||||
79
pkg/plugin/implementation/otelsetup/cmd/plugin.go
Normal file
79
pkg/plugin/implementation/otelsetup/cmd/plugin.go
Normal file
@@ -0,0 +1,79 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||
"github.com/beckn-one/beckn-onix/pkg/plugin/implementation/otelsetup"
|
||||
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
||||
)
|
||||
|
||||
// metricsProvider implements the OtelSetupMetricsProvider interface for the otelsetup plugin.
|
||||
type metricsProvider struct {
|
||||
impl otelsetup.Setup
|
||||
}
|
||||
|
||||
// New creates a new telemetry provider instance.
|
||||
func (m metricsProvider) New(ctx context.Context, config map[string]string) (*telemetry.Provider, func() error, error) {
|
||||
if ctx == nil {
|
||||
return nil, nil, errors.New("context cannot be nil")
|
||||
}
|
||||
|
||||
// Convert map[string]string to otelsetup.Config
|
||||
telemetryConfig := &otelsetup.Config{
|
||||
ServiceName: config["serviceName"],
|
||||
ServiceVersion: config["serviceVersion"],
|
||||
Environment: config["environment"],
|
||||
MetricsPort: config["metricsPort"],
|
||||
}
|
||||
|
||||
// Parse enableMetrics as boolean
|
||||
if enableMetricsStr, ok := config["enableMetrics"]; ok && enableMetricsStr != "" {
|
||||
enableMetrics, err := strconv.ParseBool(enableMetricsStr)
|
||||
if err != nil {
|
||||
log.Warnf(ctx, "Invalid enableMetrics value '%s', defaulting to true: %v", enableMetricsStr, err)
|
||||
telemetryConfig.EnableMetrics = true
|
||||
} else {
|
||||
telemetryConfig.EnableMetrics = enableMetrics
|
||||
}
|
||||
} else {
|
||||
telemetryConfig.EnableMetrics = true // Default to true if not specified or empty
|
||||
}
|
||||
|
||||
// Apply defaults if fields are empty
|
||||
if telemetryConfig.ServiceName == "" {
|
||||
telemetryConfig.ServiceName = otelsetup.DefaultConfig().ServiceName
|
||||
}
|
||||
if telemetryConfig.ServiceVersion == "" {
|
||||
telemetryConfig.ServiceVersion = otelsetup.DefaultConfig().ServiceVersion
|
||||
}
|
||||
if telemetryConfig.Environment == "" {
|
||||
telemetryConfig.Environment = otelsetup.DefaultConfig().Environment
|
||||
}
|
||||
|
||||
log.Debugf(ctx, "Telemetry config mapped: %+v", telemetryConfig)
|
||||
provider, err := m.impl.New(ctx, telemetryConfig)
|
||||
if err != nil {
|
||||
log.Errorf(ctx, err, "Failed to create telemetry provider instance")
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Wrap the Shutdown function to match the closer signature
|
||||
var closer func() error
|
||||
if provider != nil && provider.Shutdown != nil {
|
||||
closer = func() error {
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
return provider.Shutdown(shutdownCtx)
|
||||
}
|
||||
}
|
||||
|
||||
log.Infof(ctx, "Telemetry provider instance created successfully")
|
||||
return provider, closer, nil
|
||||
}
|
||||
|
||||
// Provider is the exported plugin instance
|
||||
var Provider = metricsProvider{}
|
||||
296
pkg/plugin/implementation/otelsetup/cmd/plugin_test.go
Normal file
296
pkg/plugin/implementation/otelsetup/cmd/plugin_test.go
Normal file
@@ -0,0 +1,296 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/plugin/implementation/otelsetup"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMetricsProviderNew_Success(t *testing.T) {
|
||||
provider := metricsProvider{}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
ctx context.Context
|
||||
config map[string]string
|
||||
}{
|
||||
{
|
||||
name: "Valid config with all fields",
|
||||
ctx: context.Background(),
|
||||
config: map[string]string{
|
||||
"serviceName": "test-service",
|
||||
"serviceVersion": "1.0.0",
|
||||
"enableMetrics": "true",
|
||||
"environment": "test",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Valid config with minimal fields (uses defaults)",
|
||||
ctx: context.Background(),
|
||||
config: map[string]string{},
|
||||
},
|
||||
{
|
||||
name: "Valid config with enableMetrics false",
|
||||
ctx: context.Background(),
|
||||
config: map[string]string{
|
||||
"enableMetrics": "false",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Valid config with partial fields",
|
||||
ctx: context.Background(),
|
||||
config: map[string]string{
|
||||
"serviceName": "custom-service",
|
||||
"serviceVersion": "2.0.0",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
telemetryProvider, cleanup, err := provider.New(tt.ctx, tt.config)
|
||||
|
||||
require.NoError(t, err, "New() should not return error")
|
||||
require.NotNil(t, telemetryProvider, "New() should return non-nil provider")
|
||||
|
||||
// Metrics server is started inside provider when enabled; MetricsHandler is not exposed.
|
||||
if cleanup != nil {
|
||||
err := cleanup()
|
||||
assert.NoError(t, err, "cleanup() should not return error")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetricsProviderNew_Failure(t *testing.T) {
|
||||
provider := metricsProvider{}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
ctx context.Context
|
||||
config map[string]string
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "Nil context",
|
||||
ctx: nil,
|
||||
config: map[string]string{},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
telemetryProvider, cleanup, err := provider.New(tt.ctx, tt.config)
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err, "New() should return error for nil context")
|
||||
assert.Nil(t, telemetryProvider, "New() should return nil provider on error")
|
||||
assert.Nil(t, cleanup, "New() should return nil cleanup on error")
|
||||
} else {
|
||||
assert.NoError(t, err, "New() should not return error")
|
||||
assert.NotNil(t, telemetryProvider, "New() should return non-nil provider")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetricsProviderNew_ConfigConversion(t *testing.T) {
|
||||
provider := metricsProvider{}
|
||||
ctx := context.Background()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
config map[string]string
|
||||
expectedConfig *otelsetup.Config
|
||||
}{
|
||||
{
|
||||
name: "All fields provided",
|
||||
config: map[string]string{
|
||||
"serviceName": "my-service",
|
||||
"serviceVersion": "3.0.0",
|
||||
"enableMetrics": "true",
|
||||
"environment": "production",
|
||||
},
|
||||
expectedConfig: &otelsetup.Config{
|
||||
ServiceName: "my-service",
|
||||
ServiceVersion: "3.0.0",
|
||||
EnableMetrics: true,
|
||||
Environment: "production",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Empty config uses defaults",
|
||||
config: map[string]string{},
|
||||
expectedConfig: &otelsetup.Config{
|
||||
ServiceName: otelsetup.DefaultConfig().ServiceName,
|
||||
ServiceVersion: otelsetup.DefaultConfig().ServiceVersion,
|
||||
EnableMetrics: true, // Default when not specified
|
||||
Environment: otelsetup.DefaultConfig().Environment,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "EnableMetrics false",
|
||||
config: map[string]string{
|
||||
"enableMetrics": "false",
|
||||
},
|
||||
expectedConfig: &otelsetup.Config{
|
||||
ServiceName: otelsetup.DefaultConfig().ServiceName,
|
||||
ServiceVersion: otelsetup.DefaultConfig().ServiceVersion,
|
||||
EnableMetrics: false,
|
||||
Environment: otelsetup.DefaultConfig().Environment,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Invalid enableMetrics defaults to true",
|
||||
config: map[string]string{
|
||||
"enableMetrics": "invalid",
|
||||
},
|
||||
expectedConfig: &otelsetup.Config{
|
||||
ServiceName: otelsetup.DefaultConfig().ServiceName,
|
||||
ServiceVersion: otelsetup.DefaultConfig().ServiceVersion,
|
||||
EnableMetrics: true, // Defaults to true on parse error
|
||||
Environment: otelsetup.DefaultConfig().Environment,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
telemetryProvider, cleanup, err := provider.New(ctx, tt.config)
|
||||
|
||||
require.NoError(t, err, "New() should not return error")
|
||||
require.NotNil(t, telemetryProvider, "New() should return non-nil provider")
|
||||
|
||||
if cleanup != nil {
|
||||
err := cleanup()
|
||||
assert.NoError(t, err, "cleanup() should not return error")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetricsProviderNew_BooleanParsing(t *testing.T) {
|
||||
provider := metricsProvider{}
|
||||
ctx := context.Background()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
enableMetrics string
|
||||
expected bool
|
||||
}{
|
||||
{
|
||||
name: "True string",
|
||||
enableMetrics: "true",
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "False string",
|
||||
enableMetrics: "false",
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "True uppercase",
|
||||
enableMetrics: "TRUE",
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "False uppercase",
|
||||
enableMetrics: "FALSE",
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "Invalid value defaults to true",
|
||||
enableMetrics: "invalid",
|
||||
expected: true, // Defaults to true on parse error
|
||||
},
|
||||
{
|
||||
name: "Empty string defaults to true",
|
||||
enableMetrics: "",
|
||||
expected: true, // Defaults to true when not specified
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
config := map[string]string{
|
||||
"enableMetrics": tt.enableMetrics,
|
||||
}
|
||||
|
||||
telemetryProvider, cleanup, err := provider.New(ctx, config)
|
||||
|
||||
require.NoError(t, err, "New() should not return error")
|
||||
require.NotNil(t, telemetryProvider, "New() should return non-nil provider")
|
||||
|
||||
if cleanup != nil {
|
||||
err := cleanup()
|
||||
assert.NoError(t, err, "cleanup() should not return error")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetricsProviderNew_CleanupFunction(t *testing.T) {
|
||||
provider := metricsProvider{}
|
||||
ctx := context.Background()
|
||||
|
||||
config := map[string]string{
|
||||
"serviceName": "test-service",
|
||||
"serviceVersion": "1.0.0",
|
||||
"enableMetrics": "true",
|
||||
"environment": "test",
|
||||
}
|
||||
|
||||
telemetryProvider, cleanup, err := provider.New(ctx, config)
|
||||
|
||||
require.NoError(t, err, "New() should not return error")
|
||||
require.NotNil(t, telemetryProvider, "New() should return non-nil provider")
|
||||
require.NotNil(t, cleanup, "New() should return non-nil cleanup function")
|
||||
|
||||
// Test that cleanup can be called successfully
|
||||
err = cleanup()
|
||||
assert.NoError(t, err, "cleanup() should not return error")
|
||||
}
|
||||
|
||||
func TestProviderVariable(t *testing.T) {
|
||||
assert.NotNil(t, Provider, "Provider should not be nil")
|
||||
|
||||
// Verify Provider implements the interface correctly
|
||||
ctx := context.Background()
|
||||
config := map[string]string{
|
||||
"serviceName": "test",
|
||||
"serviceVersion": "1.0.0",
|
||||
"enableMetrics": "true",
|
||||
}
|
||||
|
||||
telemetryProvider, cleanup, err := Provider.New(ctx, config)
|
||||
|
||||
require.NoError(t, err, "Provider.New() should not return error")
|
||||
require.NotNil(t, telemetryProvider, "Provider.New() should return non-nil provider")
|
||||
|
||||
if cleanup != nil {
|
||||
err := cleanup()
|
||||
assert.NoError(t, err, "cleanup() should not return error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetricsProviderNew_DefaultValues(t *testing.T) {
|
||||
provider := metricsProvider{}
|
||||
ctx := context.Background()
|
||||
|
||||
// Test with completely empty config
|
||||
config := map[string]string{}
|
||||
|
||||
telemetryProvider, cleanup, err := provider.New(ctx, config)
|
||||
|
||||
require.NoError(t, err, "New() should not return error with empty config")
|
||||
require.NotNil(t, telemetryProvider, "New() should return non-nil provider")
|
||||
|
||||
if cleanup != nil {
|
||||
err := cleanup()
|
||||
assert.NoError(t, err, "cleanup() should not return error")
|
||||
}
|
||||
}
|
||||
169
pkg/plugin/implementation/otelsetup/otelsetup.go
Normal file
169
pkg/plugin/implementation/otelsetup/otelsetup.go
Normal file
@@ -0,0 +1,169 @@
|
||||
package otelsetup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
clientprom "github.com/prometheus/client_golang/prometheus"
|
||||
clientpromhttp "github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"go.opentelemetry.io/contrib/instrumentation/runtime"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
|
||||
"go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||
"github.com/beckn-one/beckn-onix/pkg/plugin"
|
||||
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
||||
)
|
||||
|
||||
// Setup wires the telemetry provider. This is the concrete implementation
|
||||
// behind the OtelSetupMetricsProvider interface.
|
||||
type Setup struct{}
|
||||
|
||||
// Config represents OpenTelemetry related configuration.
|
||||
type Config struct {
|
||||
ServiceName string `yaml:"serviceName"`
|
||||
ServiceVersion string `yaml:"serviceVersion"`
|
||||
EnableMetrics bool `yaml:"enableMetrics"`
|
||||
Environment string `yaml:"environment"`
|
||||
MetricsPort string `yaml:"metricsPort"`
|
||||
}
|
||||
|
||||
// DefaultConfig returns sensible defaults for telemetry configuration.
|
||||
func DefaultConfig() *Config {
|
||||
return &Config{
|
||||
ServiceName: "beckn-onix",
|
||||
ServiceVersion: "dev",
|
||||
EnableMetrics: true,
|
||||
Environment: "development",
|
||||
MetricsPort: "9090",
|
||||
}
|
||||
}
|
||||
|
||||
// ToPluginConfig converts Config to plugin.Config format.
|
||||
func ToPluginConfig(cfg *Config) *plugin.Config {
|
||||
return &plugin.Config{
|
||||
ID: "otelsetup",
|
||||
Config: map[string]string{
|
||||
"serviceName": cfg.ServiceName,
|
||||
"serviceVersion": cfg.ServiceVersion,
|
||||
"enableMetrics": fmt.Sprintf("%t", cfg.EnableMetrics),
|
||||
"environment": cfg.Environment,
|
||||
"metricsPort": cfg.MetricsPort,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// New initializes the underlying telemetry provider. The returned provider
|
||||
// exposes the HTTP handler and shutdown hooks that the core application can
|
||||
// manage directly.
|
||||
func (Setup) New(ctx context.Context, cfg *Config) (*telemetry.Provider, error) {
|
||||
if cfg == nil {
|
||||
return nil, fmt.Errorf("telemetry config cannot be nil")
|
||||
}
|
||||
|
||||
// Apply defaults if fields are empty
|
||||
if cfg.ServiceName == "" {
|
||||
cfg.ServiceName = DefaultConfig().ServiceName
|
||||
}
|
||||
if cfg.ServiceVersion == "" {
|
||||
cfg.ServiceVersion = DefaultConfig().ServiceVersion
|
||||
}
|
||||
if cfg.Environment == "" {
|
||||
cfg.Environment = DefaultConfig().Environment
|
||||
}
|
||||
if cfg.MetricsPort == "" {
|
||||
cfg.MetricsPort = DefaultConfig().MetricsPort
|
||||
}
|
||||
|
||||
if !cfg.EnableMetrics {
|
||||
log.Info(ctx, "OpenTelemetry metrics disabled")
|
||||
return &telemetry.Provider{
|
||||
Shutdown: func(context.Context) error { return nil },
|
||||
}, nil
|
||||
}
|
||||
|
||||
res, err := resource.New(
|
||||
ctx,
|
||||
resource.WithAttributes(
|
||||
attribute.String("service.name", cfg.ServiceName),
|
||||
attribute.String("service.version", cfg.ServiceVersion),
|
||||
attribute.String("deployment.environment", cfg.Environment),
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create telemetry resource: %w", err)
|
||||
}
|
||||
|
||||
registry := clientprom.NewRegistry()
|
||||
|
||||
exporter, err := otelprom.New(
|
||||
otelprom.WithRegisterer(registry),
|
||||
otelprom.WithoutUnits(),
|
||||
otelprom.WithoutScopeInfo(),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create prometheus exporter: %w", err)
|
||||
}
|
||||
|
||||
meterProvider := metric.NewMeterProvider(
|
||||
metric.WithReader(exporter),
|
||||
metric.WithResource(res),
|
||||
)
|
||||
|
||||
otel.SetMeterProvider(meterProvider)
|
||||
log.Infof(ctx, "OpenTelemetry metrics initialized for service=%s version=%s env=%s",
|
||||
cfg.ServiceName, cfg.ServiceVersion, cfg.Environment)
|
||||
|
||||
if err := runtime.Start(runtime.WithMinimumReadMemStatsInterval(0)); err != nil {
|
||||
log.Warnf(ctx, "Failed to start Go runtime instrumentation: %v", err)
|
||||
}
|
||||
|
||||
// Create metrics handler
|
||||
metricsHandler := clientpromhttp.HandlerFor(registry, clientpromhttp.HandlerOpts{})
|
||||
|
||||
// Create and start metrics HTTP server
|
||||
metricsMux := http.NewServeMux()
|
||||
metricsMux.Handle("/metrics", metricsHandler)
|
||||
|
||||
metricsServer := &http.Server{
|
||||
Addr: net.JoinHostPort("", cfg.MetricsPort),
|
||||
Handler: metricsMux,
|
||||
ReadTimeout: 10 * time.Second,
|
||||
WriteTimeout: 10 * time.Second,
|
||||
IdleTimeout: 30 * time.Second,
|
||||
}
|
||||
|
||||
var serverWg sync.WaitGroup
|
||||
serverWg.Add(1)
|
||||
go func() {
|
||||
defer serverWg.Done()
|
||||
log.Infof(ctx, "Metrics server listening on %s", metricsServer.Addr)
|
||||
if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
log.Errorf(ctx, fmt.Errorf("metrics server ListenAndServe: %w", err), "error listening and serving metrics")
|
||||
}
|
||||
}()
|
||||
|
||||
return &telemetry.Provider{
|
||||
MeterProvider: meterProvider,
|
||||
MetricsHandler: metricsHandler,
|
||||
Shutdown: func(shutdownCtx context.Context) error {
|
||||
log.Infof(ctx, "Shutting down metrics server...")
|
||||
// Shutdown the metrics server
|
||||
serverShutdownCtx, cancel := context.WithTimeout(shutdownCtx, 10*time.Second)
|
||||
defer cancel()
|
||||
if err := metricsServer.Shutdown(serverShutdownCtx); err != nil {
|
||||
log.Errorf(ctx, fmt.Errorf("metrics server shutdown: %w", err), "error shutting down metrics server")
|
||||
}
|
||||
serverWg.Wait()
|
||||
// Shutdown the meter provider
|
||||
return meterProvider.Shutdown(shutdownCtx)
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
259
pkg/plugin/implementation/otelsetup/otelsetup_test.go
Normal file
259
pkg/plugin/implementation/otelsetup/otelsetup_test.go
Normal file
@@ -0,0 +1,259 @@
|
||||
package otelsetup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSetup_New_Success(t *testing.T) {
|
||||
setup := Setup{}
|
||||
ctx := context.Background()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
cfg *Config
|
||||
}{
|
||||
{
|
||||
name: "Valid config with all fields",
|
||||
cfg: &Config{
|
||||
ServiceName: "test-service",
|
||||
ServiceVersion: "1.0.0",
|
||||
EnableMetrics: true,
|
||||
Environment: "test",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Valid config with metrics disabled",
|
||||
cfg: &Config{
|
||||
ServiceName: "test-service",
|
||||
ServiceVersion: "1.0.0",
|
||||
EnableMetrics: false,
|
||||
Environment: "test",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Config with empty fields uses defaults",
|
||||
cfg: &Config{
|
||||
ServiceName: "",
|
||||
ServiceVersion: "",
|
||||
EnableMetrics: true,
|
||||
Environment: "",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
provider, err := setup.New(ctx, tt.cfg)
|
||||
|
||||
require.NoError(t, err, "New() should not return error")
|
||||
require.NotNil(t, provider, "New() should return non-nil provider")
|
||||
require.NotNil(t, provider.Shutdown, "Provider should have shutdown function")
|
||||
|
||||
if tt.cfg.EnableMetrics {
|
||||
assert.NotNil(t, provider.MeterProvider, "MeterProvider should be set when metrics enabled")
|
||||
}
|
||||
|
||||
// Test shutdown
|
||||
err = provider.Shutdown(ctx)
|
||||
assert.NoError(t, err, "Shutdown should not return error")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetup_New_Failure(t *testing.T) {
|
||||
setup := Setup{}
|
||||
ctx := context.Background()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
cfg *Config
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "Nil config",
|
||||
cfg: nil,
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
provider, err := setup.New(ctx, tt.cfg)
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err, "New() should return error")
|
||||
assert.Nil(t, provider, "New() should return nil provider on error")
|
||||
} else {
|
||||
assert.NoError(t, err, "New() should not return error")
|
||||
assert.NotNil(t, provider, "New() should return non-nil provider")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetup_New_DefaultValues(t *testing.T) {
|
||||
setup := Setup{}
|
||||
ctx := context.Background()
|
||||
|
||||
// Test with empty fields - should use defaults
|
||||
cfg := &Config{
|
||||
ServiceName: "",
|
||||
ServiceVersion: "",
|
||||
EnableMetrics: true,
|
||||
Environment: "",
|
||||
}
|
||||
|
||||
provider, err := setup.New(ctx, cfg)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, provider)
|
||||
|
||||
// Verify defaults are applied by checking that provider is functional
|
||||
assert.NotNil(t, provider.MeterProvider, "MeterProvider should be set with defaults")
|
||||
|
||||
// Cleanup
|
||||
err = provider.Shutdown(ctx)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestSetup_New_MetricsDisabled(t *testing.T) {
|
||||
setup := Setup{}
|
||||
ctx := context.Background()
|
||||
|
||||
cfg := &Config{
|
||||
ServiceName: "test-service",
|
||||
ServiceVersion: "1.0.0",
|
||||
EnableMetrics: false,
|
||||
Environment: "test",
|
||||
}
|
||||
|
||||
provider, err := setup.New(ctx, cfg)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, provider)
|
||||
|
||||
// When metrics are disabled, MetricsHandler should be nil and MeterProvider should be nil
|
||||
assert.Nil(t, provider.MeterProvider, "MeterProvider should be nil when metrics disabled")
|
||||
|
||||
// Shutdown should still work
|
||||
err = provider.Shutdown(ctx)
|
||||
assert.NoError(t, err, "Shutdown should work even when metrics disabled")
|
||||
}
|
||||
|
||||
func TestToPluginConfig_Success(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
cfg *Config
|
||||
expectedID string
|
||||
expectedConfig map[string]string
|
||||
}{
|
||||
{
|
||||
name: "Valid config with all fields",
|
||||
cfg: &Config{
|
||||
ServiceName: "test-service",
|
||||
ServiceVersion: "1.0.0",
|
||||
EnableMetrics: true,
|
||||
Environment: "test",
|
||||
},
|
||||
expectedID: "otelsetup",
|
||||
expectedConfig: map[string]string{
|
||||
"serviceName": "test-service",
|
||||
"serviceVersion": "1.0.0",
|
||||
"enableMetrics": "true",
|
||||
"environment": "test",
|
||||
"metricsPort": "",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Config with enableMetrics false",
|
||||
cfg: &Config{
|
||||
ServiceName: "my-service",
|
||||
ServiceVersion: "2.0.0",
|
||||
EnableMetrics: false,
|
||||
Environment: "production",
|
||||
},
|
||||
expectedID: "otelsetup",
|
||||
expectedConfig: map[string]string{
|
||||
"serviceName": "my-service",
|
||||
"serviceVersion": "2.0.0",
|
||||
"enableMetrics": "false",
|
||||
"environment": "production",
|
||||
"metricsPort": "",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Config with empty fields",
|
||||
cfg: &Config{
|
||||
ServiceName: "",
|
||||
ServiceVersion: "",
|
||||
EnableMetrics: true,
|
||||
Environment: "",
|
||||
},
|
||||
expectedID: "otelsetup",
|
||||
expectedConfig: map[string]string{
|
||||
"serviceName": "",
|
||||
"serviceVersion": "",
|
||||
"enableMetrics": "true",
|
||||
"environment": "",
|
||||
"metricsPort": "",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result := ToPluginConfig(tt.cfg)
|
||||
|
||||
require.NotNil(t, result, "ToPluginConfig should return non-nil config")
|
||||
assert.Equal(t, tt.expectedID, result.ID, "Plugin ID should be 'otelsetup'")
|
||||
assert.Equal(t, tt.expectedConfig, result.Config, "Config map should match expected values")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestToPluginConfig_NilConfig(t *testing.T) {
|
||||
// Test that ToPluginConfig handles nil config
|
||||
// Note: This will panic if nil is passed, which is acceptable behavior
|
||||
// as the function expects a valid config. In practice, callers should check for nil.
|
||||
assert.Panics(t, func() {
|
||||
ToPluginConfig(nil)
|
||||
}, "ToPluginConfig should panic when given nil config")
|
||||
}
|
||||
|
||||
func TestToPluginConfig_BooleanConversion(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
enableMetrics bool
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "EnableMetrics true",
|
||||
enableMetrics: true,
|
||||
expected: "true",
|
||||
},
|
||||
{
|
||||
name: "EnableMetrics false",
|
||||
enableMetrics: false,
|
||||
expected: "false",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cfg := &Config{
|
||||
ServiceName: "test",
|
||||
ServiceVersion: "1.0.0",
|
||||
EnableMetrics: tt.enableMetrics,
|
||||
Environment: "test",
|
||||
MetricsPort: "",
|
||||
}
|
||||
|
||||
result := ToPluginConfig(cfg)
|
||||
require.NotNil(t, result)
|
||||
assert.Equal(t, tt.expected, result.Config["enableMetrics"], "enableMetrics should be converted to string correctly")
|
||||
assert.Equal(t, "", result.Config["metricsPort"], "metricsPort should be included even when empty")
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -683,7 +683,6 @@ func TestExcludeActionWithNonURLTargetTypes(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// TestV2RouteSuccess tests v2 routing with domain-agnostic behavior
|
||||
func TestV2RouteSuccess(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||
"github.com/beckn-one/beckn-onix/pkg/plugin/definition"
|
||||
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
||||
)
|
||||
|
||||
type onixPlugin interface {
|
||||
@@ -196,6 +197,33 @@ func (m *Manager) Middleware(ctx context.Context, cfg *Config) (func(http.Handle
|
||||
return mwp.New(ctx, cfg.Config)
|
||||
}
|
||||
|
||||
// OtelSetup initializes OpenTelemetry via a dedicated plugin. The plugin is
|
||||
// expected to return a telemetry Provider that the core application can use for
|
||||
// instrumentation.
|
||||
func (m *Manager) OtelSetup(ctx context.Context, cfg *Config) (*telemetry.Provider, error) {
|
||||
if cfg == nil {
|
||||
log.Info(ctx, "Telemetry config not provided; skipping OpenTelemetry setup")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
otp, err := provider[definition.OtelSetupMetricsProvider](m.plugins, cfg.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
|
||||
}
|
||||
provider, closer, err := otp.New(ctx, cfg.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if closer != nil {
|
||||
m.closers = append(m.closers, func() {
|
||||
if err := closer(); err != nil {
|
||||
log.Errorf(context.Background(), err, "Failed to shutdown telemetry provider")
|
||||
}
|
||||
})
|
||||
}
|
||||
return provider, nil
|
||||
}
|
||||
|
||||
// TransportWrapper returns a TransportWrapper instance based on the provided configuration.
|
||||
func (m *Manager) TransportWrapper(ctx context.Context, cfg *Config) (definition.TransportWrapper, error) {
|
||||
twp, err := provider[definition.TransportWrapperProvider](m.plugins, cfg.ID)
|
||||
|
||||
28
pkg/telemetry/metrics_test.go
Normal file
28
pkg/telemetry/metrics_test.go
Normal file
@@ -0,0 +1,28 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewProviderAndMetrics(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
provider, err := NewTestProvider(ctx)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, provider)
|
||||
require.NotNil(t, provider.MetricsHandler)
|
||||
|
||||
metrics, err := GetMetrics(ctx)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, metrics)
|
||||
|
||||
rec := httptest.NewRecorder()
|
||||
req := httptest.NewRequest("GET", "/metrics", nil)
|
||||
provider.MetricsHandler.ServeHTTP(rec, req)
|
||||
require.Equal(t, 200, rec.Code)
|
||||
|
||||
require.NoError(t, provider.Shutdown(context.Background()))
|
||||
}
|
||||
86
pkg/telemetry/pluginMetrics.go
Normal file
86
pkg/telemetry/pluginMetrics.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
// Metrics exposes strongly typed metric instruments used across the adapter.
|
||||
// Note: Most metrics have been moved to their respective modules. Only plugin-level
|
||||
// metrics remain here. See:
|
||||
// - OTel setup: pkg/plugin/implementation/otelsetup
|
||||
// - Step metrics: core/module/handler/step_metrics.go
|
||||
// - Cache metrics: pkg/plugin/implementation/cache/cache_metrics.go
|
||||
// - Handler metrics: core/module/handler/handlerMetrics.go
|
||||
type Metrics struct {
|
||||
PluginExecutionDuration metric.Float64Histogram
|
||||
PluginErrorsTotal metric.Int64Counter
|
||||
}
|
||||
|
||||
var (
|
||||
metricsInstance *Metrics
|
||||
metricsOnce sync.Once
|
||||
metricsErr error
|
||||
)
|
||||
|
||||
// Attribute keys shared across instruments.
|
||||
var (
|
||||
AttrModule = attribute.Key("module")
|
||||
AttrSubsystem = attribute.Key("subsystem")
|
||||
AttrName = attribute.Key("name")
|
||||
AttrStep = attribute.Key("step")
|
||||
AttrRole = attribute.Key("role")
|
||||
AttrAction = attribute.Key("action")
|
||||
AttrHTTPMethod = attribute.Key("http_method")
|
||||
AttrHTTPStatus = attribute.Key("http_status_code")
|
||||
AttrStatus = attribute.Key("status")
|
||||
AttrErrorType = attribute.Key("error_type")
|
||||
AttrPluginID = attribute.Key("plugin_id")
|
||||
AttrPluginType = attribute.Key("plugin_type")
|
||||
AttrOperation = attribute.Key("operation")
|
||||
AttrRouteType = attribute.Key("route_type")
|
||||
AttrTargetType = attribute.Key("target_type")
|
||||
AttrSchemaVersion = attribute.Key("schema_version")
|
||||
)
|
||||
|
||||
// GetMetrics lazily initializes instruments and returns a cached reference.
|
||||
func GetMetrics(ctx context.Context) (*Metrics, error) {
|
||||
metricsOnce.Do(func() {
|
||||
metricsInstance, metricsErr = newMetrics()
|
||||
})
|
||||
return metricsInstance, metricsErr
|
||||
}
|
||||
|
||||
func newMetrics() (*Metrics, error) {
|
||||
meter := otel.GetMeterProvider().Meter(
|
||||
"github.com/beckn-one/beckn-onix/telemetry",
|
||||
metric.WithInstrumentationVersion("1.0.0"),
|
||||
)
|
||||
|
||||
m := &Metrics{}
|
||||
var err error
|
||||
|
||||
if m.PluginExecutionDuration, err = meter.Float64Histogram(
|
||||
"onix_plugin_execution_duration_seconds",
|
||||
metric.WithDescription("Plugin execution time"),
|
||||
metric.WithUnit("s"),
|
||||
metric.WithExplicitBucketBoundaries(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1),
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("onix_plugin_execution_duration_seconds: %w", err)
|
||||
}
|
||||
|
||||
if m.PluginErrorsTotal, err = meter.Int64Counter(
|
||||
"onix_plugin_errors_total",
|
||||
metric.WithDescription("Plugin level errors"),
|
||||
metric.WithUnit("{error}"),
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("onix_plugin_errors_total: %w", err)
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
15
pkg/telemetry/telemetry.go
Normal file
15
pkg/telemetry/telemetry.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"go.opentelemetry.io/otel/sdk/metric"
|
||||
)
|
||||
|
||||
// Provider holds references to telemetry components that need coordinated shutdown.
|
||||
type Provider struct {
|
||||
MeterProvider *metric.MeterProvider
|
||||
MetricsHandler http.Handler
|
||||
Shutdown func(context.Context) error
|
||||
}
|
||||
54
pkg/telemetry/test_helper.go
Normal file
54
pkg/telemetry/test_helper.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
clientprom "github.com/prometheus/client_golang/prometheus"
|
||||
clientpromhttp "github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
|
||||
"go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
// NewTestProvider creates a minimal telemetry provider for testing purposes.
|
||||
// This avoids import cycles by not depending on the otelsetup package.
|
||||
func NewTestProvider(ctx context.Context) (*Provider, error) {
|
||||
res, err := resource.New(
|
||||
ctx,
|
||||
resource.WithAttributes(
|
||||
attribute.String("service.name", "test-service"),
|
||||
attribute.String("service.version", "test"),
|
||||
attribute.String("deployment.environment", "test"),
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
registry := clientprom.NewRegistry()
|
||||
exporter, err := otelprom.New(
|
||||
otelprom.WithRegisterer(registry),
|
||||
otelprom.WithoutUnits(),
|
||||
otelprom.WithoutScopeInfo(),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
meterProvider := metric.NewMeterProvider(
|
||||
metric.WithReader(exporter),
|
||||
metric.WithResource(res),
|
||||
)
|
||||
|
||||
otel.SetMeterProvider(meterProvider)
|
||||
|
||||
return &Provider{
|
||||
MeterProvider: meterProvider,
|
||||
MetricsHandler: clientpromhttp.HandlerFor(registry, clientpromhttp.HandlerOpts{}),
|
||||
Shutdown: func(ctx context.Context) error {
|
||||
return meterProvider.Shutdown(ctx)
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
Reference in New Issue
Block a user