diff --git a/CONFIG.md b/CONFIG.md index 57fa06a..77eec33 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -296,10 +296,10 @@ Each metric includes consistent labels such as `module`, `role`, `action`, `stat **Note**: Metric definitions are now located in their respective modules: - OTel setup: `pkg/plugin/implementation/otelsetup` -- Step metrics: `pkg/telemetry/step_metrics.go` -- Handler metrics: `core/module/handler/metrics.go` -- Cache metrics: `pkg/plugin/implementation/cache/metrics.go` -- Plugin metrics: `pkg/telemetry/metrics.go` +- Step metrics: `core/module/handler/step_metrics.go` +- Handler metrics: `core/module/handler/handlerMetrics.go` +- Cache metrics: `pkg/plugin/implementation/cache/cache_metrics.go` +- Plugin metrics: `pkg/telemetry/pluginMetrics.go` --- diff --git a/cmd/adapter/main.go b/cmd/adapter/main.go index 9f477fa..2d1d8c7 100644 --- a/cmd/adapter/main.go +++ b/cmd/adapter/main.go @@ -17,6 +17,7 @@ import ( "github.com/beckn-one/beckn-onix/core/module/handler" "github.com/beckn-one/beckn-onix/pkg/log" "github.com/beckn-one/beckn-onix/pkg/plugin" + "github.com/beckn-one/beckn-onix/pkg/telemetry" ) // ApplicationPlugins holds application-level plugin configurations. @@ -97,18 +98,31 @@ func validateConfig(cfg *Config) error { return nil } -// initPlugins initializes application-level plugins including telemetry. -func initPlugins(ctx context.Context, mgr *plugin.Manager, otelSetupCfg *plugin.Config) error { - if otelSetupCfg == nil { - log.Info(ctx, "Telemetry config not provided; skipping OpenTelemetry setup") +// loadAppPlugin is a generic function to load and validate application-level plugins. +func loadAppPlugin[T any](ctx context.Context, name string, cfg *plugin.Config, mgrFunc func(context.Context, *plugin.Config) (T, error)) error { + if cfg == nil { + log.Debugf(ctx, "Skipping %s plugin: not configured", name) return nil } - log.Infof(ctx, "Initializing telemetry via plugin id=%s", otelSetupCfg.ID) - _, err := mgr.OtelSetup(ctx, otelSetupCfg) + _, err := mgrFunc(ctx, cfg) if err != nil { - return fmt.Errorf("failed to initialize telemetry plugin: %w", err) + return fmt.Errorf("failed to load %s plugin (%s): %w", name, cfg.ID, err) } + + log.Debugf(ctx, "Loaded %s plugin: %s", name, cfg.ID) + return nil +} + +// initAppPlugins initializes application-level plugins including telemetry. +// This function is designed to be extensible for future plugin types. +func initAppPlugins(ctx context.Context, mgr *plugin.Manager, cfg ApplicationPlugins) error { + if err := loadAppPlugin(ctx, "OtelSetup", cfg.OtelSetup, func(ctx context.Context, cfg *plugin.Config) (*telemetry.Provider, error) { + return mgr.OtelSetup(ctx, cfg) + }); err != nil { + return fmt.Errorf("failed to initialize application plugins: %w", err) + } + return nil } @@ -148,7 +162,7 @@ func run(ctx context.Context, configPath string) error { log.Debug(ctx, "Plugin manager loaded.") // Initialize plugins including telemetry. - if err := initPlugins(ctx, mgr, cfg.Plugins.OtelSetup); err != nil { + if err := initAppPlugins(ctx, mgr, cfg.Plugins); err != nil { return fmt.Errorf("failed to initialize plugins: %w", err) } diff --git a/core/module/handler/metrics.go b/core/module/handler/handlerMetrics.go similarity index 100% rename from core/module/handler/metrics.go rename to core/module/handler/handlerMetrics.go diff --git a/core/module/handler/stdHandler.go b/core/module/handler/stdHandler.go index 4df22e8..9e9fefc 100644 --- a/core/module/handler/stdHandler.go +++ b/core/module/handler/stdHandler.go @@ -13,7 +13,6 @@ import ( "github.com/beckn-one/beckn-onix/pkg/plugin" "github.com/beckn-one/beckn-onix/pkg/plugin/definition" "github.com/beckn-one/beckn-onix/pkg/response" - "github.com/beckn-one/beckn-onix/pkg/telemetry" ) // stdHandler orchestrates the execution of defined processing steps. @@ -311,7 +310,7 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf if err != nil { return err } - instrumentedStep, wrapErr := telemetry.NewInstrumentedStep(s, step, h.moduleName) + instrumentedStep, wrapErr := NewInstrumentedStep(s, step, h.moduleName) if wrapErr != nil { log.Warnf(ctx, "Failed to instrument step %s: %v", step, wrapErr) h.steps = append(h.steps, s) diff --git a/pkg/telemetry/step_instrumentor.go b/core/module/handler/step_instrumentor.go similarity index 87% rename from pkg/telemetry/step_instrumentor.go rename to core/module/handler/step_instrumentor.go index b72643c..0869304 100644 --- a/pkg/telemetry/step_instrumentor.go +++ b/core/module/handler/step_instrumentor.go @@ -1,4 +1,4 @@ -package telemetry +package handler import ( "context" @@ -11,6 +11,7 @@ import ( "github.com/beckn-one/beckn-onix/pkg/log" "github.com/beckn-one/beckn-onix/pkg/model" + "github.com/beckn-one/beckn-onix/pkg/telemetry" ) // StepRunner represents the minimal contract required for step instrumentation. @@ -56,9 +57,9 @@ func (is *InstrumentedStep) Run(ctx *model.StepContext) error { duration := time.Since(start).Seconds() attrs := []attribute.KeyValue{ - AttrModule.String(is.moduleName), - AttrStep.String(is.stepName), - AttrRole.String(string(ctx.Role)), + telemetry.AttrModule.String(is.moduleName), + telemetry.AttrStep.String(is.stepName), + telemetry.AttrRole.String(string(ctx.Role)), } is.metrics.StepExecutionTotal.Add(ctx.Context, 1, metric.WithAttributes(attrs...)) @@ -73,10 +74,11 @@ func (is *InstrumentedStep) Run(ctx *model.StepContext) error { } } - errorAttrs := append(attrs, AttrErrorType.String(errorType)) + errorAttrs := append(attrs, telemetry.AttrErrorType.String(errorType)) is.metrics.StepErrorsTotal.Add(ctx.Context, 1, metric.WithAttributes(errorAttrs...)) log.Errorf(ctx.Context, err, "Step %s failed", is.stepName) } return err } + diff --git a/pkg/telemetry/step_instrumentor_test.go b/core/module/handler/step_instrumentor_test.go similarity index 86% rename from pkg/telemetry/step_instrumentor_test.go rename to core/module/handler/step_instrumentor_test.go index 02583ec..281aa7e 100644 --- a/pkg/telemetry/step_instrumentor_test.go +++ b/core/module/handler/step_instrumentor_test.go @@ -1,4 +1,4 @@ -package telemetry +package handler import ( "context" @@ -6,6 +6,7 @@ import ( "testing" "github.com/beckn-one/beckn-onix/pkg/model" + "github.com/beckn-one/beckn-onix/pkg/telemetry" "github.com/stretchr/testify/require" ) @@ -19,7 +20,7 @@ func (s stubStep) Run(ctx *model.StepContext) error { func TestInstrumentedStepSuccess(t *testing.T) { ctx := context.Background() - provider, err := NewTestProvider(ctx) + provider, err := telemetry.NewTestProvider(ctx) require.NoError(t, err) defer provider.Shutdown(context.Background()) @@ -35,7 +36,7 @@ func TestInstrumentedStepSuccess(t *testing.T) { func TestInstrumentedStepError(t *testing.T) { ctx := context.Background() - provider, err := NewTestProvider(ctx) + provider, err := telemetry.NewTestProvider(ctx) require.NoError(t, err) defer provider.Shutdown(context.Background()) @@ -48,3 +49,4 @@ func TestInstrumentedStepError(t *testing.T) { } require.Error(t, step.Run(stepCtx)) } + diff --git a/pkg/telemetry/step_metrics.go b/core/module/handler/step_metrics.go similarity index 99% rename from pkg/telemetry/step_metrics.go rename to core/module/handler/step_metrics.go index 8566d4f..e3fc418 100644 --- a/pkg/telemetry/step_metrics.go +++ b/core/module/handler/step_metrics.go @@ -1,4 +1,4 @@ -package telemetry +package handler import ( "context" diff --git a/pkg/telemetry/step_metrics_test.go b/core/module/handler/step_metrics_test.go similarity index 87% rename from pkg/telemetry/step_metrics_test.go rename to core/module/handler/step_metrics_test.go index 5345501..777821b 100644 --- a/pkg/telemetry/step_metrics_test.go +++ b/core/module/handler/step_metrics_test.go @@ -1,4 +1,4 @@ -package telemetry +package handler import ( "context" @@ -7,6 +7,7 @@ import ( "go.opentelemetry.io/otel/metric" + "github.com/beckn-one/beckn-onix/pkg/telemetry" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -15,7 +16,7 @@ func TestGetStepMetrics_Success(t *testing.T) { ctx := context.Background() // Initialize telemetry provider first - provider, err := NewTestProvider(ctx) + provider, err := telemetry.NewTestProvider(ctx) require.NoError(t, err) defer provider.Shutdown(context.Background()) @@ -34,7 +35,7 @@ func TestGetStepMetrics_ConcurrentAccess(t *testing.T) { ctx := context.Background() // Initialize telemetry provider first - provider, err := NewTestProvider(ctx) + provider, err := telemetry.NewTestProvider(ctx) require.NoError(t, err) defer provider.Shutdown(context.Background()) @@ -71,7 +72,7 @@ func TestStepMetrics_Instruments(t *testing.T) { ctx := context.Background() // Initialize telemetry provider - provider, err := NewTestProvider(ctx) + provider, err := telemetry.NewTestProvider(ctx) require.NoError(t, err) defer provider.Shutdown(context.Background()) @@ -87,19 +88,19 @@ func TestStepMetrics_Instruments(t *testing.T) { // Test StepExecutionDuration require.NotPanics(t, func() { metrics.StepExecutionDuration.Record(ctx, 0.5, - metric.WithAttributes(AttrStep.String("test-step"), AttrModule.String("test-module"))) + metric.WithAttributes(telemetry.AttrStep.String("test-step"), telemetry.AttrModule.String("test-module"))) }, "StepExecutionDuration.Record should not panic") // Test StepExecutionTotal require.NotPanics(t, func() { metrics.StepExecutionTotal.Add(ctx, 1, - metric.WithAttributes(AttrStep.String("test-step"), AttrModule.String("test-module"))) + metric.WithAttributes(telemetry.AttrStep.String("test-step"), telemetry.AttrModule.String("test-module"))) }, "StepExecutionTotal.Add should not panic") // Test StepErrorsTotal require.NotPanics(t, func() { metrics.StepErrorsTotal.Add(ctx, 1, - metric.WithAttributes(AttrStep.String("test-step"), AttrModule.String("test-module"))) + metric.WithAttributes(telemetry.AttrStep.String("test-step"), telemetry.AttrModule.String("test-module"))) }, "StepErrorsTotal.Add should not panic") // Verify metrics are exposed via HTTP handler @@ -113,7 +114,7 @@ func TestStepMetrics_MultipleCalls(t *testing.T) { ctx := context.Background() // Initialize telemetry provider - provider, err := NewTestProvider(ctx) + provider, err := telemetry.NewTestProvider(ctx) require.NoError(t, err) defer provider.Shutdown(context.Background()) @@ -127,3 +128,4 @@ func TestStepMetrics_MultipleCalls(t *testing.T) { assert.NotNil(t, metrics.StepErrorsTotal, "StepErrorsTotal should be initialized") } } + diff --git a/pkg/plugin/definition/metrics.go b/pkg/plugin/definition/metrics.go index b4f09ec..1850bb6 100644 --- a/pkg/plugin/definition/metrics.go +++ b/pkg/plugin/definition/metrics.go @@ -6,10 +6,10 @@ import ( "github.com/beckn-one/beckn-onix/pkg/telemetry" ) -// MetricsProvider encapsulates initialization of OpenTelemetry metrics +// OtelSetupMetricsProvider encapsulates initialization of OpenTelemetry metrics // providers. Implementations wire exporters and return a Provider that the core // application can manage. -type MetricsProvider interface { +type OtelSetupMetricsProvider interface { // New initializes a new telemetry provider instance with the given configuration. New(ctx context.Context, config map[string]string) (*telemetry.Provider, func() error, error) } diff --git a/pkg/plugin/implementation/cache/metrics.go b/pkg/plugin/implementation/cache/cache_metrics.go similarity index 100% rename from pkg/plugin/implementation/cache/metrics.go rename to pkg/plugin/implementation/cache/cache_metrics.go diff --git a/pkg/plugin/implementation/otelsetup/cmd/plugin.go b/pkg/plugin/implementation/otelsetup/cmd/plugin.go index 232d290..260231e 100644 --- a/pkg/plugin/implementation/otelsetup/cmd/plugin.go +++ b/pkg/plugin/implementation/otelsetup/cmd/plugin.go @@ -11,7 +11,7 @@ import ( "github.com/beckn-one/beckn-onix/pkg/telemetry" ) -// metricsProvider implements the MetricsProvider interface for the otelsetup plugin. +// metricsProvider implements the OtelSetupMetricsProvider interface for the otelsetup plugin. type metricsProvider struct { impl otelsetup.Setup } diff --git a/pkg/plugin/implementation/otelsetup/otelsetup.go b/pkg/plugin/implementation/otelsetup/otelsetup.go index 5af7d59..4b52d78 100644 --- a/pkg/plugin/implementation/otelsetup/otelsetup.go +++ b/pkg/plugin/implementation/otelsetup/otelsetup.go @@ -23,7 +23,7 @@ import ( ) // Setup wires the telemetry provider. This is the concrete implementation -// behind the MetricsProvider interface. +// behind the OtelSetupMetricsProvider interface. type Setup struct{} // Config represents OpenTelemetry related configuration. @@ -151,7 +151,8 @@ func (Setup) New(ctx context.Context, cfg *Config) (*telemetry.Provider, error) }() return &telemetry.Provider{ - MeterProvider: meterProvider, + MeterProvider: meterProvider, + MetricsHandler: metricsHandler, Shutdown: func(shutdownCtx context.Context) error { log.Infof(ctx, "Shutting down metrics server...") // Shutdown the metrics server diff --git a/pkg/plugin/manager.go b/pkg/plugin/manager.go index 4dd97dd..ebc4316 100644 --- a/pkg/plugin/manager.go +++ b/pkg/plugin/manager.go @@ -206,7 +206,7 @@ func (m *Manager) OtelSetup(ctx context.Context, cfg *Config) (*telemetry.Provid return nil, nil } - otp, err := provider[definition.MetricsProvider](m.plugins, cfg.ID) + otp, err := provider[definition.OtelSetupMetricsProvider](m.plugins, cfg.ID) if err != nil { return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) } diff --git a/pkg/telemetry/metrics.go b/pkg/telemetry/pluginMetrics.go similarity index 93% rename from pkg/telemetry/metrics.go rename to pkg/telemetry/pluginMetrics.go index 6e4e920..c6d83ce 100644 --- a/pkg/telemetry/metrics.go +++ b/pkg/telemetry/pluginMetrics.go @@ -14,9 +14,9 @@ import ( // Note: Most metrics have been moved to their respective modules. Only plugin-level // metrics remain here. See: // - OTel setup: pkg/plugin/implementation/otelsetup -// - Step metrics: pkg/telemetry/step_metrics.go -// - Cache metrics: pkg/plugin/implementation/cache/metrics.go -// - Handler metrics: core/module/handler/metrics.go +// - Step metrics: core/module/handler/step_metrics.go +// - Cache metrics: pkg/plugin/implementation/cache/cache_metrics.go +// - Handler metrics: core/module/handler/handlerMetrics.go type Metrics struct { PluginExecutionDuration metric.Float64Histogram PluginErrorsTotal metric.Int64Counter