can create matric in there respective module.

This commit is contained in:
Manendra Pal Singh
2025-11-24 10:25:26 +05:30
parent c38e7b75ca
commit 88eef682df
12 changed files with 352 additions and 158 deletions

View File

@@ -37,7 +37,7 @@ type Config struct {
// Cache wraps a Redis client to provide basic caching operations.
type Cache struct {
Client RedisClient
metrics *telemetry.Metrics
metrics *CacheMetrics
}
// Error variables to describe common failure modes.
@@ -97,7 +97,7 @@ func New(ctx context.Context, cfg *Config) (*Cache, func() error, error) {
}
}
metrics, _ := telemetry.GetMetrics(ctx)
metrics, _ := GetCacheMetrics(ctx)
log.Infof(ctx, "Cache connection to Redis established successfully")
return &Cache{Client: client, metrics: metrics}, client.Close, nil

View File

@@ -0,0 +1,69 @@
package cache
import (
"context"
"fmt"
"sync"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)
// CacheMetrics exposes cache-related metric instruments.
type CacheMetrics struct {
CacheOperationsTotal metric.Int64Counter
CacheHitsTotal metric.Int64Counter
CacheMissesTotal metric.Int64Counter
}
var (
cacheMetricsInstance *CacheMetrics
cacheMetricsOnce sync.Once
cacheMetricsErr error
)
// GetCacheMetrics lazily initializes cache metric instruments and returns a cached reference.
func GetCacheMetrics(ctx context.Context) (*CacheMetrics, error) {
cacheMetricsOnce.Do(func() {
cacheMetricsInstance, cacheMetricsErr = newCacheMetrics()
})
return cacheMetricsInstance, cacheMetricsErr
}
func newCacheMetrics() (*CacheMetrics, error) {
meter := otel.GetMeterProvider().Meter(
"github.com/beckn-one/beckn-onix/cache",
metric.WithInstrumentationVersion("1.0.0"),
)
m := &CacheMetrics{}
var err error
if m.CacheOperationsTotal, err = meter.Int64Counter(
"onix_cache_operations_total",
metric.WithDescription("Redis cache operations"),
metric.WithUnit("{operation}"),
); err != nil {
return nil, fmt.Errorf("onix_cache_operations_total: %w", err)
}
if m.CacheHitsTotal, err = meter.Int64Counter(
"onix_cache_hits_total",
metric.WithDescription("Redis cache hits"),
metric.WithUnit("{hit}"),
); err != nil {
return nil, fmt.Errorf("onix_cache_hits_total: %w", err)
}
if m.CacheMissesTotal, err = meter.Int64Counter(
"onix_cache_misses_total",
metric.WithDescription("Redis cache misses"),
metric.WithUnit("{miss}"),
); err != nil {
return nil, fmt.Errorf("onix_cache_misses_total: %w", err)
}
return m, nil
}

View File

