Files
onix/core/module/handler/step_instrumentor.go
2026-02-26 14:45:35 +05:30

106 lines
2.8 KiB
Go

package handler
import (
"context"
"errors"
"fmt"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"github.com/beckn-one/beckn-onix/pkg/log"
"github.com/beckn-one/beckn-onix/pkg/model"
"github.com/beckn-one/beckn-onix/pkg/telemetry"
)
// StepRunner represents the minimal contract required for step instrumentation.
type StepRunner interface {
Run(*model.StepContext) error
}
// InstrumentedStep wraps a processing step with telemetry instrumentation.
type InstrumentedStep struct {
step StepRunner
stepName string
moduleName string
metrics *StepMetrics
}
// NewInstrumentedStep returns a telemetry enabled wrapper around a definition.Step.
func NewInstrumentedStep(step StepRunner, stepName, moduleName string) (*InstrumentedStep, error) {
metrics, err := GetStepMetrics(context.Background())
if err != nil {
return nil, err
}
return &InstrumentedStep{
step: step,
stepName: stepName,
moduleName: moduleName,
metrics: metrics,
}, nil
}
type becknError interface {
BecknError() *model.Error
}
// Run executes the underlying step and records RED style metrics.
func (is *InstrumentedStep) Run(ctx *model.StepContext) error {
if is.metrics == nil {
return is.step.Run(ctx)
}
tracer := otel.Tracer(telemetry.ScopeName, trace.WithInstrumentationVersion(telemetry.ScopeVersion))
stepName := "step:" + is.stepName
spanCtx, span := tracer.Start(ctx.Context, stepName)
defer span.End()
// run step with context that contains the step span
stepCtx := &model.StepContext{
Context: spanCtx,
Request: ctx.Request,
Body: ctx.Body,
Role: ctx.Role,
SubID: ctx.SubID,
RespHeader: ctx.RespHeader,
Route: ctx.Route,
}
start := time.Now()
err := is.step.Run(stepCtx)
duration := time.Since(start).Seconds()
attrs := []attribute.KeyValue{
telemetry.AttrModule.String(is.moduleName),
telemetry.AttrStep.String(is.stepName),
telemetry.AttrRole.String(string(stepCtx.Role)),
}
is.metrics.StepExecutionTotal.Add(stepCtx.Context, 1, metric.WithAttributes(attrs...))
is.metrics.StepExecutionDuration.Record(stepCtx.Context, duration, metric.WithAttributes(attrs...))
if err != nil {
errorType := fmt.Sprintf("%T", err)
var becknErr becknError
if errors.As(err, &becknErr) {
if be := becknErr.BecknError(); be != nil && be.Code != "" {
errorType = be.Code
}
}
errorAttrs := append(attrs, telemetry.AttrErrorType.String(errorType))
is.metrics.StepErrorsTotal.Add(stepCtx.Context, 1, metric.WithAttributes(errorAttrs...))
log.Errorf(stepCtx.Context, err, "Step %s failed", is.stepName)
}
if stepCtx.Route != nil {
ctx.Route = stepCtx.Route
}
ctx.WithContext(stepCtx.Context)
return err
}