Feat: configure audit fields and metrics for onix adapter and add local configuration for onix adapterZ

This commit is contained in:
Manendra Pal Singh
2026-02-23 16:08:44 +05:30
parent 2745047b27
commit ab89102711
29 changed files with 2167 additions and 441 deletions

View File

@@ -0,0 +1,120 @@
package handler
import (
"context"
"fmt"
"net/http"
"strconv"
"sync"
"time"
"github.com/beckn-one/beckn-onix/pkg/telemetry"
"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
type HTTPMetrics struct {
HttpRequestCount metric.Int64Counter
}
var (
httlMetricsInstance *HTTPMetrics
httpMetricsOnce sync.Once
httpMetricsErr error
)
func newHTTPMetrics() (*HTTPMetrics, error) {
meter := otel.GetMeterProvider().Meter(telemetry.ScopeName,
metric.WithInstrumentationVersion(telemetry.ScopeVersion))
m := &HTTPMetrics{}
var err error
if m.HttpRequestCount, err = meter.Int64Counter(
"onix_http_request_count",
metric.WithDescription("Total HTTP requests by status, route, method, role and calle "),
metric.WithUnit("1"),
); err != nil {
return nil, fmt.Errorf("onix_http_request_count: %w", err)
}
return m, nil
}
func GetHTTPMetrics(ctx context.Context) (*HTTPMetrics, error) {
httpMetricsOnce.Do(func() {
httlMetricsInstance, httpMetricsErr = newHTTPMetrics()
})
return httlMetricsInstance, httpMetricsErr
}
// StatusClass returns the HTTP status class string (e.g. 200 -> "2xx").
func StatusClass(statusCode int) string {
switch {
case statusCode >= 100 && statusCode < 200:
return "1xx"
case statusCode >= 200 && statusCode < 300:
return "2xx"
case statusCode >= 300 && statusCode < 400:
return "3xx"
case statusCode >= 400 && statusCode < 500:
return "4xx"
default:
return "5xx"
}
}
func RecordHTTPRequest(ctx context.Context, statusCode int, action, role, caller string) {
m, err := GetHTTPMetrics(ctx)
if err != nil || m == nil {
return
}
status := StatusClass(statusCode)
attributes := []attribute.KeyValue{
telemetry.AttrHTTPStatus.String(status),
telemetry.AttrAction.String(action),
telemetry.AttrRole.String(role),
telemetry.AttrCaller.String(caller),
}
metric_code := action + "_api_total_count"
category := "NetworkHealth"
if action == "/search" || action == "/discovery" {
category = "Discovery"
}
attributes = append(attributes, specHttpMetricAttr(metric_code, category)...) //TODO: need to update as per the furthur discussion
m.HttpRequestCount.Add(ctx, 1, metric.WithAttributes(attributes...))
}
type responseRecorder struct {
http.ResponseWriter
statusCode int
written bool
record func()
}
func (r *responseRecorder) WriteHeader(statusCode int) {
if !r.written {
r.written = true
r.statusCode = statusCode
if r.record != nil {
r.record()
}
}
r.ResponseWriter.WriteHeader(statusCode)
}
func specHttpMetricAttr(metricCode, category string) []attribute.KeyValue {
granularity, frequency := telemetry.GetNetworkMetricsConfig()
return []attribute.KeyValue{
telemetry.AttrMetricUUID.String(uuid.New().String()),
telemetry.AttrMetricCode.String(metricCode),
telemetry.AttrMetricCategory.String(category),
telemetry.AttrMetricGranularity.String(granularity),
telemetry.AttrMetricFrequency.String(frequency),
telemetry.AttrObservedTimeUnixNano.String(strconv.FormatInt(time.Now().UnixNano(), 10)),
}
}

View File

@@ -7,12 +7,22 @@ import (
"io"
"net/http"
"net/http/httputil"
"strconv"
"time"
"github.com/beckn-one/beckn-onix/pkg/log"
"github.com/beckn-one/beckn-onix/pkg/model"
"github.com/beckn-one/beckn-onix/pkg/plugin"
"github.com/beckn-one/beckn-onix/pkg/plugin/definition"
"github.com/beckn-one/beckn-onix/pkg/response"
"github.com/beckn-one/beckn-onix/pkg/telemetry"
"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
auditlog "go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
// stdHandler orchestrates the execution of defined processing steps.
@@ -94,31 +104,74 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Header.Del("X-Role")
}()
ctx, err := h.stepCtx(r, w.Header())
// to start a new trace
propagator := otel.GetTextMapPropagator()
traceCtx := propagator.Extract(r.Context(), propagation.HeaderCarrier(r.Header))
tracer := otel.Tracer(telemetry.ScopeName, trace.WithInstrumentationVersion(telemetry.ScopeVersion))
spanName := r.URL.Path
traceCtx, span := tracer.Start(traceCtx, spanName, trace.WithSpanKind(trace.SpanKindServer))
//to build the request with trace
r = r.WithContext(traceCtx)
var recordOnce func()
wrapped := &responseRecorder{
ResponseWriter: w,
statusCode: http.StatusOK,
record: nil,
}
caller := "unknown"
if v, ok := r.Context().Value(model.ContextKeyCallerID).(string); ok && v != "" {
caller = v
}
httpMeter, _ := GetHTTPMetrics(r.Context())
if httpMeter != nil {
recordOnce = func() {
RecordHTTPRequest(r.Context(), wrapped.statusCode, r.URL.Path, string(h.role), caller)
}
wrapped.record = recordOnce
}
// set beckn attribute
setBecknAttr(span, r, h)
stepCtx, err := h.stepCtx(r, w.Header())
if err != nil {
log.Errorf(r.Context(), err, "stepCtx(r):%v", err)
response.SendNack(r.Context(), w, err)
response.SendNack(r.Context(), wrapped, err)
return
}
log.Request(r.Context(), r, ctx.Body)
log.Request(r.Context(), r, stepCtx.Body)
defer func() {
span.SetAttributes(attribute.Int("http.response.status_code", wrapped.statusCode), attribute.String("observedTimeUnixNano", strconv.FormatInt(time.Now().UnixNano(), 10)))
if wrapped.statusCode < 200 || wrapped.statusCode >= 400 {
span.SetStatus(codes.Error, "status code is invalid")
}
body := stepCtx.Body
go telemetry.EmitAuditLogs(r.Context(), body, auditlog.Int("http.response.status_code", wrapped.statusCode))
span.End()
}()
// Execute processing steps.
for _, step := range h.steps {
if err := step.Run(ctx); err != nil {
log.Errorf(ctx, err, "%T.run():%v", step, err)
response.SendNack(ctx, w, err)
if err := step.Run(stepCtx); err != nil {
log.Errorf(stepCtx, err, "%T.run():%v", step, err)
response.SendNack(stepCtx, wrapped, err)
return
}
}
// Restore request body before forwarding or publishing.
r.Body = io.NopCloser(bytes.NewReader(ctx.Body))
if ctx.Route == nil {
r.Body = io.NopCloser(bytes.NewReader(stepCtx.Body))
if stepCtx.Route == nil {
response.SendAck(w)
return
}
// Handle routing based on the defined route type.
route(ctx, r, w, h.publisher, h.httpClient)
route(stepCtx, r, wrapped, h.publisher, h.httpClient)
}
// stepCtx creates a new StepContext for processing an HTTP request.
@@ -321,3 +374,42 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf
log.Infof(ctx, "Processor steps initialized: %v", cfg.Steps)
return nil
}
func setBecknAttr(span trace.Span, r *http.Request, h *stdHandler) {
recipientID := h.SubscriberID
if v, ok := r.Context().Value(model.ContextKeySubscriberID).(string); ok {
recipientID = v
}
senderID := ""
if v, ok := r.Context().Value(model.ContextKeyCallerID).(string); ok {
senderID = v
}
attrs := []attribute.KeyValue{
attribute.String("recipient.id", recipientID),
attribute.String("sender.id", senderID),
attribute.String("span_uuid", uuid.New().String()),
attribute.String("http.request.method", r.Method),
attribute.String("http.route", r.URL.Path),
}
if trxID, ok := r.Context().Value(model.ContextKeyTxnID).(string); ok {
attrs = append(attrs, attribute.String("transaction_id", trxID))
}
if mesID, ok := r.Context().Value(model.ContextKeyMsgID).(string); ok {
attrs = append(attrs, attribute.String("message_id", mesID))
}
if parentID, ok := r.Context().Value(model.ContextKeyParentID).(string); ok && parentID != "" {
attrs = append(attrs, attribute.String("parentSpanId", parentID))
}
if r.Host != "" {
attrs = append(attrs, attribute.String("server.address", r.Host))
}
if ua := r.UserAgent(); ua != "" {
attrs = append(attrs, attribute.String("user_agent.original", ua))
}
span.SetAttributes(attrs...)
}

View File

@@ -7,7 +7,9 @@ import (
"strings"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"github.com/beckn-one/beckn-onix/pkg/log"
"github.com/beckn-one/beckn-onix/pkg/model"
@@ -38,24 +40,42 @@ func (s *signStep) Run(ctx *model.StepContext) error {
if len(ctx.SubID) == 0 {
return model.NewBadReqErr(fmt.Errorf("subscriberID not set"))
}
keySet, err := s.km.Keyset(ctx, ctx.SubID)
if err != nil {
return fmt.Errorf("failed to get signing key: %w", err)
}
createdAt := time.Now().Unix()
validTill := time.Now().Add(5 * time.Minute).Unix()
sign, err := s.signer.Sign(ctx, ctx.Body, keySet.SigningPrivate, createdAt, validTill)
if err != nil {
return fmt.Errorf("failed to sign request: %w", err)
tracer := otel.Tracer("beckn-onix")
var keySet *model.Keyset
{
// to create span to finding the key set
keySetCtx, keySetSpan := tracer.Start(ctx.Context, "keyset")
defer keySetSpan.End()
ks, err := s.km.Keyset(keySetCtx, ctx.SubID)
if err != nil {
return fmt.Errorf("failed to get signing key: %w", err)
}
keySet = ks
}
authHeader := s.generateAuthHeader(ctx.SubID, keySet.UniqueKeyID, createdAt, validTill, sign)
log.Debugf(ctx, "Signature generated: %v", sign)
header := model.AuthHeaderSubscriber
if ctx.Role == model.RoleGateway {
header = model.AuthHeaderGateway
{
// to create span for the signa
signerCtx, signerSpan := tracer.Start(ctx.Context, "sign")
defer signerSpan.End()
createdAt := time.Now().Unix()
validTill := time.Now().Add(5 * time.Minute).Unix()
sign, err := s.signer.Sign(signerCtx, ctx.Body, keySet.SigningPrivate, createdAt, validTill)
if err != nil {
return fmt.Errorf("failed to sign request: %w", err)
}
authHeader := s.generateAuthHeader(ctx.SubID, keySet.UniqueKeyID, createdAt, validTill, sign)
log.Debugf(ctx, "Signature generated: %v", sign)
header := model.AuthHeaderSubscriber
if ctx.Role == model.RoleGateway {
header = model.AuthHeaderGateway
}
ctx.Request.Header.Set(header, authHeader)
}
ctx.Request.Header.Set(header, authHeader)
return nil
}
@@ -93,8 +113,20 @@ func newValidateSignStep(signValidator definition.SignValidator, km definition.K
// Run executes the validation step.
func (s *validateSignStep) Run(ctx *model.StepContext) error {
err := s.validateHeaders(ctx)
s.recordMetrics(ctx, err)
tracer := otel.Tracer(telemetry.ScopeName, trace.WithInstrumentationVersion(telemetry.ScopeVersion))
spanCtx, span := tracer.Start(ctx.Context, "validate-sign")
defer span.End()
stepCtx := &model.StepContext{
Context: spanCtx,
Request: ctx.Request,
Body: ctx.Body,
Role: ctx.Role,
SubID: ctx.SubID,
RespHeader: ctx.RespHeader,
Route: ctx.Route,
}
err := s.validateHeaders(stepCtx)
s.recordMetrics(stepCtx, err)
return err
}

View File

@@ -6,8 +6,10 @@ import (
"fmt"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"github.com/beckn-one/beckn-onix/pkg/log"
"github.com/beckn-one/beckn-onix/pkg/model"
@@ -52,18 +54,34 @@ func (is *InstrumentedStep) Run(ctx *model.StepContext) error {
return is.step.Run(ctx)
}
tracer := otel.Tracer(telemetry.ScopeName, trace.WithInstrumentationVersion(telemetry.ScopeVersion))
stepName := "step:" + is.stepName
spanCtx, span := tracer.Start(ctx.Context, stepName)
defer span.End()
// run step with context that contains the step span
stepCtx := &model.StepContext{
Context: spanCtx,
Request: ctx.Request,
Body: ctx.Body,
Role: ctx.Role,
SubID: ctx.SubID,
RespHeader: ctx.RespHeader,
Route: ctx.Route,
}
start := time.Now()
err := is.step.Run(ctx)
err := is.step.Run(stepCtx)
duration := time.Since(start).Seconds()
attrs := []attribute.KeyValue{
telemetry.AttrModule.String(is.moduleName),
telemetry.AttrStep.String(is.stepName),
telemetry.AttrRole.String(string(ctx.Role)),
telemetry.AttrRole.String(string(stepCtx.Role)),
}
is.metrics.StepExecutionTotal.Add(ctx.Context, 1, metric.WithAttributes(attrs...))
is.metrics.StepExecutionDuration.Record(ctx.Context, duration, metric.WithAttributes(attrs...))
is.metrics.StepExecutionTotal.Add(stepCtx.Context, 1, metric.WithAttributes(attrs...))
is.metrics.StepExecutionDuration.Record(stepCtx.Context, duration, metric.WithAttributes(attrs...))
if err != nil {
errorType := fmt.Sprintf("%T", err)
@@ -75,10 +93,12 @@ func (is *InstrumentedStep) Run(ctx *model.StepContext) error {
}
errorAttrs := append(attrs, telemetry.AttrErrorType.String(errorType))
is.metrics.StepErrorsTotal.Add(ctx.Context, 1, metric.WithAttributes(errorAttrs...))
log.Errorf(ctx.Context, err, "Step %s failed", is.stepName)
is.metrics.StepErrorsTotal.Add(stepCtx.Context, 1, metric.WithAttributes(errorAttrs...))
log.Errorf(stepCtx.Context, err, "Step %s failed", is.stepName)
}
if stepCtx.Route != nil {
ctx.Route = stepCtx.Route
}
return err
}

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"sync"
"github.com/beckn-one/beckn-onix/pkg/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)
@@ -32,8 +33,8 @@ func GetStepMetrics(ctx context.Context) (*StepMetrics, error) {
func newStepMetrics() (*StepMetrics, error) {
meter := otel.GetMeterProvider().Meter(
"github.com/beckn-one/beckn-onix/telemetry",
metric.WithInstrumentationVersion("1.0.0"),
telemetry.ScopeName,
metric.WithInstrumentationVersion(telemetry.ScopeVersion),
)
m := &StepMetrics{}
@@ -66,4 +67,3 @@ func newStepMetrics() (*StepMetrics, error) {
return m, nil
}

View File

@@ -2,7 +2,7 @@ package handler
import (
"context"
"net/http/httptest"
"sync"
"testing"
"go.opentelemetry.io/otel/metric"
@@ -103,11 +103,8 @@ func TestStepMetrics_Instruments(t *testing.T) {
metric.WithAttributes(telemetry.AttrStep.String("test-step"), telemetry.AttrModule.String("test-module")))
}, "StepErrorsTotal.Add should not panic")
// Verify metrics are exposed via HTTP handler
rec := httptest.NewRecorder()
req := httptest.NewRequest("GET", "/metrics", nil)
provider.MetricsHandler.ServeHTTP(rec, req)
assert.Equal(t, 200, rec.Code, "Metrics endpoint should return 200")
// MeterProvider is set by NewTestProvider; metrics are recorded via OTel SDK
assert.NotNil(t, provider.MeterProvider, "MeterProvider should be set")
}
func TestStepMetrics_MultipleCalls(t *testing.T) {
@@ -129,3 +126,113 @@ func TestStepMetrics_MultipleCalls(t *testing.T) {
}
}
func TestStepMetrics_RecordWithDifferentAttributes(t *testing.T) {
ctx := context.Background()
provider, err := telemetry.NewTestProvider(ctx)
require.NoError(t, err)
defer provider.Shutdown(context.Background())
metrics, err := GetStepMetrics(ctx)
require.NoError(t, err)
require.NotNil(t, metrics)
attrsList := []struct {
step string
module string
}{
{"test-step", "test-module"},
{"", "module-only"},
{"step-only", ""},
{"", ""},
{"long-step-name-with-many-parts", "long-module-name"},
}
for _, a := range attrsList {
attrs := metric.WithAttributes(
telemetry.AttrStep.String(a.step),
telemetry.AttrModule.String(a.module),
)
require.NotPanics(t, func() {
metrics.StepExecutionDuration.Record(ctx, 0.01, attrs)
metrics.StepExecutionTotal.Add(ctx, 1, attrs)
metrics.StepErrorsTotal.Add(ctx, 0, attrs)
}, "Recording with step=%q module=%q should not panic", a.step, a.module)
}
}
func TestStepMetrics_DurationValues(t *testing.T) {
ctx := context.Background()
provider, err := telemetry.NewTestProvider(ctx)
require.NoError(t, err)
defer provider.Shutdown(context.Background())
metrics, err := GetStepMetrics(ctx)
require.NoError(t, err)
require.NotNil(t, metrics)
attrs := metric.WithAttributes(
telemetry.AttrStep.String("test-step"),
telemetry.AttrModule.String("test-module"),
)
durations := []float64{0, 0.0005, 0.001, 0.01, 0.1, 0.5}
for _, d := range durations {
d := d
require.NotPanics(t, func() {
metrics.StepExecutionDuration.Record(ctx, d, attrs)
}, "StepExecutionDuration.Record(%.4f) should not panic", d)
}
}
func TestStepMetrics_ConcurrentRecord(t *testing.T) {
ctx := context.Background()
provider, err := telemetry.NewTestProvider(ctx)
require.NoError(t, err)
defer provider.Shutdown(context.Background())
metrics, err := GetStepMetrics(ctx)
require.NoError(t, err)
require.NotNil(t, metrics)
attrs := metric.WithAttributes(
telemetry.AttrStep.String("concurrent-step"),
telemetry.AttrModule.String("concurrent-module"),
)
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
metrics.StepExecutionDuration.Record(ctx, 0.05, attrs)
metrics.StepExecutionTotal.Add(ctx, 1, attrs)
metrics.StepErrorsTotal.Add(ctx, 0, attrs)
}()
}
wg.Wait()
}
func TestStepMetrics_WithTraceProvider(t *testing.T) {
ctx := context.Background()
provider, sr, err := telemetry.NewTestProviderWithTrace(ctx)
require.NoError(t, err)
require.NotNil(t, provider)
require.NotNil(t, sr)
defer provider.Shutdown(ctx)
metrics, err := GetStepMetrics(ctx)
require.NoError(t, err)
require.NotNil(t, metrics)
assert.NotNil(t, provider.MeterProvider, "MeterProvider should be set")
assert.NotNil(t, provider.TraceProvider, "TraceProvider should be set")
attrs := metric.WithAttributes(
telemetry.AttrStep.String("trace-test-step"),
telemetry.AttrModule.String("trace-test-module"),
)
require.NotPanics(t, func() {
metrics.StepExecutionDuration.Record(ctx, 0.1, attrs)
metrics.StepExecutionTotal.Add(ctx, 1, attrs)
metrics.StepErrorsTotal.Add(ctx, 0, attrs)
}, "Step metrics should work when trace provider is also set")
}