@@ -0,0 +1,97 @@
package otelmetrics
import (
"context"
"fmt"
"sync"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)
// HTTPMetrics exposes HTTP-related metric instruments.
type HTTPMetrics struct {
HTTPRequestsTotal metric.Int64Counter
HTTPRequestDuration metric.Float64Histogram
HTTPRequestsInFlight metric.Int64UpDownCounter
HTTPRequestSize metric.Int64Histogram
HTTPResponseSize metric.Int64Histogram
BecknMessagesTotal metric.Int64Counter
}
var (
httpMetricsInstance *HTTPMetrics
httpMetricsOnce sync.Once
httpMetricsErr error
)
// GetHTTPMetrics lazily initializes HTTP metric instruments and returns a cached reference.
func GetHTTPMetrics(ctx context.Context) (*HTTPMetrics, error) {
httpMetricsOnce.Do(func() {
httpMetricsInstance, httpMetricsErr = newHTTPMetrics()
})
return httpMetricsInstance, httpMetricsErr
}
func newHTTPMetrics() (*HTTPMetrics, error) {
meter := otel.GetMeterProvider().Meter(
"github.com/beckn-one/beckn-onix/otelmetrics",
metric.WithInstrumentationVersion("1.0.0"),
)
m := &HTTPMetrics{}
var err error
if m.HTTPRequestsTotal, err = meter.Int64Counter(
"http_server_requests_total",
metric.WithDescription("Total number of HTTP requests processed"),
metric.WithUnit("{request}"),
); err != nil {
return nil, fmt.Errorf("http_server_requests_total: %w", err)
}
if m.HTTPRequestDuration, err = meter.Float64Histogram(
"http_server_request_duration_seconds",
metric.WithDescription("HTTP request duration in seconds"),
metric.WithUnit("s"),
metric.WithExplicitBucketBoundaries(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10),
); err != nil {
return nil, fmt.Errorf("http_server_request_duration_seconds: %w", err)
}
if m.HTTPRequestsInFlight, err = meter.Int64UpDownCounter(
"http_server_requests_in_flight",
metric.WithDescription("Number of HTTP requests currently being processed"),
metric.WithUnit("{request}"),
); err != nil {
return nil, fmt.Errorf("http_server_requests_in_flight: %w", err)
}
if m.HTTPRequestSize, err = meter.Int64Histogram(
"http_server_request_size_bytes",
metric.WithDescription("Size of HTTP request payloads"),
metric.WithUnit("By"),
metric.WithExplicitBucketBoundaries(100, 1000, 10000, 100000, 1000000),
); err != nil {
return nil, fmt.Errorf("http_server_request_size_bytes: %w", err)
}
if m.HTTPResponseSize, err = meter.Int64Histogram(
"http_server_response_size_bytes",
metric.WithDescription("Size of HTTP responses"),
metric.WithUnit("By"),
metric.WithExplicitBucketBoundaries(100, 1000, 10000, 100000, 1000000),
); err != nil {
return nil, fmt.Errorf("http_server_response_size_bytes: %w", err)
}
if m.BecknMessagesTotal, err = meter.Int64Counter(
"beckn_messages_total",
metric.WithDescription("Total Beckn protocol messages processed"),
metric.WithUnit("{message}"),
); err != nil {
return nil, fmt.Errorf("beckn_messages_total: %w", err)
}
return m, nil
}

View File

