update as per the PR comment

This commit is contained in:
Manendra Pal Singh
2025-12-02 21:14:02 +05:30
parent 4f2a137482
commit 8fd7afb54a
6 changed files with 544 additions and 41 deletions

View File

@@ -17,6 +17,7 @@ import (
"github.com/beckn-one/beckn-onix/core/module/handler"
"github.com/beckn-one/beckn-onix/pkg/log"
"github.com/beckn-one/beckn-onix/pkg/plugin"
"github.com/beckn-one/beckn-onix/pkg/plugin/implementation/otelsetup"
"github.com/beckn-one/beckn-onix/pkg/telemetry"
)
@@ -93,6 +94,23 @@ func validateConfig(cfg *Config) error {
return nil
}
// initPlugins brings up the application-level plugins; at present that means
// telemetry only.
//
// A nil telemetryCfg is not an error: setup is skipped and (nil, nil) is
// returned. Otherwise the config is converted with otelsetup.ToPluginConfig
// and passed to mgr.OtelSetup, whose provider is returned. Any error from
// OtelSetup is wrapped and returned with a nil provider.
func initPlugins(ctx context.Context, mgr *plugin.Manager, telemetryCfg *telemetry.Config) (*telemetry.Provider, error) {
	// Telemetry is optional, so an absent config short-circuits here.
	if telemetryCfg == nil {
		log.Info(ctx, "Telemetry config not provided; skipping OpenTelemetry setup")
		return nil, nil
	}

	log.Infof(ctx, "Initializing telemetry via plugin id=otelsetup")

	provider, setupErr := mgr.OtelSetup(ctx, otelsetup.ToPluginConfig(telemetryCfg))
	if setupErr != nil {
		return nil, fmt.Errorf("failed to initialize telemetry plugin: %w", setupErr)
	}
	return provider, nil
}
// newServer creates and initializes the HTTP server.
func newServer(ctx context.Context, mgr handler.PluginManager, cfg *Config, otelProvider *telemetry.Provider) (http.Handler, error) {
mux := http.NewServeMux()
@@ -134,29 +152,10 @@ func run(ctx context.Context, configPath string) error {
closers = append(closers, closer)
log.Debug(ctx, "Plugin manager loaded.")
// Initialize telemetry via plugin.
var otelProvider *telemetry.Provider
if cfg.Telemetry == nil {
log.Info(ctx, "Telemetry config not provided; skipping OpenTelemetry setup")
} else {
log.Infof(ctx, "Initializing telemetry via plugin id=otelsetup")
// Convert telemetry.Config to plugin.Config
pluginConfig := &plugin.Config{
ID: "otelsetup",
Config: map[string]string{
"serviceName": cfg.Telemetry.ServiceName,
"serviceVersion": cfg.Telemetry.ServiceVersion,
"enableMetrics": fmt.Sprintf("%t", cfg.Telemetry.EnableMetrics),
"environment": cfg.Telemetry.Environment,
},
}
otelProvider, err = mgr.OtelSetup(ctx, pluginConfig)
if err != nil {
return fmt.Errorf("failed to initialize telemetry plugin: %w", err)
}
// Note: The closer is now handled by the plugin manager
// Initialize plugins including telemetry.
otelProvider, err := initPlugins(ctx, mgr, cfg.Telemetry)
if err != nil {
return fmt.Errorf("failed to initialize plugins: %w", err)
}
// Initialize HTTP server.

View File

@@ -63,10 +63,11 @@ modules:
id: router
config:
routingConfig: ./config/local-simple-routing.yaml
- id: reqpreprocessor
config:
uuidKeys: transaction_id,message_id
role: bap
reqpreprocessor:
id: reqpreprocessor
config:
uuidKeys: transaction_id,message_id
role: bap
steps:
- validateSign
- addRoute
@@ -108,10 +109,11 @@ modules:
routingConfig: ./config/local-simple-routing-BAPCaller.yaml
signer:
id: signer
- id: reqpreprocessor
config:
uuidKeys: transaction_id,message_id
role: bap
reqpreprocessor:
id: reqpreprocessor
config:
uuidKeys: transaction_id,message_id
role: bap
steps:
- addRoute
- sign
@@ -157,7 +159,6 @@ modules:
id: router
config:
routingConfig: ./config/local-simple-routing-BPPReceiver.yaml
middleware:
steps:
- validateSign
- addRoute
@@ -199,7 +200,6 @@ modules:
routingConfig: ./config/local-simple-routing.yaml
signer:
id: signer
middleware:
steps:
- addRoute
- sign

View File

@@ -277,15 +277,11 @@ func (s *addRouteStep) Run(ctx *model.StepContext) error {
func extractSchemaVersion(body []byte) string {
type contextEnvelope struct {
Context struct {
Version string `json:"version"`
CoreVersion string `json:"core_version"`
Version string `json:"version"`
} `json:"context"`
}
var payload contextEnvelope
if err := json.Unmarshal(body, &payload); err == nil {
if payload.Context.CoreVersion != "" {
return payload.Context.CoreVersion
}
if payload.Context.Version != "" {
return payload.Context.Version
}

View File

@@ -4,13 +4,37 @@ import (
"context"
"fmt"
clientprom "github.com/prometheus/client_golang/prometheus"
clientpromhttp "github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/contrib/instrumentation/runtime"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"github.com/beckn-one/beckn-onix/pkg/log"
"github.com/beckn-one/beckn-onix/pkg/plugin"
"github.com/beckn-one/beckn-onix/pkg/telemetry"
)
// Setup wires the telemetry provider using the shared telemetry package. This
// is the concrete implementation behind the MetricsProvider interface.
// Setup wires the telemetry provider. This is the concrete implementation
// behind the MetricsProvider interface.
type Setup struct{}
// ToPluginConfig maps a telemetry.Config onto the string-keyed plugin.Config
// used for the "otelsetup" plugin. The boolean EnableMetrics flag is rendered
// as "true" or "false"; the other fields are copied verbatim.
//
// cfg must be non-nil: its fields are read unconditionally, so a nil pointer
// panics.
func ToPluginConfig(cfg *telemetry.Config) *plugin.Config {
	settings := map[string]string{
		"serviceName":    cfg.ServiceName,
		"serviceVersion": cfg.ServiceVersion,
		"enableMetrics":  fmt.Sprintf("%t", cfg.EnableMetrics),
		"environment":    cfg.Environment,
	}
	return &plugin.Config{ID: "otelsetup", Config: settings}
}
// New initializes the underlying telemetry provider. The returned provider
// exposes the HTTP handler and shutdown hooks that the core application can
// manage directly.
@@ -18,5 +42,66 @@ func (Setup) New(ctx context.Context, cfg *telemetry.Config) (*telemetry.Provide
if cfg == nil {
return nil, fmt.Errorf("telemetry config cannot be nil")
}
return telemetry.NewProvider(ctx, cfg)
// Apply defaults if fields are empty
if cfg.ServiceName == "" {
cfg.ServiceName = telemetry.DefaultConfig().ServiceName
}
if cfg.ServiceVersion == "" {
cfg.ServiceVersion = telemetry.DefaultConfig().ServiceVersion
}
if cfg.Environment == "" {
cfg.Environment = telemetry.DefaultConfig().Environment
}
if !cfg.EnableMetrics {
log.Info(ctx, "OpenTelemetry metrics disabled")
return &telemetry.Provider{
Shutdown: func(context.Context) error { return nil },
}, nil
}
res, err := resource.New(
ctx,
resource.WithAttributes(
attribute.String("service.name", cfg.ServiceName),
attribute.String("service.version", cfg.ServiceVersion),
attribute.String("deployment.environment", cfg.Environment),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create telemetry resource: %w", err)
}
registry := clientprom.NewRegistry()
exporter, err := otelprom.New(
otelprom.WithRegisterer(registry),
otelprom.WithoutUnits(),
otelprom.WithoutScopeInfo(),
)
if err != nil {
return nil, fmt.Errorf("failed to create prometheus exporter: %w", err)
}
meterProvider := metric.NewMeterProvider(
metric.WithReader(exporter),
metric.WithResource(res),
)
otel.SetMeterProvider(meterProvider)
log.Infof(ctx, "OpenTelemetry metrics initialized for service=%s version=%s env=%s",
cfg.ServiceName, cfg.ServiceVersion, cfg.Environment)
if err := runtime.Start(runtime.WithMinimumReadMemStatsInterval(0)); err != nil {
log.Warnf(ctx, "Failed to start Go runtime instrumentation: %v", err)
}
return &telemetry.Provider{
MeterProvider: meterProvider,
MetricsHandler: clientpromhttp.HandlerFor(registry, clientpromhttp.HandlerOpts{}),
Shutdown: func(ctx context.Context) error {
return meterProvider.Shutdown(ctx)
},
}, nil
}

View File

@@ -0,0 +1,274 @@
package otelsetup
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"github.com/beckn-one/beckn-onix/pkg/telemetry"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestSetup_New_Success checks that Setup.New returns a usable provider for
// valid configurations: with metrics enabled, with metrics disabled, and with
// the identifying fields left empty.
func TestSetup_New_Success(t *testing.T) {
	ctx := context.Background()
	setup := Setup{}

	cases := []struct {
		name string
		cfg  *telemetry.Config
	}{
		{
			name: "Valid config with all fields",
			cfg: &telemetry.Config{
				ServiceName:    "test-service",
				ServiceVersion: "1.0.0",
				EnableMetrics:  true,
				Environment:    "test",
			},
		},
		{
			name: "Valid config with metrics disabled",
			cfg: &telemetry.Config{
				ServiceName:    "test-service",
				ServiceVersion: "1.0.0",
				EnableMetrics:  false,
				Environment:    "test",
			},
		},
		{
			name: "Config with empty fields uses defaults",
			// Name, version and environment are deliberately left at their
			// zero value.
			cfg: &telemetry.Config{EnableMetrics: true},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			provider, err := setup.New(ctx, tc.cfg)
			require.NoError(t, err, "New() should not return error")
			require.NotNil(t, provider, "New() should return non-nil provider")
			require.NotNil(t, provider.Shutdown, "Provider should have shutdown function")

			if !tc.cfg.EnableMetrics {
				assert.Nil(t, provider.MetricsHandler, "MetricsHandler should be nil when metrics disabled")
			} else {
				assert.NotNil(t, provider.MetricsHandler, "MetricsHandler should be set when metrics enabled")
				assert.NotNil(t, provider.MeterProvider, "MeterProvider should be set when metrics enabled")

				// The handler must answer a scrape request successfully.
				recorder := httptest.NewRecorder()
				provider.MetricsHandler.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/metrics", nil))
				assert.Equal(t, http.StatusOK, recorder.Code)
			}

			// Shutdown must succeed in both modes.
			assert.NoError(t, provider.Shutdown(ctx), "Shutdown should not return error")
		})
	}
}
// TestSetup_New_Failure covers inputs that Setup.New must reject. The only
// case today is a nil config, which must yield an error and a nil provider.
func TestSetup_New_Failure(t *testing.T) {
	ctx := context.Background()
	setup := Setup{}

	cases := []struct {
		name    string
		cfg     *telemetry.Config
		wantErr bool
	}{
		{name: "Nil config", cfg: nil, wantErr: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			provider, err := setup.New(ctx, tc.cfg)

			if !tc.wantErr {
				assert.NoError(t, err, "New() should not return error")
				assert.NotNil(t, provider, "New() should return non-nil provider")
				return
			}

			assert.Error(t, err, "New() should return error")
			assert.Nil(t, provider, "New() should return nil provider on error")
		})
	}
}
// TestSetup_New_DefaultValues verifies that Setup.New substitutes the values
// from telemetry.DefaultConfig for empty ServiceName, ServiceVersion and
// Environment fields, and that the resulting provider is functional.
func TestSetup_New_DefaultValues(t *testing.T) {
	setup := Setup{}
	ctx := context.Background()

	// Empty identifying fields: New is expected to fill them with defaults.
	cfg := &telemetry.Config{
		ServiceName:    "",
		ServiceVersion: "",
		EnableMetrics:  true,
		Environment:    "",
	}

	provider, err := setup.New(ctx, cfg)
	require.NoError(t, err)
	require.NotNil(t, provider)

	// New writes the defaults back into the config it was given, so the
	// substitution can be checked directly rather than only inferred from a
	// working provider.
	defaults := telemetry.DefaultConfig()
	assert.Equal(t, defaults.ServiceName, cfg.ServiceName, "ServiceName should fall back to the default")
	assert.Equal(t, defaults.ServiceVersion, cfg.ServiceVersion, "ServiceVersion should fall back to the default")
	assert.Equal(t, defaults.Environment, cfg.Environment, "Environment should fall back to the default")

	// The provider built from the defaulted config must be fully wired.
	assert.NotNil(t, provider.MetricsHandler, "MetricsHandler should be set with defaults")
	assert.NotNil(t, provider.MeterProvider, "MeterProvider should be set with defaults")

	// Test metrics endpoint
	rec := httptest.NewRecorder()
	req := httptest.NewRequest("GET", "/metrics", nil)
	provider.MetricsHandler.ServeHTTP(rec, req)
	assert.Equal(t, http.StatusOK, rec.Code)

	// Cleanup
	err = provider.Shutdown(ctx)
	assert.NoError(t, err)
}
func TestSetup_New_MetricsDisabled(t *testing.T) {
setup := Setup{}
ctx := context.Background()
cfg := &telemetry.Config{
ServiceName: "test-service",
ServiceVersion: "1.0.0",
EnableMetrics: false,
Environment: "test",
}
provider, err := setup.New(ctx, cfg)
require.NoError(t, err)
require.NotNil(t, provider)
// When metrics are disabled, MetricsHandler should be nil
assert.Nil(t, provider.MetricsHandler, "MetricsHandler should be nil when metrics disabled")
assert.Nil(t, provider.MeterProvider, "MeterProvider should be nil when metrics disabled")
// Shutdown should still work
err = provider.Shutdown(ctx)
assert.NoError(t, err, "Shutdown should work even when metrics disabled")
}
// TestToPluginConfig_Success checks that ToPluginConfig copies every
// telemetry.Config field into the plugin config map under the expected key and
// always sets the plugin ID to "otelsetup".
func TestToPluginConfig_Success(t *testing.T) {
	// Every case expects the same plugin ID.
	const wantID = "otelsetup"

	cases := []struct {
		name string
		in   *telemetry.Config
		want map[string]string
	}{
		{
			name: "Valid config with all fields",
			in: &telemetry.Config{
				ServiceName:    "test-service",
				ServiceVersion: "1.0.0",
				EnableMetrics:  true,
				Environment:    "test",
			},
			want: map[string]string{
				"serviceName":    "test-service",
				"serviceVersion": "1.0.0",
				"enableMetrics":  "true",
				"environment":    "test",
			},
		},
		{
			name: "Config with enableMetrics false",
			in: &telemetry.Config{
				ServiceName:    "my-service",
				ServiceVersion: "2.0.0",
				EnableMetrics:  false,
				Environment:    "production",
			},
			want: map[string]string{
				"serviceName":    "my-service",
				"serviceVersion": "2.0.0",
				"enableMetrics":  "false",
				"environment":    "production",
			},
		},
		{
			name: "Config with empty fields",
			in:   &telemetry.Config{EnableMetrics: true},
			want: map[string]string{
				"serviceName":    "",
				"serviceVersion": "",
				"enableMetrics":  "true",
				"environment":    "",
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := ToPluginConfig(tc.in)

			require.NotNil(t, got, "ToPluginConfig should return non-nil config")
			assert.Equal(t, wantID, got.ID, "Plugin ID should be 'otelsetup'")
			assert.Equal(t, tc.want, got.Config, "Config map should match expected values")
		})
	}
}
func TestToPluginConfig_NilConfig(t *testing.T) {
// Test that ToPluginConfig handles nil config
// Note: This will panic if nil is passed, which is acceptable behavior
// as the function expects a valid config. In practice, callers should check for nil.
assert.Panics(t, func() {
ToPluginConfig(nil)
}, "ToPluginConfig should panic when given nil config")
}
func TestToPluginConfig_BooleanConversion(t *testing.T) {
tests := []struct {
name string
enableMetrics bool
expected string
}{
{
name: "EnableMetrics true",
enableMetrics: true,
expected: "true",
},
{
name: "EnableMetrics false",
enableMetrics: false,
expected: "false",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &telemetry.Config{
ServiceName: "test",
ServiceVersion: "1.0.0",
EnableMetrics: tt.enableMetrics,
Environment: "test",
}
result := ToPluginConfig(cfg)
require.NotNil(t, result)
assert.Equal(t, tt.expected, result.Config["enableMetrics"], "enableMetrics should be converted to string correctly")
})
}
}

View File

@@ -0,0 +1,149 @@
package telemetry
import (
"context"
"net/http/httptest"
"testing"
"go.opentelemetry.io/otel/metric"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestGetStepMetrics_Success(t *testing.T) {
ctx := context.Background()
// Initialize telemetry provider first
provider, err := NewProvider(ctx, &Config{
ServiceName: "test-service",
ServiceVersion: "1.0.0",
EnableMetrics: true,
Environment: "test",
})
require.NoError(t, err)
defer provider.Shutdown(context.Background())
// Test getting step metrics
metrics, err := GetStepMetrics(ctx)
require.NoError(t, err, "GetStepMetrics() should not return error")
require.NotNil(t, metrics, "GetStepMetrics() should return non-nil metrics")
// Verify all metric instruments are initialized
assert.NotNil(t, metrics.StepExecutionDuration, "StepExecutionDuration should be initialized")
assert.NotNil(t, metrics.StepExecutionTotal, "StepExecutionTotal should be initialized")
assert.NotNil(t, metrics.StepErrorsTotal, "StepErrorsTotal should be initialized")
}
// TestGetStepMetrics_ConcurrentAccess calls GetStepMetrics from several
// goroutines at once and checks that every call succeeds and yields the same
// instance as an initial reference call.
//
// NOTE(review): the identity check uses ==, which assumes GetStepMetrics
// returns a pointer to a shared instance — confirm against its definition.
// Run with -race to also detect unsynchronised access.
func TestGetStepMetrics_ConcurrentAccess(t *testing.T) {
	ctx := context.Background()

	// Initialize telemetry provider first
	provider, err := NewProvider(ctx, &Config{
		ServiceName:    "test-service",
		ServiceVersion: "1.0.0",
		EnableMetrics:  true,
		Environment:    "test",
	})
	require.NoError(t, err)
	defer provider.Shutdown(context.Background())

	// Reference instance that every concurrent caller is compared against.
	first, err := GetStepMetrics(ctx)
	require.NoError(t, err)
	require.NotNil(t, first)

	const callers = 16
	type outcome struct {
		same bool
		err  error
	}
	// Buffered so no goroutine blocks if the test stops early. Results are
	// asserted on the test goroutine only, because require must not be called
	// from other goroutines.
	outcomes := make(chan outcome, callers)
	for i := 0; i < callers; i++ {
		go func() {
			got, callErr := GetStepMetrics(ctx)
			outcomes <- outcome{same: got == first, err: callErr}
		}()
	}

	for i := 0; i < callers; i++ {
		res := <-outcomes
		require.NoError(t, res.err)
		assert.True(t, res.same, "GetStepMetrics should return the same instance")
	}
}
// TestGetStepMetrics_WithoutProvider calls GetStepMetrics without this test
// creating a provider first. Either outcome is tolerated: an error is only
// logged, and a successful call must return non-nil metrics. The main purpose
// is to confirm the call does not panic.
func TestGetStepMetrics_WithoutProvider(t *testing.T) {
	stepMetrics, err := GetStepMetrics(context.Background())

	// An error is acceptable here, so it is logged rather than failing the test.
	if err != nil {
		t.Logf("GetStepMetrics returned error (expected if no provider): %v", err)
		return
	}

	assert.NotNil(t, stepMetrics, "Metrics should be returned even without explicit provider")
}
// TestStepMetrics_Instruments checks that each step-metric instrument can be
// used without panicking and that the provider's metrics handler responds with
// HTTP 200 afterwards. It does not inspect the recorded values.
func TestStepMetrics_Instruments(t *testing.T) {
	ctx := context.Background()

	provider, err := NewProvider(ctx, &Config{
		ServiceName:    "test-service",
		ServiceVersion: "1.0.0",
		EnableMetrics:  true,
		Environment:    "test",
	})
	require.NoError(t, err)
	// Best-effort teardown; a shutdown error is deliberately ignored here.
	defer func() { _ = provider.Shutdown(context.Background()) }()

	stepMetrics, err := GetStepMetrics(ctx)
	require.NoError(t, err)
	require.NotNil(t, stepMetrics)

	// All three measurements carry the same step/module attributes.
	attrs := metric.WithAttributes(AttrStep.String("test-step"), AttrModule.String("test-module"))

	require.NotPanics(t, func() {
		stepMetrics.StepExecutionDuration.Record(ctx, 0.5, attrs)
	}, "StepExecutionDuration.Record should not panic")

	require.NotPanics(t, func() {
		stepMetrics.StepExecutionTotal.Add(ctx, 1, attrs)
	}, "StepExecutionTotal.Add should not panic")

	require.NotPanics(t, func() {
		stepMetrics.StepErrorsTotal.Add(ctx, 1, attrs)
	}, "StepErrorsTotal.Add should not panic")

	// The scrape endpoint must still respond after the measurements above.
	recorder := httptest.NewRecorder()
	provider.MetricsHandler.ServeHTTP(recorder, httptest.NewRequest("GET", "/metrics", nil))
	assert.Equal(t, 200, recorder.Code, "Metrics endpoint should return 200")
}
func TestStepMetrics_MultipleCalls(t *testing.T) {
ctx := context.Background()
// Initialize telemetry provider
provider, err := NewProvider(ctx, &Config{
ServiceName: "test-service",
ServiceVersion: "1.0.0",
EnableMetrics: true,
Environment: "test",
})
require.NoError(t, err)
defer provider.Shutdown(context.Background())
// Call GetStepMetrics multiple times
for i := 0; i < 10; i++ {
metrics, err := GetStepMetrics(ctx)
require.NoError(t, err, "GetStepMetrics should succeed on call %d", i)
require.NotNil(t, metrics, "GetStepMetrics should return non-nil on call %d", i)
assert.NotNil(t, metrics.StepExecutionDuration, "StepExecutionDuration should be initialized")
assert.NotNil(t, metrics.StepExecutionTotal, "StepExecutionTotal should be initialized")
assert.NotNil(t, metrics.StepErrorsTotal, "StepErrorsTotal should be initialized")
}
}