update as per the comment
This commit is contained in:
@@ -13,7 +13,6 @@ import (
|
||||
"github.com/beckn-one/beckn-onix/pkg/plugin"
|
||||
"github.com/beckn-one/beckn-onix/pkg/plugin/definition"
|
||||
"github.com/beckn-one/beckn-onix/pkg/response"
|
||||
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
||||
)
|
||||
|
||||
// stdHandler orchestrates the execution of defined processing steps.
|
||||
@@ -311,7 +310,7 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
instrumentedStep, wrapErr := telemetry.NewInstrumentedStep(s, step, h.moduleName)
|
||||
instrumentedStep, wrapErr := NewInstrumentedStep(s, step, h.moduleName)
|
||||
if wrapErr != nil {
|
||||
log.Warnf(ctx, "Failed to instrument step %s: %v", step, wrapErr)
|
||||
h.steps = append(h.steps, s)
|
||||
|
||||
84
core/module/handler/step_instrumentor.go
Normal file
84
core/module/handler/step_instrumentor.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||
"github.com/beckn-one/beckn-onix/pkg/model"
|
||||
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
||||
)
|
||||
|
||||
// StepRunner represents the minimal contract required for step instrumentation.
|
||||
type StepRunner interface {
|
||||
Run(*model.StepContext) error
|
||||
}
|
||||
|
||||
// InstrumentedStep wraps a processing step with telemetry instrumentation.
|
||||
type InstrumentedStep struct {
|
||||
step StepRunner
|
||||
stepName string
|
||||
moduleName string
|
||||
metrics *StepMetrics
|
||||
}
|
||||
|
||||
// NewInstrumentedStep returns a telemetry enabled wrapper around a definition.Step.
|
||||
func NewInstrumentedStep(step StepRunner, stepName, moduleName string) (*InstrumentedStep, error) {
|
||||
metrics, err := GetStepMetrics(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &InstrumentedStep{
|
||||
step: step,
|
||||
stepName: stepName,
|
||||
moduleName: moduleName,
|
||||
metrics: metrics,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type becknError interface {
|
||||
BecknError() *model.Error
|
||||
}
|
||||
|
||||
// Run executes the underlying step and records RED style metrics.
|
||||
func (is *InstrumentedStep) Run(ctx *model.StepContext) error {
|
||||
if is.metrics == nil {
|
||||
return is.step.Run(ctx)
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
err := is.step.Run(ctx)
|
||||
duration := time.Since(start).Seconds()
|
||||
|
||||
attrs := []attribute.KeyValue{
|
||||
telemetry.AttrModule.String(is.moduleName),
|
||||
telemetry.AttrStep.String(is.stepName),
|
||||
telemetry.AttrRole.String(string(ctx.Role)),
|
||||
}
|
||||
|
||||
is.metrics.StepExecutionTotal.Add(ctx.Context, 1, metric.WithAttributes(attrs...))
|
||||
is.metrics.StepExecutionDuration.Record(ctx.Context, duration, metric.WithAttributes(attrs...))
|
||||
|
||||
if err != nil {
|
||||
errorType := fmt.Sprintf("%T", err)
|
||||
var becknErr becknError
|
||||
if errors.As(err, &becknErr) {
|
||||
if be := becknErr.BecknError(); be != nil && be.Code != "" {
|
||||
errorType = be.Code
|
||||
}
|
||||
}
|
||||
|
||||
errorAttrs := append(attrs, telemetry.AttrErrorType.String(errorType))
|
||||
is.metrics.StepErrorsTotal.Add(ctx.Context, 1, metric.WithAttributes(errorAttrs...))
|
||||
log.Errorf(ctx.Context, err, "Step %s failed", is.stepName)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
52
core/module/handler/step_instrumentor_test.go
Normal file
52
core/module/handler/step_instrumentor_test.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/model"
|
||||
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// stubStep is a test double whose Run returns the configured error.
type stubStep struct{ err error }
|
||||
|
||||
func (s stubStep) Run(ctx *model.StepContext) error {
|
||||
return s.err
|
||||
}
|
||||
|
||||
func TestInstrumentedStepSuccess(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
provider, err := telemetry.NewTestProvider(ctx)
|
||||
require.NoError(t, err)
|
||||
defer provider.Shutdown(context.Background())
|
||||
|
||||
step, err := NewInstrumentedStep(stubStep{}, "test-step", "test-module")
|
||||
require.NoError(t, err)
|
||||
|
||||
stepCtx := &model.StepContext{
|
||||
Context: context.Background(),
|
||||
Role: model.RoleBAP,
|
||||
}
|
||||
require.NoError(t, step.Run(stepCtx))
|
||||
}
|
||||
|
||||
func TestInstrumentedStepError(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
provider, err := telemetry.NewTestProvider(ctx)
|
||||
require.NoError(t, err)
|
||||
defer provider.Shutdown(context.Background())
|
||||
|
||||
step, err := NewInstrumentedStep(stubStep{err: errors.New("boom")}, "test-step", "test-module")
|
||||
require.NoError(t, err)
|
||||
|
||||
stepCtx := &model.StepContext{
|
||||
Context: context.Background(),
|
||||
Role: model.RoleBAP,
|
||||
}
|
||||
require.Error(t, step.Run(stepCtx))
|
||||
}
|
||||
|
||||
69
core/module/handler/step_metrics.go
Normal file
69
core/module/handler/step_metrics.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
// StepMetrics exposes step execution metric instruments.
|
||||
type StepMetrics struct {
|
||||
StepExecutionDuration metric.Float64Histogram
|
||||
StepExecutionTotal metric.Int64Counter
|
||||
StepErrorsTotal metric.Int64Counter
|
||||
}
|
||||
|
||||
var (
|
||||
stepMetricsInstance *StepMetrics
|
||||
stepMetricsOnce sync.Once
|
||||
stepMetricsErr error
|
||||
)
|
||||
|
||||
// GetStepMetrics lazily initializes step metric instruments and returns a cached reference.
|
||||
func GetStepMetrics(ctx context.Context) (*StepMetrics, error) {
|
||||
stepMetricsOnce.Do(func() {
|
||||
stepMetricsInstance, stepMetricsErr = newStepMetrics()
|
||||
})
|
||||
return stepMetricsInstance, stepMetricsErr
|
||||
}
|
||||
|
||||
func newStepMetrics() (*StepMetrics, error) {
|
||||
meter := otel.GetMeterProvider().Meter(
|
||||
"github.com/beckn-one/beckn-onix/telemetry",
|
||||
metric.WithInstrumentationVersion("1.0.0"),
|
||||
)
|
||||
|
||||
m := &StepMetrics{}
|
||||
var err error
|
||||
|
||||
if m.StepExecutionDuration, err = meter.Float64Histogram(
|
||||
"onix_step_execution_duration_seconds",
|
||||
metric.WithDescription("Duration of individual processing steps"),
|
||||
metric.WithUnit("s"),
|
||||
metric.WithExplicitBucketBoundaries(0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5),
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("onix_step_execution_duration_seconds: %w", err)
|
||||
}
|
||||
|
||||
if m.StepExecutionTotal, err = meter.Int64Counter(
|
||||
"onix_step_executions_total",
|
||||
metric.WithDescription("Total processing step executions"),
|
||||
metric.WithUnit("{execution}"),
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("onix_step_executions_total: %w", err)
|
||||
}
|
||||
|
||||
if m.StepErrorsTotal, err = meter.Int64Counter(
|
||||
"onix_step_errors_total",
|
||||
metric.WithDescription("Processing step errors"),
|
||||
metric.WithUnit("{error}"),
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("onix_step_errors_total: %w", err)
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
131
core/module/handler/step_metrics_test.go
Normal file
131
core/module/handler/step_metrics_test.go
Normal file
@@ -0,0 +1,131 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestGetStepMetrics_Success(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Initialize telemetry provider first
|
||||
provider, err := telemetry.NewTestProvider(ctx)
|
||||
require.NoError(t, err)
|
||||
defer provider.Shutdown(context.Background())
|
||||
|
||||
// Test getting step metrics
|
||||
metrics, err := GetStepMetrics(ctx)
|
||||
require.NoError(t, err, "GetStepMetrics() should not return error")
|
||||
require.NotNil(t, metrics, "GetStepMetrics() should return non-nil metrics")
|
||||
|
||||
// Verify all metric instruments are initialized
|
||||
assert.NotNil(t, metrics.StepExecutionDuration, "StepExecutionDuration should be initialized")
|
||||
assert.NotNil(t, metrics.StepExecutionTotal, "StepExecutionTotal should be initialized")
|
||||
assert.NotNil(t, metrics.StepErrorsTotal, "StepErrorsTotal should be initialized")
|
||||
}
|
||||
|
||||
func TestGetStepMetrics_ConcurrentAccess(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Initialize telemetry provider first
|
||||
provider, err := telemetry.NewTestProvider(ctx)
|
||||
require.NoError(t, err)
|
||||
defer provider.Shutdown(context.Background())
|
||||
|
||||
// Test that GetStepMetrics is safe for concurrent access
|
||||
// and returns the same instance (singleton pattern)
|
||||
metrics1, err1 := GetStepMetrics(ctx)
|
||||
require.NoError(t, err1)
|
||||
require.NotNil(t, metrics1)
|
||||
|
||||
metrics2, err2 := GetStepMetrics(ctx)
|
||||
require.NoError(t, err2)
|
||||
require.NotNil(t, metrics2)
|
||||
|
||||
// Should return the same instance
|
||||
assert.Equal(t, metrics1, metrics2, "GetStepMetrics should return the same instance")
|
||||
}
|
||||
|
||||
func TestGetStepMetrics_WithoutProvider(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Test getting step metrics without initializing provider
|
||||
// This should still work but may not have a valid meter provider
|
||||
metrics, err := GetStepMetrics(ctx)
|
||||
// Note: This might succeed or fail depending on OTel's default behavior
|
||||
// We're just checking it doesn't panic
|
||||
if err != nil {
|
||||
t.Logf("GetStepMetrics returned error (expected if no provider): %v", err)
|
||||
} else {
|
||||
assert.NotNil(t, metrics, "Metrics should be returned even without explicit provider")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStepMetrics_Instruments(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Initialize telemetry provider
|
||||
provider, err := telemetry.NewTestProvider(ctx)
|
||||
require.NoError(t, err)
|
||||
defer provider.Shutdown(context.Background())
|
||||
|
||||
// Get step metrics
|
||||
metrics, err := GetStepMetrics(ctx)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, metrics)
|
||||
|
||||
// Test that we can record metrics (this tests the instruments are functional)
|
||||
// Note: We can't easily verify the metrics were recorded without querying the exporter,
|
||||
// but we can verify the instruments are not nil and can be called without panicking
|
||||
|
||||
// Test StepExecutionDuration
|
||||
require.NotPanics(t, func() {
|
||||
metrics.StepExecutionDuration.Record(ctx, 0.5,
|
||||
metric.WithAttributes(telemetry.AttrStep.String("test-step"), telemetry.AttrModule.String("test-module")))
|
||||
}, "StepExecutionDuration.Record should not panic")
|
||||
|
||||
// Test StepExecutionTotal
|
||||
require.NotPanics(t, func() {
|
||||
metrics.StepExecutionTotal.Add(ctx, 1,
|
||||
metric.WithAttributes(telemetry.AttrStep.String("test-step"), telemetry.AttrModule.String("test-module")))
|
||||
}, "StepExecutionTotal.Add should not panic")
|
||||
|
||||
// Test StepErrorsTotal
|
||||
require.NotPanics(t, func() {
|
||||
metrics.StepErrorsTotal.Add(ctx, 1,
|
||||
metric.WithAttributes(telemetry.AttrStep.String("test-step"), telemetry.AttrModule.String("test-module")))
|
||||
}, "StepErrorsTotal.Add should not panic")
|
||||
|
||||
// Verify metrics are exposed via HTTP handler
|
||||
rec := httptest.NewRecorder()
|
||||
req := httptest.NewRequest("GET", "/metrics", nil)
|
||||
provider.MetricsHandler.ServeHTTP(rec, req)
|
||||
assert.Equal(t, 200, rec.Code, "Metrics endpoint should return 200")
|
||||
}
|
||||
|
||||
func TestStepMetrics_MultipleCalls(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Initialize telemetry provider
|
||||
provider, err := telemetry.NewTestProvider(ctx)
|
||||
require.NoError(t, err)
|
||||
defer provider.Shutdown(context.Background())
|
||||
|
||||
// Call GetStepMetrics multiple times
|
||||
for i := 0; i < 10; i++ {
|
||||
metrics, err := GetStepMetrics(ctx)
|
||||
require.NoError(t, err, "GetStepMetrics should succeed on call %d", i)
|
||||
require.NotNil(t, metrics, "GetStepMetrics should return non-nil on call %d", i)
|
||||
assert.NotNil(t, metrics.StepExecutionDuration, "StepExecutionDuration should be initialized")
|
||||
assert.NotNil(t, metrics.StepExecutionTotal, "StepExecutionTotal should be initialized")
|
||||
assert.NotNil(t, metrics.StepErrorsTotal, "StepErrorsTotal should be initialized")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user