@@ -15,7 +15,7 @@ import (
// Middleware instruments inbound HTTP handlers with OpenTelemetry metrics.
type Middleware struct {
metrics *telemetry.Metrics
metrics *HTTPMetrics
enabled bool
}
@@ -23,7 +23,7 @@ type Middleware struct {
func New(ctx context.Context, cfg map[string]string) (*Middleware, error) {
enabled := cfg["enabled"] != "false"
metrics, err := telemetry.GetMetrics(ctx)
metrics, err := GetHTTPMetrics(ctx)
if err != nil {
log.Warnf(ctx, "OpenTelemetry metrics unavailable: %v", err)
}

View File

@@ -11,27 +11,15 @@ import (
)
// Metrics exposes strongly typed metric instruments used across the adapter.
// Note: Most metrics have been moved to their respective modules. Only plugin-level
// metrics remain here. See:
// - HTTP metrics: pkg/plugin/implementation/otelmetrics/metrics.go
// - Step metrics: pkg/telemetry/step_metrics.go
// - Cache metrics: pkg/plugin/implementation/cache/metrics.go
// - Handler metrics: core/module/handler/metrics.go
type Metrics struct {
HTTPRequestsTotal metric.Int64Counter
HTTPRequestDuration metric.Float64Histogram
HTTPRequestsInFlight metric.Int64UpDownCounter
HTTPRequestSize metric.Int64Histogram
HTTPResponseSize metric.Int64Histogram
StepExecutionDuration metric.Float64Histogram
StepExecutionTotal metric.Int64Counter
StepErrorsTotal metric.Int64Counter
PluginExecutionDuration metric.Float64Histogram
PluginErrorsTotal metric.Int64Counter
BecknMessagesTotal metric.Int64Counter
SignatureValidationsTotal metric.Int64Counter
SchemaValidationsTotal metric.Int64Counter
CacheOperationsTotal metric.Int64Counter
CacheHitsTotal metric.Int64Counter
CacheMissesTotal metric.Int64Counter
RoutingDecisionsTotal metric.Int64Counter
}
var (
@@ -77,74 +65,6 @@ func newMetrics() (*Metrics, error) {
m := &Metrics{}
var err error
if m.HTTPRequestsTotal, err = meter.Int64Counter(
"http_server_requests_total",
metric.WithDescription("Total number of HTTP requests processed"),
metric.WithUnit("{request}"),
); err != nil {
return nil, fmt.Errorf("http_server_requests_total: %w", err)
}
if m.HTTPRequestDuration, err = meter.Float64Histogram(
"http_server_request_duration_seconds",
metric.WithDescription("HTTP request duration in seconds"),
metric.WithUnit("s"),
metric.WithExplicitBucketBoundaries(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10),
); err != nil {
return nil, fmt.Errorf("http_server_request_duration_seconds: %w", err)
}
if m.HTTPRequestsInFlight, err = meter.Int64UpDownCounter(
"http_server_requests_in_flight",
metric.WithDescription("Number of HTTP requests currently being processed"),
metric.WithUnit("{request}"),
); err != nil {
return nil, fmt.Errorf("http_server_requests_in_flight: %w", err)
}
if m.HTTPRequestSize, err = meter.Int64Histogram(
"http_server_request_size_bytes",
metric.WithDescription("Size of HTTP request payloads"),
metric.WithUnit("By"),
metric.WithExplicitBucketBoundaries(100, 1000, 10000, 100000, 1000000),
); err != nil {
return nil, fmt.Errorf("http_server_request_size_bytes: %w", err)
}
if m.HTTPResponseSize, err = meter.Int64Histogram(
"http_server_response_size_bytes",
metric.WithDescription("Size of HTTP responses"),
metric.WithUnit("By"),
metric.WithExplicitBucketBoundaries(100, 1000, 10000, 100000, 1000000),
); err != nil {
return nil, fmt.Errorf("http_server_response_size_bytes: %w", err)
}
if m.StepExecutionDuration, err = meter.Float64Histogram(
"onix_step_execution_duration_seconds",
metric.WithDescription("Duration of individual processing steps"),
metric.WithUnit("s"),
metric.WithExplicitBucketBoundaries(0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5),
); err != nil {
return nil, fmt.Errorf("onix_step_execution_duration_seconds: %w", err)
}
if m.StepExecutionTotal, err = meter.Int64Counter(
"onix_step_executions_total",
metric.WithDescription("Total processing step executions"),
metric.WithUnit("{execution}"),
); err != nil {
return nil, fmt.Errorf("onix_step_executions_total: %w", err)
}
if m.StepErrorsTotal, err = meter.Int64Counter(
"onix_step_errors_total",
metric.WithDescription("Processing step errors"),
metric.WithUnit("{error}"),
); err != nil {
return nil, fmt.Errorf("onix_step_errors_total: %w", err)
}
if m.PluginExecutionDuration, err = meter.Float64Histogram(
"onix_plugin_execution_duration_seconds",
metric.WithDescription("Plugin execution time"),
@@ -162,61 +82,5 @@ func newMetrics() (*Metrics, error) {
return nil, fmt.Errorf("onix_plugin_errors_total: %w", err)
}
if m.BecknMessagesTotal, err = meter.Int64Counter(
"beckn_messages_total",
metric.WithDescription("Total Beckn protocol messages processed"),
metric.WithUnit("{message}"),
); err != nil {
return nil, fmt.Errorf("beckn_messages_total: %w", err)
}
if m.SignatureValidationsTotal, err = meter.Int64Counter(
"beckn_signature_validations_total",
metric.WithDescription("Signature validation attempts"),
metric.WithUnit("{validation}"),
); err != nil {
return nil, fmt.Errorf("beckn_signature_validations_total: %w", err)
}
if m.SchemaValidationsTotal, err = meter.Int64Counter(
"beckn_schema_validations_total",
metric.WithDescription("Schema validation attempts"),
metric.WithUnit("{validation}"),
); err != nil {
return nil, fmt.Errorf("beckn_schema_validations_total: %w", err)
}
if m.CacheOperationsTotal, err = meter.Int64Counter(
"onix_cache_operations_total",
metric.WithDescription("Redis cache operations"),
metric.WithUnit("{operation}"),
); err != nil {
return nil, fmt.Errorf("onix_cache_operations_total: %w", err)
}
if m.CacheHitsTotal, err = meter.Int64Counter(
"onix_cache_hits_total",
metric.WithDescription("Redis cache hits"),
metric.WithUnit("{hit}"),
); err != nil {
return nil, fmt.Errorf("onix_cache_hits_total: %w", err)
}
if m.CacheMissesTotal, err = meter.Int64Counter(
"onix_cache_misses_total",
metric.WithDescription("Redis cache misses"),
metric.WithUnit("{miss}"),
); err != nil {
return nil, fmt.Errorf("onix_cache_misses_total: %w", err)
}
if m.RoutingDecisionsTotal, err = meter.Int64Counter(
"onix_routing_decisions_total",
metric.WithDescription("Routing decisions taken by handler"),
metric.WithUnit("{decision}"),
); err != nil {
return nil, fmt.Errorf("onix_routing_decisions_total: %w", err)
}
return m, nil
}

View File

@@ -19,12 +19,12 @@ type InstrumentedStep struct {
step definition.Step
stepName string
moduleName string
metrics *Metrics
metrics *StepMetrics
}
// NewInstrumentedStep returns a telemetry enabled wrapper around a definition.Step.
func NewInstrumentedStep(step definition.Step, stepName, moduleName string) (*InstrumentedStep, error) {
metrics, err := GetMetrics(context.Background())
metrics, err := GetStepMetrics(context.Background())
if err != nil {
return nil, err
}

View File

@@ -0,0 +1,69 @@
package telemetry
import (
"context"
"fmt"
"sync"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)
// StepMetrics exposes step execution metric instruments.
type StepMetrics struct {
StepExecutionDuration metric.Float64Histogram
StepExecutionTotal metric.Int64Counter
StepErrorsTotal metric.Int64Counter
}
var (
stepMetricsInstance *StepMetrics
stepMetricsOnce sync.Once
stepMetricsErr error
)
// GetStepMetrics lazily initializes step metric instruments and returns a cached reference.
func GetStepMetrics(ctx context.Context) (*StepMetrics, error) {
stepMetricsOnce.Do(func() {
stepMetricsInstance, stepMetricsErr = newStepMetrics()
})
return stepMetricsInstance, stepMetricsErr
}
func newStepMetrics() (*StepMetrics, error) {
meter := otel.GetMeterProvider().Meter(
"github.com/beckn-one/beckn-onix/telemetry",
metric.WithInstrumentationVersion("1.0.0"),
)
m := &StepMetrics{}
var err error
if m.StepExecutionDuration, err = meter.Float64Histogram(
"onix_step_execution_duration_seconds",
metric.WithDescription("Duration of individual processing steps"),
metric.WithUnit("s"),
metric.WithExplicitBucketBoundaries(0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5),
); err != nil {
return nil, fmt.Errorf("onix_step_execution_duration_seconds: %w", err)
}
if m.StepExecutionTotal, err = meter.Int64Counter(
"onix_step_executions_total",
metric.WithDescription("Total processing step executions"),
metric.WithUnit("{execution}"),
); err != nil {
return nil, fmt.Errorf("onix_step_executions_total: %w", err)
}
if m.StepErrorsTotal, err = meter.Int64Counter(
"onix_step_errors_total",
metric.WithDescription("Processing step errors"),
metric.WithUnit("{error}"),
); err != nil {
return nil, fmt.Errorf("onix_step_errors_total: %w", err)
}
return m, nil
}