From ab891027112db508eb6a2a3f6da6199f45aa67fd Mon Sep 17 00:00:00 2001 From: Manendra Pal Singh Date: Mon, 23 Feb 2026 16:08:44 +0530 Subject: [PATCH] Feat: configure audit fields and metrics for onix adapter and add local configuration for onix adapterZ --- config/onix/adapter.local.yaml | 221 ++++++++ config/onix/adapter.yaml | 212 +++---- config/onix/audit-fields.yaml | 24 + config/onix/bapTxnCaller-routing.yaml | 48 +- config/onix/bapTxnReciever-routing.yaml | 51 +- core/module/handler/http_metric.go | 120 ++++ core/module/handler/stdHandler.go | 110 +++- core/module/handler/step.go | 66 ++- core/module/handler/step_instrumentor.go | 34 +- core/module/handler/step_metrics.go | 6 +- core/module/handler/step_metrics_test.go | 119 +++- go.mod | 38 +- go.sum | 100 +++- pkg/log/log.go | 6 + pkg/model/model.go | 4 + pkg/plugin/implementation/cache/cache.go | 19 +- .../implementation/otelsetup/cmd/plugin.go | 82 ++- .../implementation/otelsetup/otelsetup.go | 225 +++++--- .../otelsetup/otelsetup_test.go | 100 +++- .../reqpreprocessor/reqpreprocessor.go | 14 + .../simplekeymanager/simplekeymanager.go | 52 +- pkg/plugin/manager.go | 4 +- pkg/telemetry/audit.go | 56 ++ pkg/telemetry/audit_fields.go | 216 ++++++++ pkg/telemetry/audit_fields_test.go | 518 ++++++++++++++++++ pkg/telemetry/metrics_test.go | 31 +- pkg/telemetry/pluginMetrics.go | 64 ++- pkg/telemetry/telemetry.go | 15 +- pkg/telemetry/test_helper.go | 53 +- 29 files changed, 2167 insertions(+), 441 deletions(-) create mode 100644 config/onix/adapter.local.yaml create mode 100644 config/onix/audit-fields.yaml create mode 100644 core/module/handler/http_metric.go create mode 100644 pkg/telemetry/audit.go create mode 100644 pkg/telemetry/audit_fields.go create mode 100644 pkg/telemetry/audit_fields_test.go diff --git a/config/onix/adapter.local.yaml b/config/onix/adapter.local.yaml new file mode 100644 index 0000000..ebaff50 --- /dev/null +++ b/config/onix/adapter.local.yaml @@ -0,0 +1,221 @@ +appName: "onix" +log: + level: debug + destinations: + - type: stdout + contextKeys: + - transaction_id + - message_id + - subscriber_id + - module_id +http: + port: 8080 + timeout: + read: 30 + write: 30 + idle: 30 +pluginManager: + root: ./plugins + remoteRoot: "" +modules: + - name: bapTxnReciever + path: /bap/reciever/ + handler: + type: std + role: bap + httpClientConfig: + maxIdleConns: 1000 + maxIdleConnsPerHost: 200 + idleConnTimeout: 300s + responseHeaderTimeout: 5s + plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms + keyManager: + id: secretskeymanager + config: + projectID: ${projectID} + cache: + id: redis + config: + addr: 10.81.192.4:6379 + schemaValidator: + id: schemavalidator + config: + schemaDir: /mnt/gcs/configs/schemas + signValidator: + id: signvalidator + publisher: + id: publisher + config: + project: ${projectID} + topic: bapNetworkReciever + router: + id: router + config: + routingConfigPath: /mnt/gcs/configs/bapTxnReciever-routing.yaml + middleware: + - id: reqpreprocessor + config: + contextKeys: transaction_id,message_id + role: bap + steps: + - validateSign + - addRoute + - validateSchema + - name: bapTxnCaller + path: /bap/caller/ + handler: + type: std + role: bap + httpClientConfig: + maxIdleConns: 1000 + maxIdleConnsPerHost: 200 + idleConnTimeout: 300s + responseHeaderTimeout: 5s + plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms + keyManager: + id: secretskeymanager + config: + projectID: ${projectID} + cache: + id: redis + config: + addr: 192.168.1.1:6379 + schemaValidator: + id: schemavalidator + config: + schemaDir: /mnt/gcs/configs/schemas + signer: + id: signer + publisher: + id: publisher + config: + project: ${projectID} + topic: bapNetworkReciever + router: + id: router + config: + routingConfigPath: /mnt/gcs/configs/bapTxnCaller-routing.yaml + middleware: + - id: reqpreprocessor + config: + contextKeys: transaction_id,message_id + role: bap + steps: + - validateSchema + - addRoute + - sign + - name: bppTxnReciever + path: /bpp/reciever/ + handler: + type: std + role: bpp + subscriberId: bpp1 + httpClientConfig: + maxIdleConns: 1000 + maxIdleConnsPerHost: 200 + idleConnTimeout: 300s + responseHeaderTimeout: 5s + plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms + keyManager: + id: secretskeymanager + config: + projectID: ${projectID} + cache: + id: redis + config: + addr: 192.168.1.1:6379 + schemaValidator: + id: schemavalidator + config: + schemaDir: /mnt/gcs/configs/schemas + signValidator: + id: signvalidator + publisher: + id: publisher + config: + project: ${projectID} + topic: bapNetworkReciever + router: + id: router + config: + routingConfigPath: /mnt/gcs/configs/bppTxnReciever-routing.yaml + middleware: + - id: reqpreprocessor + config: + contextKeys: transaction_id,message_id + role: bpp + steps: + - validateSign + - addRoute + - validateSchema + - name: bppTxnCaller + path: /bpp/caller/ + handler: + type: std + role: bpp + httpClientConfig: + maxIdleConns: 1000 + maxIdleConnsPerHost: 200 + idleConnTimeout: 300s + responseHeaderTimeout: 5s + plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms + keyManager: + id: secretskeymanager + config: + projectID: ${projectID} + cache: + id: redis + config: + addr: 192.168.1.1:6379 + schemaValidator: + id: schemavalidator + config: + schemaDir: /mnt/gcs/configs/schemas + signer: + id: signer + publisher: + id: publisher + config: + project: ${projectID} + topic: bapNetworkReciever + router: + id: router + config: + routingConfigPath: /mnt/gcs/configs/bppTxnCaller-routing.yaml + middleware: + - id: reqpreprocessor + config: + contextKeys: transaction_id,message_id + role: bpp + steps: + - validateSchema + - addRoute + - sign diff --git a/config/onix/adapter.yaml b/config/onix/adapter.yaml index 403f616..4349e57 100644 --- a/config/onix/adapter.yaml +++ b/config/onix/adapter.yaml @@ -1,4 +1,4 @@ -appName: "onix" +appName: "onix-ev-charging" log: level: debug destinations: @@ -8,21 +8,49 @@ log: - message_id - subscriber_id - module_id + - parent_id + + +# OpenTelemetry (OTLP) - metrics and traces sent to OTEL collector, then to Loki/backend +plugins: + otelsetup: + id: otelsetup + config: + serviceName: "onix-ev-charging-bap" + serviceVersion: "1.0.0" + environment: "development" + domain: "ev_charging" + otlpEndpoint: "otel-collector:4317" + enableMetrics: "true" + networkMetricsGranularity: "2min" + networkMetricsFrequency: "4min" + enableTracing: "true" + enableLogs: "true" + timeInterval: "5" + auditFieldsConfig: "/app/config/audit-fields.yaml" + + +# this is the port for the bap plugin where bap app can dump the requests to the plugin http: - port: 8080 + port: 8001 timeout: read: 30 write: 30 idle: 30 + pluginManager: root: /app/plugins - remoteRoot: /mnt/gcs/plugins/plugins_bundle.zip + modules: - - name: bapTxnReciever - path: /bap/reciever/ + # BAP Receiver - Receives callbacks from CDS (Phase 1) and BPPs (Phase 2+) + # Phase 1: Receives on_search from CDS with aggregated catalog + # Phase 2+: Receives callbacks from BPPs (on_select, on_init, on_confirm, etc.) + - name: bapTxnReceiver + path: /bap/receiver/ handler: type: std role: bap + subscriberId: ev-charging.sandbox1.com httpClientConfig: maxIdleConns: 1000 maxIdleConnsPerHost: 200 @@ -32,47 +60,55 @@ modules: registry: id: registry config: - url: http://localhost:8080/reg + url: http://mock-registry:3030 retry_max: 3 retry_wait_min: 100ms retry_wait_max: 500ms keyManager: - id: secretskeymanager + id: simplekeymanager config: - projectID: ${projectID} + networkParticipant: example-bap.com + keyId: bap-key-1 + signingPrivateKey: xnKF3BIg3Ei+ZEvxBtK0Mm4GRG1Mr0+K9IrxT6CnHEE= + signingPublicKey: MKA6fln8vmU2Qn80Y7dLzagpaPNqQWOlvGglMo5s0IU= + encrPrivateKey: xnKF3BIg3Ei+ZEvxBtK0Mm4GRG1Mr0+K9IrxT6CnHEE= + encrPublicKey: MKA6fln8vmU2Qn80Y7dLzagpaPNqQWOlvGglMo5s0IU= cache: - id: redis + id: cache config: - addr: 10.81.192.4:6379 + addr: redis-bap:6379 schemaValidator: - id: schemavalidator + id: schemav2validator config: - schemaDir: /mnt/gcs/configs/schemas + type: url + location: https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/main/api/beckn.yaml + cacheTTL: "3600" signValidator: id: signvalidator - publisher: - id: publisher - config: - project: ${projectID} - topic: bapNetworkReciever router: id: router config: - routingConfigPath: /mnt/gcs/configs/bapTxnReciever-routing.yaml + routingConfig: /app/config/bapTxnReciever-routing.yaml middleware: - id: reqpreprocessor config: - contextKeys: transaction_id,message_id + contextKeys: transaction_id,message_id,parent_id role: bap steps: - validateSign - addRoute - validateSchema + + # BAP Caller - Entry point for all requests from BAP + # Phase 1: Routes search to external CDS for aggregation + # Phase 2+: Routes other requests directly to BPP (bypasses CDS) + # Uses bpp_uri from context for dynamic routing in Phase 2+ - name: bapTxnCaller path: /bap/caller/ handler: type: std role: bap + subscriberId: ev-charging.sandbox1.com httpClientConfig: maxIdleConns: 1000 maxIdleConnsPerHost: 200 @@ -82,140 +118,42 @@ modules: registry: id: registry config: - url: http://localhost:8080/reg + url: http://mock-registry:3030 retry_max: 3 retry_wait_min: 100ms retry_wait_max: 500ms keyManager: - id: secretskeymanager + id: simplekeymanager config: - projectID: ${projectID} + networkParticipant: example-bap.com + keyId: bap-key-1 + signingPrivateKey: xnKF3BIg3Ei+ZEvxBtK0Mm4GRG1Mr0+K9IrxT6CnHEE= + signingPublicKey: MKA6fln8vmU2Qn80Y7dLzagpaPNqQWOlvGglMo5s0IU= + encrPrivateKey: xnKF3BIg3Ei+ZEvxBtK0Mm4GRG1Mr0+K9IrxT6CnHEE= + encrPublicKey: MKA6fln8vmU2Qn80Y7dLzagpaPNqQWOlvGglMo5s0IU= cache: - id: redis + id: cache config: - addr: 192.168.1.1:6379 + addr: redis-bap:6379 schemaValidator: - id: schemavalidator + id: schemav2validator config: - schemaDir: /mnt/gcs/configs/schemas - signer: - id: signer - publisher: - id: publisher - config: - project: ${projectID} - topic: bapNetworkReciever + type: url + location: https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/main/api/beckn.yaml + cacheTTL: "3600" router: id: router config: - routingConfigPath: /mnt/gcs/configs/bapTxnCaller-routing.yaml + routingConfig: /app/config/bapTxnCaller-routing.yaml + signer: + id: signer middleware: - id: reqpreprocessor config: - contextKeys: transaction_id,message_id + contextKeys: transaction_id,message_id,parent_id role: bap + steps: - validateSchema - addRoute - - sign - - name: bppTxnReciever - path: /bpp/reciever/ - handler: - type: std - role: bpp - subscriberId: bpp1 - httpClientConfig: - maxIdleConns: 1000 - maxIdleConnsPerHost: 200 - idleConnTimeout: 300s - responseHeaderTimeout: 5s - plugins: - registry: - id: registry - config: - url: http://localhost:8080/reg - retry_max: 3 - retry_wait_min: 100ms - retry_wait_max: 500ms - keyManager: - id: secretskeymanager - config: - projectID: ${projectID} - cache: - id: redis - config: - addr: 192.168.1.1:6379 - schemaValidator: - id: schemavalidator - config: - schemaDir: /mnt/gcs/configs/schemas - signValidator: - id: signvalidator - publisher: - id: publisher - config: - project: ${projectID} - topic: bapNetworkReciever - router: - id: router - config: - routingConfigPath: /mnt/gcs/configs/bppTxnReciever-routing.yaml - middleware: - - id: reqpreprocessor - config: - contextKeys: transaction_id,message_id - role: bpp - steps: - - validateSign - - addRoute - - validateSchema - - name: bppTxnCaller - path: /bpp/caller/ - handler: - type: std - role: bpp - httpClientConfig: - maxIdleConns: 1000 - maxIdleConnsPerHost: 200 - idleConnTimeout: 300s - responseHeaderTimeout: 5s - plugins: - registry: - id: registry - config: - url: http://localhost:8080/reg - retry_max: 3 - retry_wait_min: 100ms - retry_wait_max: 500ms - keyManager: - id: secretskeymanager - config: - projectID: ${projectID} - cache: - id: redis - config: - addr: 192.168.1.1:6379 - schemaValidator: - id: schemavalidator - config: - schemaDir: /mnt/gcs/configs/schemas - signer: - id: signer - publisher: - id: publisher - config: - project: ${projectID} - topic: bapNetworkReciever - router: - id: router - config: - routingConfigPath: /mnt/gcs/configs/bppTxnCaller-routing.yaml - middleware: - - id: reqpreprocessor - config: - contextKeys: transaction_id,message_id - role: bpp - steps: - - validateSchema - - addRoute - - sign + - sign \ No newline at end of file diff --git a/config/onix/audit-fields.yaml b/config/onix/audit-fields.yaml new file mode 100644 index 0000000..3e332a2 --- /dev/null +++ b/config/onix/audit-fields.yaml @@ -0,0 +1,24 @@ +auditRules: + default: + - context.transaction_id + - context.message_id + - context.action + - context.domain + - context.bap_id + - context.bpp_id + + search: + - context.transaction_id + - context.message_id + - context.action + - context.timestamp + - message.intent + + select: + - context.transaction_id + - context.message_id + - context.action + - context.timestamp + - message.order.beckn:buyer.beckn:id + - message.order.beckn:orderItems.beckn:acceptedOffer.beckn:id + diff --git a/config/onix/bapTxnCaller-routing.yaml b/config/onix/bapTxnCaller-routing.yaml index b1d5a44..404d0fe 100644 --- a/config/onix/bapTxnCaller-routing.yaml +++ b/config/onix/bapTxnCaller-routing.yaml @@ -1,25 +1,41 @@ +# ONIX BAP Caller Routing Configuration + +# Supports Phase 1 (Discover Aggregation) and Phase 2+ (Direct BPP Routing) + +# Phase 1: Discover (Aggregation via CDS) + +# Phase 2+: Other Requests (Direct to BPP, NO CDS involvement) + +# These routes use bpp_uri from context (provided in on_discover aggregated response) + routingRules: - - domain: "ONDC:TRV10" - version: "2.0.0" - routingType: "bpp" + + # Phase 1: Discover to CDS + + - domain: ev_charging_network + version: "1.0.0" + targetType: url target: - url: "https://gateway.example.com" + url: http://mock-cds:8082/csd + excludeAction: false endpoints: - - search - - domain: "ONDC:TRV10" - version: "2.0.0" - routingType: "bpp" + - discover + + + + # Phase 2+: Other actions to BPP (via context_endpoint) + + - domain: ev_charging_network + version: "1.0.0" + targetType: bpp + target: {} endpoints: - select - init - confirm - status + - track - cancel - - domain: "ONDC:TRV12" - version: "2.0.0" - routingType: "bpp" - endpoints: - - select - - init - - confirm - - status \ No newline at end of file + - update + - rating + - support diff --git a/config/onix/bapTxnReciever-routing.yaml b/config/onix/bapTxnReciever-routing.yaml index ca4a478..dfdaa81 100644 --- a/config/onix/bapTxnReciever-routing.yaml +++ b/config/onix/bapTxnReciever-routing.yaml @@ -1,20 +1,47 @@ +# ONIX BAP Receiver Routing Configuration + +# Supports Phase 1 (Discover Aggregation) and Phase 2+ (Direct BPP Callbacks) + + + +# Phase 1: Discover Aggregation + +# Phase 2+: Other Callbacks (Direct from BPPs to BAP, NO CDS involvement) + +# These routes use bap_uri from context to route callbacks back to originating BAP + + + routingRules: - - domain: "ONDC:TRV10" - version: "2.0.0" - routingType: "url" + + # Phase 1: on_discover callback to BAP (routed to mock-bap for testing) + - domain: ev_charging_network + version: "1.0.0" + targetType: url target: - url: "https://services-backend/trv/v1" + url: http://mock-bap:9001 + excludeAction: false + endpoints: + - on_discover + + + + + # Phase 2+: Other callbacks to BAP (routed to mock-bap for testing) + + - domain: ev_charging_network + version: "1.0.0" + targetType: url + target: + url: http://mock-bap:9001 + excludeAction: false endpoints: - on_select - on_init - on_confirm - on_status - - on_update + - on_track - on_cancel - - domain: "ONDC:TRV10" - version: "2.0.0" - routingType: "msgq" - target: - topic_id: "trv_topic_id1" - endpoints: - - on_search \ No newline at end of file + - on_update + - on_rating + - on_support diff --git a/core/module/handler/http_metric.go b/core/module/handler/http_metric.go new file mode 100644 index 0000000..54e3323 --- /dev/null +++ b/core/module/handler/http_metric.go @@ -0,0 +1,120 @@ +package handler + +import ( + "context" + "fmt" + "net/http" + "strconv" + "sync" + "time" + + "github.com/beckn-one/beckn-onix/pkg/telemetry" + "github.com/google/uuid" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type HTTPMetrics struct { + HttpRequestCount metric.Int64Counter +} + +var ( + httlMetricsInstance *HTTPMetrics + httpMetricsOnce sync.Once + httpMetricsErr error +) + +func newHTTPMetrics() (*HTTPMetrics, error) { + + meter := otel.GetMeterProvider().Meter(telemetry.ScopeName, + metric.WithInstrumentationVersion(telemetry.ScopeVersion)) + m := &HTTPMetrics{} + var err error + + if m.HttpRequestCount, err = meter.Int64Counter( + "onix_http_request_count", + metric.WithDescription("Total HTTP requests by status, route, method, role and calle "), + metric.WithUnit("1"), + ); err != nil { + return nil, fmt.Errorf("onix_http_request_count: %w", err) + } + + return m, nil +} + +func GetHTTPMetrics(ctx context.Context) (*HTTPMetrics, error) { + httpMetricsOnce.Do(func() { + httlMetricsInstance, httpMetricsErr = newHTTPMetrics() + }) + return httlMetricsInstance, httpMetricsErr +} + +// StatusClass returns the HTTP status class string (e.g. 200 -> "2xx"). +func StatusClass(statusCode int) string { + switch { + case statusCode >= 100 && statusCode < 200: + return "1xx" + case statusCode >= 200 && statusCode < 300: + return "2xx" + case statusCode >= 300 && statusCode < 400: + return "3xx" + case statusCode >= 400 && statusCode < 500: + return "4xx" + default: + return "5xx" + } +} + +func RecordHTTPRequest(ctx context.Context, statusCode int, action, role, caller string) { + m, err := GetHTTPMetrics(ctx) + if err != nil || m == nil { + return + } + status := StatusClass(statusCode) + attributes := []attribute.KeyValue{ + telemetry.AttrHTTPStatus.String(status), + telemetry.AttrAction.String(action), + telemetry.AttrRole.String(role), + telemetry.AttrCaller.String(caller), + } + + metric_code := action + "_api_total_count" + category := "NetworkHealth" + if action == "/search" || action == "/discovery" { + category = "Discovery" + } + attributes = append(attributes, specHttpMetricAttr(metric_code, category)...) //TODO: need to update as per the furthur discussion + m.HttpRequestCount.Add(ctx, 1, metric.WithAttributes(attributes...)) +} + +type responseRecorder struct { + http.ResponseWriter + statusCode int + written bool + record func() +} + +func (r *responseRecorder) WriteHeader(statusCode int) { + if !r.written { + r.written = true + r.statusCode = statusCode + if r.record != nil { + r.record() + } + } + r.ResponseWriter.WriteHeader(statusCode) +} + +func specHttpMetricAttr(metricCode, category string) []attribute.KeyValue { + + granularity, frequency := telemetry.GetNetworkMetricsConfig() + return []attribute.KeyValue{ + telemetry.AttrMetricUUID.String(uuid.New().String()), + telemetry.AttrMetricCode.String(metricCode), + telemetry.AttrMetricCategory.String(category), + telemetry.AttrMetricGranularity.String(granularity), + telemetry.AttrMetricFrequency.String(frequency), + telemetry.AttrObservedTimeUnixNano.String(strconv.FormatInt(time.Now().UnixNano(), 10)), + } +} diff --git a/core/module/handler/stdHandler.go b/core/module/handler/stdHandler.go index 9e9fefc..ab50ffb 100644 --- a/core/module/handler/stdHandler.go +++ b/core/module/handler/stdHandler.go @@ -7,12 +7,22 @@ import ( "io" "net/http" "net/http/httputil" + "strconv" + "time" "github.com/beckn-one/beckn-onix/pkg/log" "github.com/beckn-one/beckn-onix/pkg/model" "github.com/beckn-one/beckn-onix/pkg/plugin" "github.com/beckn-one/beckn-onix/pkg/plugin/definition" "github.com/beckn-one/beckn-onix/pkg/response" + "github.com/beckn-one/beckn-onix/pkg/telemetry" + "github.com/google/uuid" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + auditlog "go.opentelemetry.io/otel/log" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" ) // stdHandler orchestrates the execution of defined processing steps. @@ -94,31 +104,74 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Header.Del("X-Role") }() - ctx, err := h.stepCtx(r, w.Header()) + // to start a new trace + propagator := otel.GetTextMapPropagator() + traceCtx := propagator.Extract(r.Context(), propagation.HeaderCarrier(r.Header)) + tracer := otel.Tracer(telemetry.ScopeName, trace.WithInstrumentationVersion(telemetry.ScopeVersion)) + spanName := r.URL.Path + traceCtx, span := tracer.Start(traceCtx, spanName, trace.WithSpanKind(trace.SpanKindServer)) + + //to build the request with trace + r = r.WithContext(traceCtx) + + var recordOnce func() + wrapped := &responseRecorder{ + ResponseWriter: w, + statusCode: http.StatusOK, + record: nil, + } + + caller := "unknown" + if v, ok := r.Context().Value(model.ContextKeyCallerID).(string); ok && v != "" { + caller = v + } + httpMeter, _ := GetHTTPMetrics(r.Context()) + if httpMeter != nil { + recordOnce = func() { + RecordHTTPRequest(r.Context(), wrapped.statusCode, r.URL.Path, string(h.role), caller) + } + wrapped.record = recordOnce + } + + // set beckn attribute + setBecknAttr(span, r, h) + + stepCtx, err := h.stepCtx(r, w.Header()) if err != nil { log.Errorf(r.Context(), err, "stepCtx(r):%v", err) - response.SendNack(r.Context(), w, err) + response.SendNack(r.Context(), wrapped, err) return } - log.Request(r.Context(), r, ctx.Body) + log.Request(r.Context(), r, stepCtx.Body) + + defer func() { + span.SetAttributes(attribute.Int("http.response.status_code", wrapped.statusCode), attribute.String("observedTimeUnixNano", strconv.FormatInt(time.Now().UnixNano(), 10))) + if wrapped.statusCode < 200 || wrapped.statusCode >= 400 { + span.SetStatus(codes.Error, "status code is invalid") + } + + body := stepCtx.Body + go telemetry.EmitAuditLogs(r.Context(), body, auditlog.Int("http.response.status_code", wrapped.statusCode)) + span.End() + }() // Execute processing steps. for _, step := range h.steps { - if err := step.Run(ctx); err != nil { - log.Errorf(ctx, err, "%T.run():%v", step, err) - response.SendNack(ctx, w, err) + if err := step.Run(stepCtx); err != nil { + log.Errorf(stepCtx, err, "%T.run():%v", step, err) + response.SendNack(stepCtx, wrapped, err) return } } // Restore request body before forwarding or publishing. - r.Body = io.NopCloser(bytes.NewReader(ctx.Body)) - if ctx.Route == nil { + r.Body = io.NopCloser(bytes.NewReader(stepCtx.Body)) + if stepCtx.Route == nil { response.SendAck(w) return } // Handle routing based on the defined route type. - route(ctx, r, w, h.publisher, h.httpClient) + route(stepCtx, r, wrapped, h.publisher, h.httpClient) } // stepCtx creates a new StepContext for processing an HTTP request. @@ -321,3 +374,42 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf log.Infof(ctx, "Processor steps initialized: %v", cfg.Steps) return nil } + +func setBecknAttr(span trace.Span, r *http.Request, h *stdHandler) { + recipientID := h.SubscriberID + + if v, ok := r.Context().Value(model.ContextKeySubscriberID).(string); ok { + recipientID = v + } + senderID := "" + if v, ok := r.Context().Value(model.ContextKeyCallerID).(string); ok { + senderID = v + } + + attrs := []attribute.KeyValue{ + attribute.String("recipient.id", recipientID), + attribute.String("sender.id", senderID), + attribute.String("span_uuid", uuid.New().String()), + attribute.String("http.request.method", r.Method), + attribute.String("http.route", r.URL.Path), + } + + if trxID, ok := r.Context().Value(model.ContextKeyTxnID).(string); ok { + attrs = append(attrs, attribute.String("transaction_id", trxID)) + } + if mesID, ok := r.Context().Value(model.ContextKeyMsgID).(string); ok { + attrs = append(attrs, attribute.String("message_id", mesID)) + } + if parentID, ok := r.Context().Value(model.ContextKeyParentID).(string); ok && parentID != "" { + attrs = append(attrs, attribute.String("parentSpanId", parentID)) + } + if r.Host != "" { + attrs = append(attrs, attribute.String("server.address", r.Host)) + } + + if ua := r.UserAgent(); ua != "" { + attrs = append(attrs, attribute.String("user_agent.original", ua)) + } + + span.SetAttributes(attrs...) +} diff --git a/core/module/handler/step.go b/core/module/handler/step.go index f985031..2ea061f 100644 --- a/core/module/handler/step.go +++ b/core/module/handler/step.go @@ -7,7 +7,9 @@ import ( "strings" "time" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" "github.com/beckn-one/beckn-onix/pkg/log" "github.com/beckn-one/beckn-onix/pkg/model" @@ -38,24 +40,42 @@ func (s *signStep) Run(ctx *model.StepContext) error { if len(ctx.SubID) == 0 { return model.NewBadReqErr(fmt.Errorf("subscriberID not set")) } - keySet, err := s.km.Keyset(ctx, ctx.SubID) - if err != nil { - return fmt.Errorf("failed to get signing key: %w", err) - } - createdAt := time.Now().Unix() - validTill := time.Now().Add(5 * time.Minute).Unix() - sign, err := s.signer.Sign(ctx, ctx.Body, keySet.SigningPrivate, createdAt, validTill) - if err != nil { - return fmt.Errorf("failed to sign request: %w", err) + + tracer := otel.Tracer("beckn-onix") + + var keySet *model.Keyset + { + // to create span to finding the key set + keySetCtx, keySetSpan := tracer.Start(ctx.Context, "keyset") + defer keySetSpan.End() + ks, err := s.km.Keyset(keySetCtx, ctx.SubID) + if err != nil { + return fmt.Errorf("failed to get signing key: %w", err) + } + keySet = ks + } - authHeader := s.generateAuthHeader(ctx.SubID, keySet.UniqueKeyID, createdAt, validTill, sign) - log.Debugf(ctx, "Signature generated: %v", sign) - header := model.AuthHeaderSubscriber - if ctx.Role == model.RoleGateway { - header = model.AuthHeaderGateway + { + // to create span for the signa + signerCtx, signerSpan := tracer.Start(ctx.Context, "sign") + defer signerSpan.End() + createdAt := time.Now().Unix() + validTill := time.Now().Add(5 * time.Minute).Unix() + sign, err := s.signer.Sign(signerCtx, ctx.Body, keySet.SigningPrivate, createdAt, validTill) + if err != nil { + return fmt.Errorf("failed to sign request: %w", err) + } + authHeader := s.generateAuthHeader(ctx.SubID, keySet.UniqueKeyID, createdAt, validTill, sign) + log.Debugf(ctx, "Signature generated: %v", sign) + header := model.AuthHeaderSubscriber + if ctx.Role == model.RoleGateway { + header = model.AuthHeaderGateway + } + ctx.Request.Header.Set(header, authHeader) + } - ctx.Request.Header.Set(header, authHeader) + return nil } @@ -93,8 +113,20 @@ func newValidateSignStep(signValidator definition.SignValidator, km definition.K // Run executes the validation step. func (s *validateSignStep) Run(ctx *model.StepContext) error { - err := s.validateHeaders(ctx) - s.recordMetrics(ctx, err) + tracer := otel.Tracer(telemetry.ScopeName, trace.WithInstrumentationVersion(telemetry.ScopeVersion)) + spanCtx, span := tracer.Start(ctx.Context, "validate-sign") + defer span.End() + stepCtx := &model.StepContext{ + Context: spanCtx, + Request: ctx.Request, + Body: ctx.Body, + Role: ctx.Role, + SubID: ctx.SubID, + RespHeader: ctx.RespHeader, + Route: ctx.Route, + } + err := s.validateHeaders(stepCtx) + s.recordMetrics(stepCtx, err) return err } diff --git a/core/module/handler/step_instrumentor.go b/core/module/handler/step_instrumentor.go index 0869304..8b1787f 100644 --- a/core/module/handler/step_instrumentor.go +++ b/core/module/handler/step_instrumentor.go @@ -6,8 +6,10 @@ import ( "fmt" "time" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" "github.com/beckn-one/beckn-onix/pkg/log" "github.com/beckn-one/beckn-onix/pkg/model" @@ -52,18 +54,34 @@ func (is *InstrumentedStep) Run(ctx *model.StepContext) error { return is.step.Run(ctx) } + tracer := otel.Tracer(telemetry.ScopeName, trace.WithInstrumentationVersion(telemetry.ScopeVersion)) + stepName := "step:" + is.stepName + spanCtx, span := tracer.Start(ctx.Context, stepName) + defer span.End() + + // run step with context that contains the step span + stepCtx := &model.StepContext{ + Context: spanCtx, + Request: ctx.Request, + Body: ctx.Body, + Role: ctx.Role, + SubID: ctx.SubID, + RespHeader: ctx.RespHeader, + Route: ctx.Route, + } + start := time.Now() - err := is.step.Run(ctx) + err := is.step.Run(stepCtx) duration := time.Since(start).Seconds() attrs := []attribute.KeyValue{ telemetry.AttrModule.String(is.moduleName), telemetry.AttrStep.String(is.stepName), - telemetry.AttrRole.String(string(ctx.Role)), + telemetry.AttrRole.String(string(stepCtx.Role)), } - is.metrics.StepExecutionTotal.Add(ctx.Context, 1, metric.WithAttributes(attrs...)) - is.metrics.StepExecutionDuration.Record(ctx.Context, duration, metric.WithAttributes(attrs...)) + is.metrics.StepExecutionTotal.Add(stepCtx.Context, 1, metric.WithAttributes(attrs...)) + is.metrics.StepExecutionDuration.Record(stepCtx.Context, duration, metric.WithAttributes(attrs...)) if err != nil { errorType := fmt.Sprintf("%T", err) @@ -75,10 +93,12 @@ func (is *InstrumentedStep) Run(ctx *model.StepContext) error { } errorAttrs := append(attrs, telemetry.AttrErrorType.String(errorType)) - is.metrics.StepErrorsTotal.Add(ctx.Context, 1, metric.WithAttributes(errorAttrs...)) - log.Errorf(ctx.Context, err, "Step %s failed", is.stepName) + is.metrics.StepErrorsTotal.Add(stepCtx.Context, 1, metric.WithAttributes(errorAttrs...)) + log.Errorf(stepCtx.Context, err, "Step %s failed", is.stepName) } + if stepCtx.Route != nil { + ctx.Route = stepCtx.Route + } return err } - diff --git a/core/module/handler/step_metrics.go b/core/module/handler/step_metrics.go index e3fc418..a4c3f74 100644 --- a/core/module/handler/step_metrics.go +++ b/core/module/handler/step_metrics.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" + "github.com/beckn-one/beckn-onix/pkg/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" ) @@ -32,8 +33,8 @@ func GetStepMetrics(ctx context.Context) (*StepMetrics, error) { func newStepMetrics() (*StepMetrics, error) { meter := otel.GetMeterProvider().Meter( - "github.com/beckn-one/beckn-onix/telemetry", - metric.WithInstrumentationVersion("1.0.0"), + telemetry.ScopeName, + metric.WithInstrumentationVersion(telemetry.ScopeVersion), ) m := &StepMetrics{} @@ -66,4 +67,3 @@ func newStepMetrics() (*StepMetrics, error) { return m, nil } - diff --git a/core/module/handler/step_metrics_test.go b/core/module/handler/step_metrics_test.go index 777821b..c2498f6 100644 --- a/core/module/handler/step_metrics_test.go +++ b/core/module/handler/step_metrics_test.go @@ -2,7 +2,7 @@ package handler import ( "context" - "net/http/httptest" + "sync" "testing" "go.opentelemetry.io/otel/metric" @@ -103,11 +103,8 @@ func TestStepMetrics_Instruments(t *testing.T) { metric.WithAttributes(telemetry.AttrStep.String("test-step"), telemetry.AttrModule.String("test-module"))) }, "StepErrorsTotal.Add should not panic") - // Verify metrics are exposed via HTTP handler - rec := httptest.NewRecorder() - req := httptest.NewRequest("GET", "/metrics", nil) - provider.MetricsHandler.ServeHTTP(rec, req) - assert.Equal(t, 200, rec.Code, "Metrics endpoint should return 200") + // MeterProvider is set by NewTestProvider; metrics are recorded via OTel SDK + assert.NotNil(t, provider.MeterProvider, "MeterProvider should be set") } func TestStepMetrics_MultipleCalls(t *testing.T) { @@ -129,3 +126,113 @@ func TestStepMetrics_MultipleCalls(t *testing.T) { } } +func TestStepMetrics_RecordWithDifferentAttributes(t *testing.T) { + ctx := context.Background() + provider, err := telemetry.NewTestProvider(ctx) + require.NoError(t, err) + defer provider.Shutdown(context.Background()) + + metrics, err := GetStepMetrics(ctx) + require.NoError(t, err) + require.NotNil(t, metrics) + + attrsList := []struct { + step string + module string + }{ + {"test-step", "test-module"}, + {"", "module-only"}, + {"step-only", ""}, + {"", ""}, + {"long-step-name-with-many-parts", "long-module-name"}, + } + + for _, a := range attrsList { + attrs := metric.WithAttributes( + telemetry.AttrStep.String(a.step), + telemetry.AttrModule.String(a.module), + ) + require.NotPanics(t, func() { + metrics.StepExecutionDuration.Record(ctx, 0.01, attrs) + metrics.StepExecutionTotal.Add(ctx, 1, attrs) + metrics.StepErrorsTotal.Add(ctx, 0, attrs) + }, "Recording with step=%q module=%q should not panic", a.step, a.module) + } +} + +func TestStepMetrics_DurationValues(t *testing.T) { + ctx := context.Background() + provider, err := telemetry.NewTestProvider(ctx) + require.NoError(t, err) + defer provider.Shutdown(context.Background()) + + metrics, err := GetStepMetrics(ctx) + require.NoError(t, err) + require.NotNil(t, metrics) + + attrs := metric.WithAttributes( + telemetry.AttrStep.String("test-step"), + telemetry.AttrModule.String("test-module"), + ) + + durations := []float64{0, 0.0005, 0.001, 0.01, 0.1, 0.5} + for _, d := range durations { + d := d + require.NotPanics(t, func() { + metrics.StepExecutionDuration.Record(ctx, d, attrs) + }, "StepExecutionDuration.Record(%.4f) should not panic", d) + } +} + +func TestStepMetrics_ConcurrentRecord(t *testing.T) { + ctx := context.Background() + provider, err := telemetry.NewTestProvider(ctx) + require.NoError(t, err) + defer provider.Shutdown(context.Background()) + + metrics, err := GetStepMetrics(ctx) + require.NoError(t, err) + require.NotNil(t, metrics) + + attrs := metric.WithAttributes( + telemetry.AttrStep.String("concurrent-step"), + telemetry.AttrModule.String("concurrent-module"), + ) + + var wg sync.WaitGroup + for i := 0; i < 20; i++ { + wg.Add(1) + go func() { + defer wg.Done() + metrics.StepExecutionDuration.Record(ctx, 0.05, attrs) + metrics.StepExecutionTotal.Add(ctx, 1, attrs) + metrics.StepErrorsTotal.Add(ctx, 0, attrs) + }() + } + wg.Wait() +} + +func TestStepMetrics_WithTraceProvider(t *testing.T) { + ctx := context.Background() + provider, sr, err := telemetry.NewTestProviderWithTrace(ctx) + require.NoError(t, err) + require.NotNil(t, provider) + require.NotNil(t, sr) + defer provider.Shutdown(ctx) + + metrics, err := GetStepMetrics(ctx) + require.NoError(t, err) + require.NotNil(t, metrics) + assert.NotNil(t, provider.MeterProvider, "MeterProvider should be set") + assert.NotNil(t, provider.TraceProvider, "TraceProvider should be set") + + attrs := metric.WithAttributes( + telemetry.AttrStep.String("trace-test-step"), + telemetry.AttrModule.String("trace-test-module"), + ) + require.NotPanics(t, func() { + metrics.StepExecutionDuration.Record(ctx, 0.1, attrs) + metrics.StepExecutionTotal.Add(ctx, 1, attrs) + metrics.StepErrorsTotal.Add(ctx, 0, attrs) + }, "Step metrics should work when trace provider is also set") +} diff --git a/go.mod b/go.mod index b53f3db..2030ae7 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.24.0 require ( github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 - golang.org/x/crypto v0.36.0 + golang.org/x/crypto v0.47.0 ) require github.com/stretchr/testify v1.11.1 @@ -19,18 +19,20 @@ require ( require github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03 -require golang.org/x/text v0.26.0 // indirect +require golang.org/x/text v0.33.0 // indirect require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/go-jose/go-jose/v4 v4.0.1 // indirect + github.com/go-jose/go-jose/v4 v4.1.3 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect @@ -55,12 +57,17 @@ require ( github.com/redis/go-redis/extra/rediscmd/v9 v9.16.0 // indirect github.com/ryanuber/go-glob v1.0.0 // indirect github.com/woodsbury/decimal128 v1.3.0 // indirect - go.opentelemetry.io/auto/sdk v1.1.0 // indirect - go.opentelemetry.io/otel/trace v1.38.0 // indirect - golang.org/x/net v0.38.0 // indirect - golang.org/x/sys v0.38.0 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.16.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect + go.opentelemetry.io/otel/log v0.16.0 // indirect + go.opentelemetry.io/proto/otlp v1.9.0 // indirect + golang.org/x/net v0.49.0 // indirect + golang.org/x/sys v0.40.0 // indirect golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect - google.golang.org/protobuf v1.32.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect + google.golang.org/protobuf v1.36.11 // indirect ) require ( @@ -74,13 +81,18 @@ require ( github.com/redis/go-redis/extra/redisotel/v9 v9.16.0 github.com/redis/go-redis/v9 v9.16.0 github.com/rs/zerolog v1.34.0 - go.opentelemetry.io/contrib/instrumentation/runtime v0.63.0 - go.opentelemetry.io/otel v1.38.0 + go.opentelemetry.io/contrib/instrumentation/runtime v0.64.0 + go.opentelemetry.io/otel v1.40.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.39.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 go.opentelemetry.io/otel/exporters/prometheus v0.46.0 - go.opentelemetry.io/otel/metric v1.38.0 - go.opentelemetry.io/otel/sdk v1.38.0 - go.opentelemetry.io/otel/sdk/metric v1.38.0 + go.opentelemetry.io/otel/metric v1.40.0 + go.opentelemetry.io/otel/sdk v1.40.0 + go.opentelemetry.io/otel/sdk/log v0.16.0 + go.opentelemetry.io/otel/sdk/metric v1.40.0 + go.opentelemetry.io/otel/trace v1.40.0 go.uber.org/automaxprocs v1.6.0 + google.golang.org/grpc v1.78.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index e684441..0e2eb1a 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= @@ -24,8 +26,8 @@ github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/getkin/kin-openapi v0.133.0 h1:pJdmNohVIJ97r4AUFtEXRXwESr8b0bD721u/Tz6k8PQ= github.com/getkin/kin-openapi v0.133.0/go.mod h1:boAciF6cXk5FhPqe/NQeBTeenbjqU4LhWBf09ILVvWE= -github.com/go-jose/go-jose/v4 v4.0.1 h1:QVEPDE3OluqXBQZDcnNvQrInro2h0e4eqNbnZSWqS6U= -github.com/go-jose/go-jose/v4 v4.0.1/go.mod h1:WVf9LFMHh/QVrmqrOfqun0C45tMe3RoiKJMPvgWwLfY= +github.com/go-jose/go-jose/v4 v4.1.3 h1:CVLmWDhDVRa6Mi/IgCgaopNosCaHz7zrMeF9MlZRkrs= +github.com/go-jose/go-jose/v4 v4.1.3/go.mod h1:x4oUasVrzR7071A4TnHLGSPpNOm2a21K9Kf04k1rs08= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -38,10 +40,16 @@ github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM= github.com/go-test/deep v1.0.8/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 h1:NmZ1PKzSTQbuGHw9DGPFomqkkLWMC+vZCkfs+FHv1Vg= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3/go.mod h1:zQrxl1YP88HQlA6i9c63DSVPFklWpGX4OWAc9bFuaH4= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 h1:X+2YciYSxvMQK0UZ7sg45ZVabVZBeBuvMkmuI2V3Fak= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7/go.mod h1:lW34nIZuQ8UDPdkon5fmfp2l3+ZkQ2me/+oecHYLOII= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -125,8 +133,8 @@ github.com/redis/go-redis/extra/redisotel/v9 v9.16.0 h1:+a9h9qxFXdf3gX0FXnDcz7X4 github.com/redis/go-redis/extra/redisotel/v9 v9.16.0/go.mod h1:EtTTC7vnKWgznfG6kBgl9ySLqd7NckRCFUBzVXdeHeI= github.com/redis/go-redis/v9 v9.16.0 h1:OotgqgLSRCmzfqChbQyG1PHC3tLNR89DG4jdOERSEP4= github.com/redis/go-redis/v9 v9.16.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= -github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= -github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= @@ -147,42 +155,78 @@ github.com/woodsbury/decimal128 v1.3.0 h1:8pffMNWIlC0O5vbyHWFZAt5yWvWcrHA+3ovIIj github.com/woodsbury/decimal128 v1.3.0/go.mod h1:C5UTmyTjW3JftjUFzOVhC20BEQa2a4ZKOB5I6Zjb+ds= github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03 h1:m1h+vudopHsI67FPT9MOncyndWhTcdUoBtI1R1uajGY= github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03/go.mod h1:8sheVFH84v3PCyFY/O02mIgSQY9I6wMYPWsq7mDnEZY= -go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= -go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= -go.opentelemetry.io/contrib/instrumentation/runtime v0.63.0 h1:PeBoRj6af6xMI7qCupwFvTbbnd49V7n5YpG6pg8iDYQ= -go.opentelemetry.io/contrib/instrumentation/runtime v0.63.0/go.mod h1:ingqBCtMCe8I4vpz/UVzCW6sxoqgZB37nao91mLQ3Bw= -go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= -go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/contrib/instrumentation/runtime v0.64.0 h1:/+/+UjlXjFcdDlXxKL1PouzX8Z2Vl0OxolRKeBEgYDw= +go.opentelemetry.io/contrib/instrumentation/runtime v0.64.0/go.mod h1:Ldm/PDuzY2DP7IypudopCR3OCOW42NJlN9+mNEroevo= +go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= +go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.16.0 h1:ZVg+kCXxd9LtAaQNKBxAvJ5NpMf7LpvEr4MIZqb0TMQ= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.16.0/go.mod h1:hh0tMeZ75CCXrHd9OXRYxTlCAdxcXioWHFIpYw2rZu8= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.39.0 h1:cEf8jF6WbuGQWUVcqgyWtTR0kOOAWY1DYZ+UhvdmQPw= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.39.0/go.mod h1:k1lzV5n5U3HkGvTCJHraTAGJ7MqsgL1wrGwTj1Isfiw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 h1:f0cb2XPmrqn4XMy9PNliTgRKJgS5WcL/u0/WRYGz4t0= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0/go.mod h1:vnakAaFckOMiMtOIhFI2MNH4FYrZzXCYxmb1LlhoGz8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 h1:in9O8ESIOlwJAEGTkkf34DesGRAc/Pn8qJ7k3r/42LM= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0/go.mod h1:Rp0EXBm5tfnv0WL+ARyO/PHBEaEAT8UUHQ6AGJcSq6c= go.opentelemetry.io/otel/exporters/prometheus v0.46.0 h1:I8WIFXR351FoLJYuloU4EgXbtNX2URfU/85pUPheIEQ= go.opentelemetry.io/otel/exporters/prometheus v0.46.0/go.mod h1:ztwVUHe5DTR/1v7PeuGRnU5Bbd4QKYwApWmuutKsJSs= -go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= -go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= -go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= -go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg= -go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= -go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= -go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= -go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= +go.opentelemetry.io/otel/log v0.16.0 h1:DeuBPqCi6pQwtCK0pO4fvMB5eBq6sNxEnuTs88pjsN4= +go.opentelemetry.io/otel/log v0.16.0/go.mod h1:rWsmqNVTLIA8UnwYVOItjyEZDbKIkMxdQunsIhpUMes= +go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= +go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= +go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= +go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= +go.opentelemetry.io/otel/sdk/log v0.16.0 h1:e/b4bdlQwC5fnGtG3dlXUrNOnP7c8YLVSpSfEBIkTnI= +go.opentelemetry.io/otel/sdk/log v0.16.0/go.mod h1:JKfP3T6ycy7QEuv3Hj8oKDy7KItrEkus8XJE6EoSzw4= +go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw= +go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= +go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= +go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= +go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= +go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= -golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= -golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= -golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/crypto v0.44.0 h1:A97SsFvM3AIwEEmTBiaxPPTYpDC47w720rdiiUvgoAU= +golang.org/x/crypto v0.44.0/go.mod h1:013i+Nw79BMiQiMsOPcVCB5ZIJbYkerPrGnOa00tvmc= +golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= +golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= +golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= +golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= +golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= +golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= -golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= -golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= +golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= +golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= +golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= +golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= -google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 h1:fCvbg86sFXwdrl5LgVcTEvNC+2txB5mgROGmRL5mrls= +google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:+rXWjjaukWZun3mLfjmVnQi18E1AsFbDN9QdJ5YXLto= +google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 h1:merA0rdPeUV3YIIfHHcH4qBkiQAc1nfCKSI7lB4cV2M= +google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409/go.mod h1:fl8J1IvUjCilwZzQowmw2b7HQB2eAuYBabMXzWurF+I= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 h1:gRkg/vSppuSQoDjxyiGfN4Upv/h/DQmIR10ZU8dh4Ww= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 h1:H86B94AW+VfJWDqFeEbBPhEtHzJwJfTbgE2lZa54ZAQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= +google.golang.org/grpc v1.77.0 h1:wVVY6/8cGA6vvffn+wWK5ToddbgdU3d8MNENr4evgXM= +google.golang.org/grpc v1.77.0/go.mod h1:z0BY1iVj0q8E1uSQCjL9cppRj+gnZjzDnzV0dHhrNig= +google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc= +google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U= +google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= +google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/pkg/log/log.go b/pkg/log/log.go index e1c788c..a80ebea 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -14,6 +14,7 @@ import ( "github.com/beckn-one/beckn-onix/pkg/model" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/trace" "gopkg.in/natefinch/lumberjack.v2" ) @@ -273,6 +274,11 @@ func Request(ctx context.Context, r *http.Request, body []byte) { // addCtx adds context values to the log event based on configured context keys. func addCtx(ctx context.Context, event *zerolog.Event) { + span := trace.SpanFromContext(ctx) + if span.SpanContext().IsValid() { + event.Str("trace_id", span.SpanContext().TraceID().String()) + event.Str("span_id", span.SpanContext().SpanID().String()) + } for _, key := range cfg.ContextKeys { val, ok := ctx.Value(key).(string) if !ok { diff --git a/pkg/model/model.go b/pkg/model/model.go index de53254..ac3dc5b 100644 --- a/pkg/model/model.go +++ b/pkg/model/model.go @@ -56,6 +56,9 @@ const ( // ContextKeyParentID is the context key for storing and retrieving the parent ID from a request context ContextKeyParentID ContextKey = "parent_id" + + // ContextKeyCallerID is the context key for the caller who is calling the bap/bpp + ContextKeyCallerID ContextKey = "caller_id" ) var contextKeys = map[string]ContextKey{ @@ -64,6 +67,7 @@ var contextKeys = map[string]ContextKey{ "subscriber_id": ContextKeySubscriberID, "module_id": ContextKeyModuleID, "parent_id": ContextKeyParentID, + "caller_id": ContextKeyCallerID, } // ParseContextKey converts a string into a valid ContextKey. diff --git a/pkg/plugin/implementation/cache/cache.go b/pkg/plugin/implementation/cache/cache.go index fe91c17..334e0f1 100644 --- a/pkg/plugin/implementation/cache/cache.go +++ b/pkg/plugin/implementation/cache/cache.go @@ -8,13 +8,14 @@ import ( "os" "time" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - "github.com/beckn-one/beckn-onix/pkg/log" "github.com/beckn-one/beckn-onix/pkg/telemetry" "github.com/redis/go-redis/extra/redisotel/v9" "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" ) // RedisCl global variable for the Redis client, can be overridden in tests @@ -103,10 +104,6 @@ func New(ctx context.Context, cfg *Config) (*Cache, func() error, error) { log.Debugf(ctx, "Failed to instrument Redis tracing: %v", err) } - if err := redisotel.InstrumentMetrics(redisClient); err != nil { - // Log error but don't fail - instrumentation is optional - log.Debugf(ctx, "Failed to instrument Redis metrics: %v", err) - } } metrics, _ := GetCacheMetrics(ctx) @@ -141,8 +138,12 @@ func (c *Cache) Get(ctx context.Context, key string) (string, error) { // Set stores the given key-value pair in Redis with the specified TTL (time to live). func (c *Cache) Set(ctx context.Context, key, value string, ttl time.Duration) error { - err := c.Client.Set(ctx, key, value, ttl).Err() - c.recordOperation(ctx, "set", err) + tracer := otel.Tracer(telemetry.ScopeName, trace.WithInstrumentationVersion(telemetry.ScopeVersion)) + spanCtx, span := tracer.Start(ctx, "redis_set") + defer span.End() + + err := c.Client.Set(spanCtx, key, value, ttl).Err() + c.recordOperation(spanCtx, "set", err) return err } diff --git a/pkg/plugin/implementation/otelsetup/cmd/plugin.go b/pkg/plugin/implementation/otelsetup/cmd/plugin.go index 260231e..a0407d7 100644 --- a/pkg/plugin/implementation/otelsetup/cmd/plugin.go +++ b/pkg/plugin/implementation/otelsetup/cmd/plugin.go @@ -4,9 +4,11 @@ import ( "context" "errors" "strconv" + "strings" "time" "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/beckn-one/beckn-onix/pkg/model" "github.com/beckn-one/beckn-onix/pkg/plugin/implementation/otelsetup" "github.com/beckn-one/beckn-onix/pkg/telemetry" ) @@ -27,31 +29,81 @@ func (m metricsProvider) New(ctx context.Context, config map[string]string) (*te ServiceName: config["serviceName"], ServiceVersion: config["serviceVersion"], Environment: config["environment"], - MetricsPort: config["metricsPort"], + Domain: config["domain"], + OtlpEndpoint: config["otlpEndpoint"], + } + + // to extract the device id from the parent id from context + var deviceId string + var producer string + var producerType string + var err error + if v := ctx.Value(model.ContextKeyParentID); v != nil { + parentID := v.(string) + p := strings.Split(parentID, ":") + deviceId = p[len(p)-1] + producerType = p[0] + producer = p[1] + } + + if deviceId != "" { + telemetryConfig.DeviceID = deviceId + } + + if producer != "" { + telemetryConfig.Producer = producer + } + if producerType != "" { + telemetryConfig.ProducerType = producerType + } + + // Parse enableTracing from config + if enableTracingStr, ok := config["enableTracing"]; ok && enableTracingStr != "" { + telemetryConfig.EnableTracing, err = strconv.ParseBool(enableTracingStr) + if err != nil { + log.Warnf(ctx, "Invalid enableTracing value: %s, defaulting to False", enableTracingStr) + } } // Parse enableMetrics as boolean if enableMetricsStr, ok := config["enableMetrics"]; ok && enableMetricsStr != "" { - enableMetrics, err := strconv.ParseBool(enableMetricsStr) + telemetryConfig.EnableMetrics, err = strconv.ParseBool(enableMetricsStr) if err != nil { - log.Warnf(ctx, "Invalid enableMetrics value '%s', defaulting to true: %v", enableMetricsStr, err) - telemetryConfig.EnableMetrics = true - } else { - telemetryConfig.EnableMetrics = enableMetrics + log.Warnf(ctx, "Invalid enableMetrics value '%s', defaulting to False: %v", enableMetricsStr, err) } - } else { - telemetryConfig.EnableMetrics = true // Default to true if not specified or empty } - // Apply defaults if fields are empty - if telemetryConfig.ServiceName == "" { - telemetryConfig.ServiceName = otelsetup.DefaultConfig().ServiceName + // Parse enableLogs as boolean + if enableLogsStr, ok := config["enableLogs"]; ok && enableLogsStr != "" { + telemetryConfig.EnableLogs, err = strconv.ParseBool(enableLogsStr) + if err != nil { + log.Warnf(ctx, "Invalid enableLogs value '%s', defaulting to False: %v", enableLogsStr, err) + } } - if telemetryConfig.ServiceVersion == "" { - telemetryConfig.ServiceVersion = otelsetup.DefaultConfig().ServiceVersion + + // Parse timeInterval as int + if timeIntervalStr, ok := config["timeInterval"]; ok && timeIntervalStr != "" { + telemetryConfig.TimeInterval, err = strconv.ParseInt(timeIntervalStr, 10, 64) + if err != nil { + log.Warnf(ctx, "Invalid timeInterval value: %s, defaulting to 5 second ", timeIntervalStr) + } + } - if telemetryConfig.Environment == "" { - telemetryConfig.Environment = otelsetup.DefaultConfig().Environment + + // to set fields for audit logs + if v, ok := config["auditFieldsConfig"]; ok && v != "" { + if err := telemetry.LoadAuditFieldRules(ctx, v); err != nil { + log.Warnf(ctx, "Failed to load audit field rules: %v", err) + } + } + + //to set network leval matric frequency and granularity + if v, ok := config["networkMetricsGranularity"]; ok && v != "" { + telemetry.SetNetworkMetricsConfig(v, "") + } + + if v, ok := config["networkMetricsFrequency"]; ok && v != "" { + telemetry.SetNetworkMetricsConfig("", v) } log.Debugf(ctx, "Telemetry config mapped: %+v", telemetryConfig) diff --git a/pkg/plugin/implementation/otelsetup/otelsetup.go b/pkg/plugin/implementation/otelsetup/otelsetup.go index 4b52d78..adccc5e 100644 --- a/pkg/plugin/implementation/otelsetup/otelsetup.go +++ b/pkg/plugin/implementation/otelsetup/otelsetup.go @@ -3,23 +3,25 @@ package otelsetup import ( "context" "fmt" - "net" - "net/http" - "sync" - "time" - clientprom "github.com/prometheus/client_golang/prometheus" - clientpromhttp "github.com/prometheus/client_golang/prometheus/promhttp" - "go.opentelemetry.io/contrib/instrumentation/runtime" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - otelprom "go.opentelemetry.io/otel/exporters/prometheus" - "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" + "time" "github.com/beckn-one/beckn-onix/pkg/log" "github.com/beckn-one/beckn-onix/pkg/plugin" "github.com/beckn-one/beckn-onix/pkg/telemetry" + "go.opentelemetry.io/contrib/instrumentation/runtime" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/log/global" + logsdk "go.opentelemetry.io/otel/sdk/log" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) // Setup wires the telemetry provider. This is the concrete implementation @@ -30,9 +32,16 @@ type Setup struct{} type Config struct { ServiceName string `yaml:"serviceName"` ServiceVersion string `yaml:"serviceVersion"` - EnableMetrics bool `yaml:"enableMetrics"` Environment string `yaml:"environment"` - MetricsPort string `yaml:"metricsPort"` + Domain string `yaml:"domain"` + DeviceID string `yaml:"deviceID"` + EnableMetrics bool `yaml:"enableMetrics"` + EnableTracing bool `yaml:"enableTracing"` + EnableLogs bool `yaml:"enableLogs"` + OtlpEndpoint string `yaml:"otlpEndpoint"` + TimeInterval int64 `yaml:"timeInterval"` + Producer string `yaml:"producer"` + ProducerType string `yaml:"producerType"` } // DefaultConfig returns sensible defaults for telemetry configuration. @@ -40,9 +49,11 @@ func DefaultConfig() *Config { return &Config{ ServiceName: "beckn-onix", ServiceVersion: "dev", - EnableMetrics: true, Environment: "development", - MetricsPort: "9090", + Domain: "", + DeviceID: "beckn-onix-device", + OtlpEndpoint: "localhost:4317", + TimeInterval: 5, } } @@ -53,9 +64,11 @@ func ToPluginConfig(cfg *Config) *plugin.Config { Config: map[string]string{ "serviceName": cfg.ServiceName, "serviceVersion": cfg.ServiceVersion, - "enableMetrics": fmt.Sprintf("%t", cfg.EnableMetrics), "environment": cfg.Environment, - "metricsPort": cfg.MetricsPort, + "enableMetrics": fmt.Sprintf("%t", cfg.EnableMetrics), + "enableTracing": fmt.Sprintf("%t", cfg.EnableTracing), + "otelEndpoint": cfg.OtlpEndpoint, + "deviceID": cfg.DeviceID, }, } } @@ -78,92 +91,126 @@ func (Setup) New(ctx context.Context, cfg *Config) (*telemetry.Provider, error) if cfg.Environment == "" { cfg.Environment = DefaultConfig().Environment } - if cfg.MetricsPort == "" { - cfg.MetricsPort = DefaultConfig().MetricsPort + if cfg.Domain == "" { + cfg.Domain = DefaultConfig().Domain + } + if cfg.DeviceID == "" { + cfg.DeviceID = DefaultConfig().DeviceID + } + if cfg.TimeInterval == 0 { + cfg.TimeInterval = DefaultConfig().TimeInterval } - if !cfg.EnableMetrics { - log.Info(ctx, "OpenTelemetry metrics disabled") + if !cfg.EnableMetrics && !cfg.EnableTracing { + log.Info(ctx, "OpenTelemetry metrics and tracing are disabled") return &telemetry.Provider{ Shutdown: func(context.Context) error { return nil }, }, nil } - res, err := resource.New( - ctx, - resource.WithAttributes( - attribute.String("service.name", cfg.ServiceName), - attribute.String("service.version", cfg.ServiceVersion), - attribute.String("deployment.environment", cfg.Environment), - ), - ) + //this will be used by both matric and traces + + // to build resource with envelope metadata + baseAttrs := []attribute.KeyValue{ + attribute.String("service.name", cfg.ServiceName), + attribute.String("service.version", cfg.ServiceVersion), + attribute.String("environment", cfg.Environment), + attribute.String("domain", cfg.Domain), + attribute.String("device_id", cfg.DeviceID), + attribute.String("producerType", cfg.ProducerType), + attribute.String("producer", cfg.Producer), + } + + resMetric, err := resource.New(ctx, resource.WithAttributes(buildAtts(baseAttrs, "METRIC")...)) if err != nil { - return nil, fmt.Errorf("failed to create telemetry resource: %w", err) + return nil, fmt.Errorf("failed to create telemetry resource for matric: %w", err) } - registry := clientprom.NewRegistry() - - exporter, err := otelprom.New( - otelprom.WithRegisterer(registry), - otelprom.WithoutUnits(), - otelprom.WithoutScopeInfo(), - ) - if err != nil { - return nil, fmt.Errorf("failed to create prometheus exporter: %w", err) - } - - meterProvider := metric.NewMeterProvider( - metric.WithReader(exporter), - metric.WithResource(res), - ) - - otel.SetMeterProvider(meterProvider) - log.Infof(ctx, "OpenTelemetry metrics initialized for service=%s version=%s env=%s", - cfg.ServiceName, cfg.ServiceVersion, cfg.Environment) - - if err := runtime.Start(runtime.WithMinimumReadMemStatsInterval(0)); err != nil { - log.Warnf(ctx, "Failed to start Go runtime instrumentation: %v", err) - } - - // Create metrics handler - metricsHandler := clientpromhttp.HandlerFor(registry, clientpromhttp.HandlerOpts{}) - - // Create and start metrics HTTP server - metricsMux := http.NewServeMux() - metricsMux.Handle("/metrics", metricsHandler) - - metricsServer := &http.Server{ - Addr: net.JoinHostPort("", cfg.MetricsPort), - Handler: metricsMux, - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - IdleTimeout: 30 * time.Second, - } - - var serverWg sync.WaitGroup - serverWg.Add(1) - go func() { - defer serverWg.Done() - log.Infof(ctx, "Metrics server listening on %s", metricsServer.Addr) - if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { - log.Errorf(ctx, fmt.Errorf("metrics server ListenAndServe: %w", err), "error listening and serving metrics") + //OTLP matric + var meterProvider *metric.MeterProvider + if cfg.EnableMetrics { + metricExpoter, err := otlpmetricgrpc.New(ctx, otlpmetricgrpc.WithEndpoint(cfg.OtlpEndpoint), + otlpmetricgrpc.WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials()))) + if err != nil { + return nil, fmt.Errorf("failed to create OTLP metric exporter: %w", err) } - }() + reader := metric.NewPeriodicReader(metricExpoter, metric.WithInterval(time.Second*time.Duration(cfg.TimeInterval))) + meterProvider = metric.NewMeterProvider(metric.WithReader(reader), metric.WithResource(resMetric)) + otel.SetMeterProvider(meterProvider) + log.Infof(ctx, "OpenTelemetry metrics initialized for service=%s version=%s env=%s (OTLP endpoint=%s)", + cfg.ServiceName, cfg.ServiceVersion, cfg.Environment, cfg.OtlpEndpoint) + // for the go runtime matrics + if err := runtime.Start(runtime.WithMinimumReadMemStatsInterval(runtime.DefaultMinimumReadMemStatsInterval)); err != nil { + log.Warnf(ctx, "Failed to start Go runtime instrumentation: %v", err) + } + } + + //OTLP traces + restrace, err := resource.New(ctx, resource.WithAttributes(buildAtts(baseAttrs, "API")...)) + if err != nil { + return nil, fmt.Errorf("failed to create trace resource: %w", err) + } + var traceProvider *trace.TracerProvider + if cfg.EnableTracing { + traceExpoter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithEndpoint(cfg.OtlpEndpoint), otlptracegrpc.WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials()))) + if err != nil { + return nil, fmt.Errorf("failed to create OTLP trace exporter: %w", err) + } + traceProvider = trace.NewTracerProvider(trace.WithBatcher(traceExpoter), trace.WithResource(restrace)) //TODO: need to add the trace sampleing rate + otel.SetTracerProvider(traceProvider) + log.Infof(ctx, "OpenTelemetry tracing initialized for service=%s (OTLP endpoint=%s)", + cfg.ServiceName, cfg.OtlpEndpoint) + } + + resAudit, err := resource.New(ctx, resource.WithAttributes(buildAtts(baseAttrs, "AUDIT")...)) + if err != nil { + return nil, fmt.Errorf("failed to create audit resource: %w", err) + } + var logProvider *logsdk.LoggerProvider + if cfg.EnableLogs { + logExporter, err := otlploggrpc.New(ctx, otlploggrpc.WithEndpoint(cfg.OtlpEndpoint), otlploggrpc.WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials()))) + if err != nil { + return nil, fmt.Errorf("failed to create OTLP logs exporter: %w", err) + } + processor := logsdk.NewBatchProcessor(logExporter) + logProvider = logsdk.NewLoggerProvider(logsdk.WithProcessor(processor), logsdk.WithResource(resAudit)) + global.SetLoggerProvider(logProvider) + } return &telemetry.Provider{ - MeterProvider: meterProvider, - MetricsHandler: metricsHandler, + MeterProvider: meterProvider, + TraceProvider: traceProvider, + LogProvider: logProvider, Shutdown: func(shutdownCtx context.Context) error { - log.Infof(ctx, "Shutting down metrics server...") - // Shutdown the metrics server - serverShutdownCtx, cancel := context.WithTimeout(shutdownCtx, 10*time.Second) - defer cancel() - if err := metricsServer.Shutdown(serverShutdownCtx); err != nil { - log.Errorf(ctx, fmt.Errorf("metrics server shutdown: %w", err), "error shutting down metrics server") + + var errs []error + if traceProvider != nil { + if err := traceProvider.Shutdown(shutdownCtx); err != nil { + errs = append(errs, fmt.Errorf("tracer shutdown: %w", err)) + } } - serverWg.Wait() - // Shutdown the meter provider - return meterProvider.Shutdown(shutdownCtx) + if meterProvider != nil { + if err := meterProvider.Shutdown(shutdownCtx); err != nil { + errs = append(errs, fmt.Errorf("meter shutdown: %w", err)) + } + } + + if logProvider != nil { + if err := logProvider.Shutdown(shutdownCtx); err != nil { + errs = append(errs, fmt.Errorf("logs shutdown: %w", err)) + } + } + if len(errs) > 0 { + return fmt.Errorf("shutdown errors: %v", errs) + } + return nil }, }, nil } + +func buildAtts(base []attribute.KeyValue, eid string) []attribute.KeyValue { + atts := make([]attribute.KeyValue, 0, len(base)+1) + atts = append(atts, base...) + atts = append(atts, attribute.String("eid", eid)) + return atts +} diff --git a/pkg/plugin/implementation/otelsetup/otelsetup_test.go b/pkg/plugin/implementation/otelsetup/otelsetup_test.go index 916b632..81d5afb 100644 --- a/pkg/plugin/implementation/otelsetup/otelsetup_test.go +++ b/pkg/plugin/implementation/otelsetup/otelsetup_test.go @@ -22,15 +22,21 @@ func TestSetup_New_Success(t *testing.T) { ServiceName: "test-service", ServiceVersion: "1.0.0", EnableMetrics: true, + EnableTracing: false, Environment: "test", + Domain: "test-domain", + DeviceID: "test-device", + OtlpEndpoint: "localhost:4317", + TimeInterval: 5, }, }, { - name: "Valid config with metrics disabled", + name: "Valid config with metrics and tracing disabled", cfg: &Config{ ServiceName: "test-service", ServiceVersion: "1.0.0", EnableMetrics: false, + EnableTracing: false, Environment: "test", }, }, @@ -40,6 +46,7 @@ func TestSetup_New_Success(t *testing.T) { ServiceName: "", ServiceVersion: "", EnableMetrics: true, + EnableTracing: false, Environment: "", }, }, @@ -56,10 +63,12 @@ func TestSetup_New_Success(t *testing.T) { if tt.cfg.EnableMetrics { assert.NotNil(t, provider.MeterProvider, "MeterProvider should be set when metrics enabled") } + if tt.cfg.EnableTracing { + assert.NotNil(t, provider.TraceProvider, "TraceProvider should be set when tracing enabled") + } - // Test shutdown - err = provider.Shutdown(ctx) - assert.NoError(t, err, "Shutdown should not return error") + // Shutdown for cleanup. When metrics/tracing are enabled, shutdown may fail without a real OTLP backend. + _ = provider.Shutdown(ctx) }) } } @@ -104,7 +113,10 @@ func TestSetup_New_DefaultValues(t *testing.T) { ServiceName: "", ServiceVersion: "", EnableMetrics: true, + EnableTracing: false, Environment: "", + OtlpEndpoint: "localhost:4317", + TimeInterval: 5, } provider, err := setup.New(ctx, cfg) @@ -114,9 +126,8 @@ func TestSetup_New_DefaultValues(t *testing.T) { // Verify defaults are applied by checking that provider is functional assert.NotNil(t, provider.MeterProvider, "MeterProvider should be set with defaults") - // Cleanup - err = provider.Shutdown(ctx) - assert.NoError(t, err) + // Cleanup (shutdown may fail without a real OTLP backend) + _ = provider.Shutdown(ctx) } func TestSetup_New_MetricsDisabled(t *testing.T) { @@ -127,6 +138,7 @@ func TestSetup_New_MetricsDisabled(t *testing.T) { ServiceName: "test-service", ServiceVersion: "1.0.0", EnableMetrics: false, + EnableTracing: false, Environment: "test", } @@ -134,8 +146,9 @@ func TestSetup_New_MetricsDisabled(t *testing.T) { require.NoError(t, err) require.NotNil(t, provider) - // When metrics are disabled, MetricsHandler should be nil and MeterProvider should be nil + // When metrics and tracing are disabled, MeterProvider and TraceProvider should be nil assert.Nil(t, provider.MeterProvider, "MeterProvider should be nil when metrics disabled") + assert.Nil(t, provider.TraceProvider, "TraceProvider should be nil when tracing disabled") // Shutdown should still work err = provider.Shutdown(ctx) @@ -155,32 +168,42 @@ func TestToPluginConfig_Success(t *testing.T) { ServiceName: "test-service", ServiceVersion: "1.0.0", EnableMetrics: true, + EnableTracing: true, Environment: "test", + Domain: "test-domain", + DeviceID: "test-device", + OtlpEndpoint: "localhost:4317", + TimeInterval: 5, }, expectedID: "otelsetup", expectedConfig: map[string]string{ "serviceName": "test-service", "serviceVersion": "1.0.0", - "enableMetrics": "true", "environment": "test", - "metricsPort": "", + "enableMetrics": "true", + "enableTracing": "true", + "otelEndpoint": "localhost:4317", + "deviceID": "test-device", }, }, { - name: "Config with enableMetrics false", + name: "Config with enableMetrics and enableTracing false", cfg: &Config{ ServiceName: "my-service", ServiceVersion: "2.0.0", EnableMetrics: false, + EnableTracing: false, Environment: "production", }, expectedID: "otelsetup", expectedConfig: map[string]string{ "serviceName": "my-service", "serviceVersion": "2.0.0", - "enableMetrics": "false", "environment": "production", - "metricsPort": "", + "enableMetrics": "false", + "enableTracing": "false", + "otelEndpoint": "", + "deviceID": "", }, }, { @@ -189,15 +212,21 @@ func TestToPluginConfig_Success(t *testing.T) { ServiceName: "", ServiceVersion: "", EnableMetrics: true, + EnableTracing: false, Environment: "", + Domain: "", + DeviceID: "", + OtlpEndpoint: "", }, expectedID: "otelsetup", expectedConfig: map[string]string{ "serviceName": "", "serviceVersion": "", - "enableMetrics": "true", "environment": "", - "metricsPort": "", + "enableMetrics": "true", + "enableTracing": "false", + "otelEndpoint": "", + "deviceID": "", }, }, } @@ -224,19 +253,32 @@ func TestToPluginConfig_NilConfig(t *testing.T) { func TestToPluginConfig_BooleanConversion(t *testing.T) { tests := []struct { - name string - enableMetrics bool - expected string + name string + enableMetrics bool + enableTracing bool + expectedMetric string + expectedTrace string }{ { - name: "EnableMetrics true", - enableMetrics: true, - expected: "true", + name: "EnableMetrics and EnableTracing true", + enableMetrics: true, + enableTracing: true, + expectedMetric: "true", + expectedTrace: "true", }, { - name: "EnableMetrics false", - enableMetrics: false, - expected: "false", + name: "EnableMetrics and EnableTracing false", + enableMetrics: false, + enableTracing: false, + expectedMetric: "false", + expectedTrace: "false", + }, + { + name: "EnableMetrics true, EnableTracing false", + enableMetrics: true, + enableTracing: false, + expectedMetric: "true", + expectedTrace: "false", }, } @@ -246,14 +288,18 @@ func TestToPluginConfig_BooleanConversion(t *testing.T) { ServiceName: "test", ServiceVersion: "1.0.0", EnableMetrics: tt.enableMetrics, + EnableTracing: tt.enableTracing, Environment: "test", - MetricsPort: "", + OtlpEndpoint: "localhost:4317", + DeviceID: "test-device", } result := ToPluginConfig(cfg) require.NotNil(t, result) - assert.Equal(t, tt.expected, result.Config["enableMetrics"], "enableMetrics should be converted to string correctly") - assert.Equal(t, "", result.Config["metricsPort"], "metricsPort should be included even when empty") + assert.Equal(t, tt.expectedMetric, result.Config["enableMetrics"], "enableMetrics should be converted to string correctly") + assert.Equal(t, tt.expectedTrace, result.Config["enableTracing"], "enableTracing should be converted to string correctly") + assert.Equal(t, "localhost:4317", result.Config["otelEndpoint"], "otelEndpoint should be included") + assert.Equal(t, "test-device", result.Config["deviceID"], "deviceID should be included") }) } } diff --git a/pkg/plugin/implementation/reqpreprocessor/reqpreprocessor.go b/pkg/plugin/implementation/reqpreprocessor/reqpreprocessor.go index fa32898..97d74a4 100644 --- a/pkg/plugin/implementation/reqpreprocessor/reqpreprocessor.go +++ b/pkg/plugin/implementation/reqpreprocessor/reqpreprocessor.go @@ -48,6 +48,7 @@ func NewPreProcessor(cfg *Config) (func(http.Handler) http.Handler, error) { http.Error(w, fmt.Sprintf("%s field not found or invalid.", contextKey), http.StatusBadRequest) return } + var subID any switch cfg.Role { case "bap": @@ -55,6 +56,14 @@ func NewPreProcessor(cfg *Config) (func(http.Handler) http.Handler, error) { case "bpp": subID = reqContext["bpp_id"] } + + var callerID any + switch cfg.Role { + case "bap": + callerID = reqContext["bpp_id"] + case "bpp": + callerID = reqContext["bap_id"] + } if subID != nil { log.Debugf(ctx, "adding subscriberId to request:%s, %v", model.ContextKeySubscriberID, subID) ctx = context.WithValue(ctx, model.ContextKeySubscriberID, subID) @@ -64,6 +73,11 @@ func NewPreProcessor(cfg *Config) (func(http.Handler) http.Handler, error) { log.Debugf(ctx, "adding parentID to request:%s, %v", model.ContextKeyParentID, cfg.ParentID) ctx = context.WithValue(ctx, model.ContextKeyParentID, cfg.ParentID) } + + if callerID != nil { + log.Debugf(ctx, "adding callerID to request:%s, %v", model.ContextKeyCallerID, callerID) + ctx = context.WithValue(ctx, model.ContextKeyCallerID, callerID) + } for _, key := range cfg.ContextKeys { ctxKey, _ := model.ParseContextKey(key) if v, ok := reqContext[key]; ok { diff --git a/pkg/plugin/implementation/simplekeymanager/simplekeymanager.go b/pkg/plugin/implementation/simplekeymanager/simplekeymanager.go index 2194250..88a499b 100644 --- a/pkg/plugin/implementation/simplekeymanager/simplekeymanager.go +++ b/pkg/plugin/implementation/simplekeymanager/simplekeymanager.go @@ -15,7 +15,10 @@ import ( "github.com/beckn-one/beckn-onix/pkg/log" "github.com/beckn-one/beckn-onix/pkg/model" "github.com/beckn-one/beckn-onix/pkg/plugin/definition" + "github.com/beckn-one/beckn-onix/pkg/telemetry" "github.com/google/uuid" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" ) // Config holds configuration parameters for SimpleKeyManager. @@ -245,28 +248,43 @@ func (skm *SimpleKeyMgr) LookupNPKeys(ctx context.Context, subscriberID, uniqueK return "", "", err } + tracer := otel.Tracer(telemetry.ScopeName, trace.WithInstrumentationVersion(telemetry.ScopeVersion)) cacheKey := fmt.Sprintf("%s_%s", subscriberID, uniqueKeyID) - cachedData, err := skm.Cache.Get(ctx, cacheKey) - if err == nil { - var keys model.Keyset - if err := json.Unmarshal([]byte(cachedData), &keys); err == nil { - log.Debugf(ctx, "Found cached keys for subscriber: %s, uniqueKeyID: %s", subscriberID, uniqueKeyID) - return keys.SigningPublic, keys.EncrPublic, nil + var cachedData string + + { + spanCtx, span := tracer.Start(ctx, "redis lookup") + defer span.End() + var err error + cachedData, err = skm.Cache.Get(spanCtx, cacheKey) + if err == nil { + var keys model.Keyset + if err := json.Unmarshal([]byte(cachedData), &keys); err == nil { + log.Debugf(ctx, "Found cached keys for subscriber: %s, uniqueKeyID: %s", subscriberID, uniqueKeyID) + return keys.SigningPublic, keys.EncrPublic, nil + } } } log.Debugf(ctx, "Cache miss, looking up registry for subscriber: %s, uniqueKeyID: %s", subscriberID, uniqueKeyID) - subscribers, err := skm.Registry.Lookup(ctx, &model.Subscription{ - Subscriber: model.Subscriber{ - SubscriberID: subscriberID, - }, - KeyID: uniqueKeyID, - }) - if err != nil { - return "", "", fmt.Errorf("failed to lookup registry: %w", err) - } - if len(subscribers) == 0 { - return "", "", ErrSubscriberNotFound + var subscribers []model.Subscription + { + spanCtx, span := tracer.Start(ctx, "registry lookup") + defer span.End() + var err error + + subscribers, err = skm.Registry.Lookup(spanCtx, &model.Subscription{ + Subscriber: model.Subscriber{ + SubscriberID: subscriberID, + }, + KeyID: uniqueKeyID, + }) + if err != nil { + return "", "", fmt.Errorf("failed to lookup registry: %w", err) + } + if len(subscribers) == 0 { + return "", "", ErrSubscriberNotFound + } } log.Debugf(ctx, "Successfully looked up keys for subscriber: %s, uniqueKeyID: %s", subscriberID, uniqueKeyID) diff --git a/pkg/plugin/manager.go b/pkg/plugin/manager.go index ebc4316..ef00dd8 100644 --- a/pkg/plugin/manager.go +++ b/pkg/plugin/manager.go @@ -197,9 +197,7 @@ func (m *Manager) Middleware(ctx context.Context, cfg *Config) (func(http.Handle return mwp.New(ctx, cfg.Config) } -// OtelSetup initializes OpenTelemetry via a dedicated plugin. The plugin is -// expected to return a telemetry Provider that the core application can use for -// instrumentation. +// OtelSetup initializes OpenTelemetry via a dedicated plugin. The plugin is expected to return a telemetry Provider that the core application can use for instrumentation. func (m *Manager) OtelSetup(ctx context.Context, cfg *Config) (*telemetry.Provider, error) { if cfg == nil { log.Info(ctx, "Telemetry config not provided; skipping OpenTelemetry setup") diff --git a/pkg/telemetry/audit.go b/pkg/telemetry/audit.go new file mode 100644 index 0000000..2398c3c --- /dev/null +++ b/pkg/telemetry/audit.go @@ -0,0 +1,56 @@ +package telemetry + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "time" + + logger "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/beckn-one/beckn-onix/pkg/model" + "github.com/google/uuid" + "go.opentelemetry.io/otel/log" + "go.opentelemetry.io/otel/log/global" +) + +const auditLoggerName = "Beckn_ONIX" + +func EmitAuditLogs(ctx context.Context, body []byte, attrs ...log.KeyValue) { + + provider := global.GetLoggerProvider() + if provider == nil { + logger.Warnf(ctx, "failed to emit audit logs, logs disabled") + return + } + + //maskedBody := MaskPIIInAuditBody(body) + + sum := sha256.Sum256(body) + auditBody := selectAuditPayload(ctx, body) + auditlog := provider.Logger(auditLoggerName) + record := log.Record{} + record.SetBody(log.StringValue(string(auditBody))) + record.SetTimestamp(time.Now()) + record.SetObservedTimestamp(time.Now()) + record.SetSeverity(log.SeverityInfo) + + checkSum := hex.EncodeToString(sum[:]) + + txnID, _ := ctx.Value(model.ContextKeyTxnID).(string) + msgID, _ := ctx.Value(model.ContextKeyMsgID).(string) + parentID, _ := ctx.Value(model.ContextKeyParentID).(string) + + record.AddAttributes( + log.String("checkSum", checkSum), + log.String("log_uuid", uuid.New().String()), + log.String("transaction_id", txnID), + log.String("message_id", msgID), + log.String("parent_id", parentID), + ) + + if len(attrs) > 0 { + record.AddAttributes(attrs...) + } + + auditlog.Emit(ctx, record) +} diff --git a/pkg/telemetry/audit_fields.go b/pkg/telemetry/audit_fields.go new file mode 100644 index 0000000..d5e5635 --- /dev/null +++ b/pkg/telemetry/audit_fields.go @@ -0,0 +1,216 @@ +package telemetry + +import ( + "context" + "encoding/json" + "fmt" + "os" + "strings" + "sync" + + "github.com/beckn-one/beckn-onix/pkg/log" + "gopkg.in/yaml.v3" +) + +type auditFieldsRules struct { + AuditRules map[string][]string `yaml:"auditRules"` +} + +var ( + auditRules = map[string][]string{} + auditRulesMutex sync.RWMutex +) + +func LoadAuditFieldRules(ctx context.Context, configPath string) error { + + if strings.TrimSpace(configPath) == "" { + err := fmt.Errorf("config file path is empty") + log.Error(ctx, err, "there are no audit rules defined") + return err + } + + data, err := os.ReadFile(configPath) + if err != nil { + log.Error(ctx, err, "failed to read audit rules file") + return err + } + + var config auditFieldsRules + if err := yaml.Unmarshal(data, &config); err != nil { + log.Error(ctx, err, "failed to parse audit rules file") + return err + } + + if config.AuditRules == nil { + log.Warn(ctx, "audit rules are not defined") + config.AuditRules = map[string][]string{} + } + + auditRulesMutex.Lock() + auditRules = config.AuditRules + auditRulesMutex.Unlock() + log.Info(ctx, "audit rules loaded") + return nil +} + +func selectAuditPayload(ctx context.Context, body []byte) []byte { + + var root map[string]interface{} + if err := json.Unmarshal(body, &root); err != nil { + log.Warn(ctx, "failed to unmarshal audit payload ") + return nil + } + + action := "" + if c, ok := root["context"].(map[string]interface{}); ok { + if v, ok := c["action"].(string); ok { + action = strings.TrimSpace(v) + } + } + + fields := getFieldForAction(ctx, action) + if len(fields) == 0 { + return nil + } + + out := map[string]interface{}{} + for _, field := range fields { + parts := strings.Split(field, ".") + partial, ok := projectPath(root, parts) + if !ok { + continue + } + merged := deepMerge(out, partial) + if m, ok := merged.(map[string]interface{}); ok { + out = m + } + } + + body, err := json.Marshal(out) + if err != nil { + log.Warn(ctx, "failed to marshal audit payload") + return nil + } + return body +} + +func getFieldForAction(ctx context.Context, action string) []string { + auditRulesMutex.RLock() + defer auditRulesMutex.RUnlock() + + if action != "" { + if fields, ok := auditRules[action]; ok && len(fields) > 0 { + return fields + } + } + + log.Warn(ctx, "audit rules are not defined for this action send default") + return auditRules["default"] +} + +//func getByPath(root map[string]interface{}, path string) (interface{}, bool) { +// +// parts := strings.Split(path, ".") +// var cur interface{} = root +// +// for _, part := range parts { +// m, ok := cur.(map[string]interface{}) +// if !ok { +// return nil, false +// } +// v, ok := m[part] +// if !ok { +// return nil, false +// } +// cur = v +// } +// return cur, true +//} +// +//func setByPath(root map[string]interface{}, path string, value interface{}) { +// parts := strings.Split(path, ".") +// cur := root +// +// for i := 0; i < len(parts)-1; i++ { +// k := parts[i] +// next, ok := cur[k].(map[string]interface{}) +// if !ok { +// next = map[string]interface{}{} +// cur[k] = next +// } +// cur = next +// } +// cur[parts[len(parts)-1]] = value +//} + +func projectPath(cur interface{}, parts []string) (interface{}, bool) { + if len(parts) == 0 { + return cur, true + } + + switch node := cur.(type) { + case map[string]interface{}: + next, ok := node[parts[0]] + if !ok { + return nil, false + } + child, ok := projectPath(next, parts[1:]) + if !ok { + return nil, false + } + return map[string]interface{}{parts[0]: child}, true + + case []interface{}: + out := make([]interface{}, 0, len(node)) + found := false + + for _, n := range node { + child, ok := projectPath(n, parts) + if ok { + out = append(out, child) + found = true + } + } + if !found { + return nil, false + } + return out, true + + default: + return nil, false + } +} +func deepMerge(dst, src interface{}) interface{} { + if dst == nil { + return src + } + + dm, dok := dst.(map[string]interface{}) + sm, sok := src.(map[string]interface{}) + if dok && sok { + for k, sv := range sm { + if dv, ok := dm[k]; ok { + dm[k] = deepMerge(dv, sv) + } else { + dm[k] = sv + } + } + return dm + } + + da, dok := dst.([]interface{}) + sa, sok := src.([]interface{}) + if dok && sok { + if len(da) < len(sa) { + ext := make([]interface{}, len(sa)-len(da)) + da = append(da, ext...) + } + + for i := range sa { + da[i] = deepMerge(da[i], sa[i]) + } + return da + } + + return src +} diff --git a/pkg/telemetry/audit_fields_test.go b/pkg/telemetry/audit_fields_test.go new file mode 100644 index 0000000..17141a7 --- /dev/null +++ b/pkg/telemetry/audit_fields_test.go @@ -0,0 +1,518 @@ +package telemetry + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Test projectPath + +func TestProjectPath_EmptyParts(t *testing.T) { + root := map[string]interface{}{"a": "v"} + got, ok := projectPath(root, nil) + require.True(t, ok) + assert.Equal(t, root, got) + + got, ok = projectPath(root, []string{}) + require.True(t, ok) + assert.Equal(t, root, got) +} + +func TestProjectPath_MapSingleLevel(t *testing.T) { + root := map[string]interface{}{"context": map[string]interface{}{"action": "search"}} + got, ok := projectPath(root, []string{"context"}) + require.True(t, ok) + assert.Equal(t, map[string]interface{}{"context": map[string]interface{}{"action": "search"}}, got) +} + +func TestProjectPath_MapNested(t *testing.T) { + root := map[string]interface{}{ + "context": map[string]interface{}{ + "action": "select", + "transaction_id": "tx-1", + }, + } + got, ok := projectPath(root, []string{"context", "action"}) + require.True(t, ok) + assert.Equal(t, map[string]interface{}{"context": map[string]interface{}{"action": "select"}}, got) +} + +func TestProjectPath_MissingKey(t *testing.T) { + root := map[string]interface{}{"context": map[string]interface{}{"action": "search"}} + got, ok := projectPath(root, []string{"context", "missing"}) + require.False(t, ok) + assert.Nil(t, got) +} + +func TestProjectPath_ArrayTraverseAndProject(t *testing.T) { + root := map[string]interface{}{ + "message": map[string]interface{}{ + "order": map[string]interface{}{ + "beckn:orderItems": []interface{}{ + map[string]interface{}{"beckn:orderedItem": "item-1"}, + map[string]interface{}{"beckn:orderedItem": "item-2"}, + }, + }, + }, + } + parts := []string{"message", "order", "beckn:orderItems", "beckn:orderedItem"} + got, ok := projectPath(root, parts) + require.True(t, ok) + + expected := map[string]interface{}{ + "message": map[string]interface{}{ + "order": map[string]interface{}{ + "beckn:orderItems": []interface{}{ + map[string]interface{}{"beckn:orderedItem": "item-1"}, + map[string]interface{}{"beckn:orderedItem": "item-2"}, + }, + }, + }, + } + assert.Equal(t, expected, got) +} + +func TestProjectPath_NonMapOrSlice(t *testing.T) { + _, ok := projectPath("string", []string{"a"}) + require.False(t, ok) + + _, ok = projectPath(42, []string{"a"}) + require.False(t, ok) +} + +func TestProjectPath_EmptyArray(t *testing.T) { + root := map[string]interface{}{"items": []interface{}{}} + got, ok := projectPath(root, []string{"items", "id"}) + require.False(t, ok) + assert.Nil(t, got) +} + +// Test deepMerge + +func TestDeepMerge_NilDst(t *testing.T) { + src := map[string]interface{}{"a": 1} + got := deepMerge(nil, src) + assert.Equal(t, src, got) +} + +func TestDeepMerge_MapIntoMap(t *testing.T) { + dst := map[string]interface{}{"a": 1, "b": 2} + src := map[string]interface{}{"b": 20, "c": 3} + got := deepMerge(dst, src) + assert.Equal(t, map[string]interface{}{"a": 1, "b": 20, "c": 3}, got) +} + +func TestDeepMerge_MapNested(t *testing.T) { + dst := map[string]interface{}{ + "context": map[string]interface{}{"action": "search", "domain": "retail"}, + } + src := map[string]interface{}{ + "context": map[string]interface{}{"action": "search", "transaction_id": "tx-1"}, + } + got := deepMerge(dst, src) + ctx, ok := got.(map[string]interface{})["context"].(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "search", ctx["action"]) + assert.Equal(t, "retail", ctx["domain"]) + assert.Equal(t, "tx-1", ctx["transaction_id"]) +} + +func TestDeepMerge_ArrayIntoArray(t *testing.T) { + dst := []interface{}{ + map[string]interface{}{"id": "a"}, + map[string]interface{}{"id": "b"}, + } + src := []interface{}{ + map[string]interface{}{"id": "a", "name": "A"}, + map[string]interface{}{"id": "b", "name": "B"}, + } + got := deepMerge(dst, src) + sl, ok := got.([]interface{}) + require.True(t, ok) + require.Len(t, sl, 2) + assert.Equal(t, map[string]interface{}{"id": "a", "name": "A"}, sl[0]) + assert.Equal(t, map[string]interface{}{"id": "b", "name": "B"}, sl[1]) +} + +func TestDeepMerge_ArraySrcLonger(t *testing.T) { + dst := []interface{}{map[string]interface{}{"a": 1}} + src := []interface{}{ + map[string]interface{}{"a": 1}, + map[string]interface{}{"a": 2}, + } + got := deepMerge(dst, src) + sl, ok := got.([]interface{}) + require.True(t, ok) + require.Len(t, sl, 2) +} + +func TestDeepMerge_ScalarSrc(t *testing.T) { + dst := map[string]interface{}{"a": 1} + src := "overwrite" + got := deepMerge(dst, src) + assert.Equal(t, "overwrite", got) +} + +// Test getFieldForAction and selectAuditPayload (require loaded rules via temp file) + +func writeAuditRulesFile(t *testing.T, content string) string { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "audit-fields.yaml") + err := os.WriteFile(path, []byte(content), 0600) + require.NoError(t, err) + return path +} + +func TestGetFieldForAction_ActionMatch(t *testing.T) { + ctx := context.Background() + path := writeAuditRulesFile(t, ` +auditRules: + default: + - context.transaction_id + - context.action + search: + - context.action + - context.timestamp + select: + - context.action + - message.order +`) + require.NoError(t, LoadAuditFieldRules(ctx, path)) + + fields := getFieldForAction(ctx, "search") + assert.Equal(t, []string{"context.action", "context.timestamp"}, fields) + + fields = getFieldForAction(ctx, "select") + assert.Equal(t, []string{"context.action", "message.order"}, fields) +} + +func TestGetFieldForAction_FallbackToDefault(t *testing.T) { + ctx := context.Background() + path := writeAuditRulesFile(t, ` +auditRules: + default: + - context.transaction_id + - context.message_id + search: + - context.action +`) + require.NoError(t, LoadAuditFieldRules(ctx, path)) + + fields := getFieldForAction(ctx, "unknown_action") + assert.Equal(t, []string{"context.transaction_id", "context.message_id"}, fields) + + fields = getFieldForAction(ctx, "") + assert.Equal(t, []string{"context.transaction_id", "context.message_id"}, fields) +} + +func TestGetFieldForAction_EmptyDefault(t *testing.T) { + ctx := context.Background() + path := writeAuditRulesFile(t, ` +auditRules: + default: [] + search: + - context.action +`) + require.NoError(t, LoadAuditFieldRules(ctx, path)) + + fields := getFieldForAction(ctx, "other") + assert.Empty(t, fields) +} + +func TestSelectAuditPayload_InvalidJSON(t *testing.T) { + ctx := context.Background() + path := writeAuditRulesFile(t, ` +auditRules: + default: + - context.action +`) + require.NoError(t, LoadAuditFieldRules(ctx, path)) + + got := selectAuditPayload(ctx, []byte("not json")) + assert.Nil(t, got) +} + +func TestSelectAuditPayload_NoRulesLoaded(t *testing.T) { + ctx := context.Background() + // use a fresh context without loading any rules; auditRules may be from previous test + path := writeAuditRulesFile(t, ` +auditRules: + default: [] +`) + require.NoError(t, LoadAuditFieldRules(ctx, path)) + + body := []byte(`{"context":{"action":"search"}}`) + got := selectAuditPayload(ctx, body) + assert.Nil(t, got) +} + +func TestSelectAuditPayload_ContextAndActionOnly(t *testing.T) { + ctx := context.Background() + path := writeAuditRulesFile(t, ` +auditRules: + default: + - context.transaction_id + - context.message_id + - context.action +`) + require.NoError(t, LoadAuditFieldRules(ctx, path)) + + body := []byte(`{ + "context": { + "action": "search", + "transaction_id": "tx-1", + "message_id": "msg-1", + "domain": "retail" + }, + "message": {"intent": "buy"} + }`) + got := selectAuditPayload(ctx, body) + require.NotNil(t, got) + + var out map[string]interface{} + require.NoError(t, json.Unmarshal(got, &out)) + ctxMap, ok := out["context"].(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "search", ctxMap["action"]) + assert.Equal(t, "tx-1", ctxMap["transaction_id"]) + assert.Equal(t, "msg-1", ctxMap["message_id"]) + _, hasMessage := out["message"] + assert.False(t, hasMessage) +} + +func TestSelectAuditPayload_ActionSpecificRules(t *testing.T) { + ctx := context.Background() + path := writeAuditRulesFile(t, ` +auditRules: + default: + - context.action + search: + - context.action + - context.timestamp + - message.intent +`) + require.NoError(t, LoadAuditFieldRules(ctx, path)) + + body := []byte(`{ + "context": {"action": "search", "timestamp": "2024-01-15T10:30:00Z", "domain": "retail"}, + "message": {"intent": {"item": {"id": "x"}}} + }`) + got := selectAuditPayload(ctx, body) + require.NotNil(t, got) + + var out map[string]interface{} + require.NoError(t, json.Unmarshal(got, &out)) + ctxMap := out["context"].(map[string]interface{}) + assert.Equal(t, "search", ctxMap["action"]) + assert.Equal(t, "2024-01-15T10:30:00Z", ctxMap["timestamp"]) + msg := out["message"].(map[string]interface{}) + assert.NotNil(t, msg["intent"]) +} + +func TestSelectAuditPayload_ArrayFieldProjection(t *testing.T) { + ctx := context.Background() + path := writeAuditRulesFile(t, ` +auditRules: + default: + - context.action + select: + - context.transaction_id + - context.action + - message.order.beckn:orderItems.beckn:orderedItem +`) + require.NoError(t, LoadAuditFieldRules(ctx, path)) + + body := []byte(`{ + "context": {"action": "select", "transaction_id": "tx-2"}, + "message": { + "order": { + "beckn:orderItems": [ + {"beckn:orderedItem": "item-A", "other": "x"}, + {"beckn:orderedItem": "item-B", "other": "y"} + ] + } + } + }`) + got := selectAuditPayload(ctx, body) + require.NotNil(t, got) + + var out map[string]interface{} + require.NoError(t, json.Unmarshal(got, &out)) + ctxMap := out["context"].(map[string]interface{}) + assert.Equal(t, "select", ctxMap["action"]) + assert.Equal(t, "tx-2", ctxMap["transaction_id"]) + + order := out["message"].(map[string]interface{})["order"].(map[string]interface{}) + items := order["beckn:orderItems"].([]interface{}) + require.Len(t, items, 2) + assert.Equal(t, map[string]interface{}{"beckn:orderedItem": "item-A"}, items[0]) + assert.Equal(t, map[string]interface{}{"beckn:orderedItem": "item-B"}, items[1]) +} + +// TestSelectAuditPayload_SelectOrderExample uses a full select request payload and +// select audit rules to verify that only configured fields are projected into the +// audit log body. The request mirrors a real select with context, message.order, +// beckn:orderItems (array), beckn:acceptedOffer, and beckn:orderAttributes. +// Rules include array traversal (e.g. message.order.beckn:orderItems.beckn:orderedItem +// projects that field from each array element) and nested paths like +// message.order.beckn:orderItems.beckn:acceptedOffer.beckn:price.value. +func TestSelectAuditPayload_SelectOrderExample(t *testing.T) { + ctx := context.Background() + path := writeAuditRulesFile(t, ` +auditRules: + default: [] + select: + - context.transaction_id + - context.message_id + - context.action + - context.timestamp + - message.order + - message.order.beckn:seller + - message.order.beckn:buyer + - message.order.beckn:buyer.beckn:id + - message.order.beckn:orderItems + - message.order.beckn:orderItems.beckn:orderedItem + - message.order.beckn:orderItems.beckn:acceptedOffer + - message.order.beckn:orderItems.beckn:acceptedOffer.beckn:id + - message.order.beckn:orderItems.beckn:acceptedOffer.beckn:price + - message.order.beckn:orderItems.beckn:acceptedOffer.beckn:price.value + - message.order.beckn:orderAttributes + - message.order.beckn:orderAttributes.preferences + - message.order.beckn:orderAttributes.preferences.startTime +`) + require.NoError(t, LoadAuditFieldRules(ctx, path)) + + // Full select request example: context (version, action, domain, timestamp, ids, URIs, ttl) + // and message.order with orderStatus, seller, buyer, orderItems array (orderedItem, quantity, + // acceptedOffer with id, descriptor, items, provider, price), orderAttributes (buyerFinderFee, preferences). + body := []byte(`{ + "context": { + "version": "1.0.0", + "action": "select", + "domain": "ev_charging", + "timestamp": "2024-01-15T10:30:00Z", + "message_id": "bb9f86db-9a3d-4e9c-8c11-81c8f1a7b901", + "transaction_id": "2b4d69aa-22e4-4c78-9f56-5a7b9e2b2002", + "bap_id": "bap.example.com", + "bap_uri": "https://bap.example.com", + "ttl": "PT30S", + "bpp_id": "bpp.example.com", + "bpp_uri": "https://bpp.example.com" + }, + "message": { + "order": { + "@context": "https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/main/schema/core/v2/context.jsonld", + "@type": "beckn:Order", + "beckn:orderStatus": "CREATED", + "beckn:seller": "ecopower-charging", + "beckn:buyer": { + "@context": "https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/main/schema/core/v2/context.jsonld", + "@type": "beckn:Buyer", + "beckn:id": "user-123", + "beckn:role": "BUYER", + "beckn:displayName": "Ravi Kumar", + "beckn:telephone": "+91-9876543210", + "beckn:email": "ravi.kumar@example.com", + "beckn:taxID": "GSTIN29ABCDE1234F1Z5" + }, + "beckn:orderItems": [ + { + "beckn:orderedItem": "IND*ecopower-charging*cs-01*IN*ECO*BTM*01*CCS2*A*CCS2-A", + "beckn:quantity": { + "unitText": "Kilowatt Hour", + "unitCode": "KWH", + "unitQuantity": 2.5 + }, + "beckn:acceptedOffer": { + "@context": "https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/main/schema/core/v2/context.jsonld", + "@type": "beckn:Offer", + "beckn:id": "offer-ccs2-60kw-kwh", + "beckn:descriptor": { + "@type": "beckn:Descriptor", + "schema:name": "Per-kWh Tariff - CCS2 60kW" + }, + "beckn:items": [ + "IND*ecopower-charging*cs-01*IN*ECO*BTM*01*CCS2*A*CCS2-A" + ], + "beckn:provider": "ecopower-charging", + "beckn:price": { + "currency": "INR", + "value": 45.0, + "applicableQuantity": { + "unitText": "Kilowatt Hour", + "unitCode": "KWH", + "unitQuantity": 1 + } + } + } + } + ], + "beckn:orderAttributes": { + "@context": "https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/main/schema/EvChargingSession/v1/context.jsonld", + "@type": "ChargingSession", + "buyerFinderFee": { + "feeType": "PERCENTAGE", + "feeValue": 2.5 + }, + "preferences": { + "startTime": "2026-01-04T08:00:00+05:30", + "endTime": "2026-01-04T20:00:00+05:30" + } + } + } + } +}`) + got := selectAuditPayload(ctx, body) + require.NotNil(t, got, "selectAuditPayload should return projected body for select action") + + var out map[string]interface{} + require.NoError(t, json.Unmarshal(got, &out)) + + // Context: only transaction_id, message_id, action, timestamp + ctxMap, ok := out["context"].(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "select", ctxMap["action"]) + assert.Equal(t, "2b4d69aa-22e4-4c78-9f56-5a7b9e2b2002", ctxMap["transaction_id"]) + assert.Equal(t, "bb9f86db-9a3d-4e9c-8c11-81c8f1a7b901", ctxMap["message_id"]) + assert.Equal(t, "2024-01-15T10:30:00Z", ctxMap["timestamp"]) + _, hasBapID := ctxMap["bap_id"] + assert.False(t, hasBapID, "context should not include bap_id when not in audit rules") + + // message.order: full order merged with projected array fields + msg, ok := out["message"].(map[string]interface{}) + require.True(t, ok) + order, ok := msg["order"].(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "ecopower-charging", order["beckn:seller"]) + buyer, ok := order["beckn:buyer"].(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "user-123", buyer["beckn:id"]) + + // beckn:orderItems: array with projected fields from each element (beckn:orderedItem, beckn:acceptedOffer with id, price, price.value) + items, ok := order["beckn:orderItems"].([]interface{}) + require.True(t, ok) + require.Len(t, items, 1) + item0, ok := items[0].(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "IND*ecopower-charging*cs-01*IN*ECO*BTM*01*CCS2*A*CCS2-A", item0["beckn:orderedItem"]) + acceptedOffer, ok := item0["beckn:acceptedOffer"].(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "offer-ccs2-60kw-kwh", acceptedOffer["beckn:id"]) + price, ok := acceptedOffer["beckn:price"].(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, 45.0, price["value"]) + + // beckn:orderAttributes: only preferences and preferences.startTime + orderAttrs, ok := order["beckn:orderAttributes"].(map[string]interface{}) + require.True(t, ok) + prefs, ok := orderAttrs["preferences"].(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "2026-01-04T08:00:00+05:30", prefs["startTime"]) +} diff --git a/pkg/telemetry/metrics_test.go b/pkg/telemetry/metrics_test.go index 1c3663a..289edd3 100644 --- a/pkg/telemetry/metrics_test.go +++ b/pkg/telemetry/metrics_test.go @@ -2,9 +2,9 @@ package telemetry import ( "context" - "net/http/httptest" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -13,16 +13,31 @@ func TestNewProviderAndMetrics(t *testing.T) { provider, err := NewTestProvider(ctx) require.NoError(t, err) require.NotNil(t, provider) - require.NotNil(t, provider.MetricsHandler) + require.NotNil(t, provider.MeterProvider, "MeterProvider should be set") metrics, err := GetMetrics(ctx) require.NoError(t, err) require.NotNil(t, metrics) - rec := httptest.NewRecorder() - req := httptest.NewRequest("GET", "/metrics", nil) - provider.MetricsHandler.ServeHTTP(rec, req) - require.Equal(t, 200, rec.Code) - - require.NoError(t, provider.Shutdown(context.Background())) + require.NoError(t, provider.Shutdown(ctx)) +} + +func TestNewProviderAndTraces(t *testing.T) { + ctx := context.Background() + provider, sr, err := NewTestProviderWithTrace(ctx) + require.NoError(t, err) + require.NotNil(t, provider) + require.NotNil(t, provider.MeterProvider, "MeterProvider should be set") + require.NotNil(t, provider.TraceProvider, "TraceProvider should be set") + require.NotNil(t, sr, "SpanRecorder should be set") + + tracer := provider.TraceProvider.Tracer("test-instrumentation") + _, span := tracer.Start(ctx, "test-span") + span.End() + + ended := sr.Ended() + require.Len(t, ended, 1, "exactly one span should be recorded") + assert.Equal(t, "test-span", ended[0].Name(), "recorded span should have expected name") + + require.NoError(t, provider.Shutdown(ctx)) } diff --git a/pkg/telemetry/pluginMetrics.go b/pkg/telemetry/pluginMetrics.go index c6d83ce..7bb73c8 100644 --- a/pkg/telemetry/pluginMetrics.go +++ b/pkg/telemetry/pluginMetrics.go @@ -30,24 +30,52 @@ var ( // Attribute keys shared across instruments. var ( - AttrModule = attribute.Key("module") - AttrSubsystem = attribute.Key("subsystem") - AttrName = attribute.Key("name") - AttrStep = attribute.Key("step") - AttrRole = attribute.Key("role") - AttrAction = attribute.Key("action") - AttrHTTPMethod = attribute.Key("http_method") - AttrHTTPStatus = attribute.Key("http_status_code") - AttrStatus = attribute.Key("status") - AttrErrorType = attribute.Key("error_type") - AttrPluginID = attribute.Key("plugin_id") - AttrPluginType = attribute.Key("plugin_type") - AttrOperation = attribute.Key("operation") - AttrRouteType = attribute.Key("route_type") - AttrTargetType = attribute.Key("target_type") - AttrSchemaVersion = attribute.Key("schema_version") + AttrModule = attribute.Key("module") + AttrCaller = attribute.Key("caller") // who is calling bab/bpp with there name + AttrStep = attribute.Key("step") + AttrRole = attribute.Key("role") + AttrAction = attribute.Key("action") // action is context.action + AttrHTTPStatus = attribute.Key("http_status_code") // status code is 2xx/3xx/4xx/5xx + AttrStatus = attribute.Key("status") + AttrErrorType = attribute.Key("error_type") + AttrPluginID = attribute.Key("plugin_id") // id for the plugine + AttrPluginType = attribute.Key("plugin_type") // type for the plugine + AttrOperation = attribute.Key("operation") + AttrRouteType = attribute.Key("route_type") // publish/ uri + AttrTargetType = attribute.Key("target_type") + AttrSchemaVersion = attribute.Key("schema_version") + AttrMetricUUID = attribute.Key("metric_uuid") + AttrMetricCode = attribute.Key("metric.code") + AttrMetricCategory = attribute.Key("metric.category") + AttrMetricGranularity = attribute.Key("metric.granularity") + AttrMetricFrequency = attribute.Key("metric.frequency") + AttrObservedTimeUnixNano = attribute.Key("observedTimeUnixNano") + AttrMatricLabels = attribute.Key("metric.labels") ) +var ( + networkMetricsCfgMu sync.RWMutex + networkMetricsGranularity = "10mim" // default + networkMetricsFrequency = "10mim" // default +) + +func SetNetworkMetricsConfig(granularity, frequency string) { + networkMetricsCfgMu.Lock() + defer networkMetricsCfgMu.Unlock() + if granularity != "" { + networkMetricsGranularity = granularity + } + if frequency != "" { + networkMetricsFrequency = frequency + } +} + +func GetNetworkMetricsConfig() (granularity, frequency string) { + networkMetricsCfgMu.RLock() + defer networkMetricsCfgMu.RUnlock() + return networkMetricsGranularity, networkMetricsFrequency +} + // GetMetrics lazily initializes instruments and returns a cached reference. func GetMetrics(ctx context.Context) (*Metrics, error) { metricsOnce.Do(func() { @@ -58,8 +86,8 @@ func GetMetrics(ctx context.Context) (*Metrics, error) { func newMetrics() (*Metrics, error) { meter := otel.GetMeterProvider().Meter( - "github.com/beckn-one/beckn-onix/telemetry", - metric.WithInstrumentationVersion("1.0.0"), + ScopeName, + metric.WithInstrumentationVersion(ScopeVersion), ) m := &Metrics{} diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index c5b70df..3400e93 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -2,14 +2,21 @@ package telemetry import ( "context" - "net/http" + "go.opentelemetry.io/otel/sdk/log" "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/trace" +) + +const ( + ScopeName = "beckn-onix" + ScopeVersion = "v2.0.0" ) // Provider holds references to telemetry components that need coordinated shutdown. type Provider struct { - MeterProvider *metric.MeterProvider - MetricsHandler http.Handler - Shutdown func(context.Context) error + MeterProvider *metric.MeterProvider + TraceProvider *trace.TracerProvider + LogProvider *log.LoggerProvider + Shutdown func(context.Context) error } diff --git a/pkg/telemetry/test_helper.go b/pkg/telemetry/test_helper.go index 627965b..0e81dfc 100644 --- a/pkg/telemetry/test_helper.go +++ b/pkg/telemetry/test_helper.go @@ -4,12 +4,13 @@ import ( "context" clientprom "github.com/prometheus/client_golang/prometheus" - clientpromhttp "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" otelprom "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" ) // NewTestProvider creates a minimal telemetry provider for testing purposes. @@ -45,10 +46,56 @@ func NewTestProvider(ctx context.Context) (*Provider, error) { otel.SetMeterProvider(meterProvider) return &Provider{ - MeterProvider: meterProvider, - MetricsHandler: clientpromhttp.HandlerFor(registry, clientpromhttp.HandlerOpts{}), + MeterProvider: meterProvider, Shutdown: func(ctx context.Context) error { return meterProvider.Shutdown(ctx) }, }, nil } + +// NewTestProviderWithTrace creates a telemetry provider with both metrics and +// tracing enabled, using an in-memory span recorder. It returns the provider +// and the SpanRecorder so tests can assert on recorded spans. +func NewTestProviderWithTrace(ctx context.Context) (*Provider, *tracetest.SpanRecorder, error) { + provider, err := NewTestProvider(ctx) + if err != nil { + return nil, nil, err + } + + res, err := resource.New( + ctx, + resource.WithAttributes( + attribute.String("service.name", "test-service"), + attribute.String("service.version", "test"), + attribute.String("deployment.environment", "test"), + ), + ) + if err != nil { + return nil, nil, err + } + + sr := tracetest.NewSpanRecorder() + traceProvider := trace.NewTracerProvider( + trace.WithSpanProcessor(sr), + trace.WithResource(res), + ) + otel.SetTracerProvider(traceProvider) + + return &Provider{ + MeterProvider: provider.MeterProvider, + TraceProvider: traceProvider, + Shutdown: func(ctx context.Context) error { + var errs []error + if err := traceProvider.Shutdown(ctx); err != nil { + errs = append(errs, err) + } + if err := provider.MeterProvider.Shutdown(ctx); err != nil { + errs = append(errs, err) + } + if len(errs) > 0 { + return errs[0] + } + return nil + }, + }, sr, nil +}