Feat: update the pr as per comment
This commit is contained in:
@@ -233,7 +233,7 @@ func addParentIdCtx(ctx context.Context, config *Config) context.Context {
|
|||||||
log.Infof(ctx, "Adding POD name: %s", p)
|
log.Infof(ctx, "Adding POD name: %s", p)
|
||||||
podName = p
|
podName = p
|
||||||
} else {
|
} else {
|
||||||
log.Info(ctx, "POD_NAME environment variable not set falling back to hostname")
|
log.Info(ctx, "POD_NAME environment variable not set, falling back to hostname")
|
||||||
if hostname, err := os.Hostname(); err == nil {
|
if hostname, err := os.Hostname(); err == nil {
|
||||||
log.Infof(ctx, "Setting POD name as hostname: %s", hostname)
|
log.Infof(ctx, "Setting POD name as hostname: %s", hostname)
|
||||||
podName = hostname
|
podName = hostname
|
||||||
@@ -243,16 +243,21 @@ func addParentIdCtx(ctx context.Context, config *Config) context.Context {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, m := range config.Modules {
|
for _, m := range config.Modules {
|
||||||
if m.Handler.Role != "" && m.Handler.SubscriberID != "" {
|
if m.Handler.Role == "" || m.Handler.SubscriberID == "" {
|
||||||
parentID = string(m.Handler.Role) + ":" + m.Handler.SubscriberID + ":" + podName
|
continue
|
||||||
break
|
}
|
||||||
|
candidate := string(m.Handler.Role) + ":" + m.Handler.SubscriberID + ":" + podName
|
||||||
|
if parentID == "" {
|
||||||
|
parentID = candidate
|
||||||
|
} else if candidate != parentID {
|
||||||
|
log.Warnf(ctx, "Multiple distinct role:subscriberID pairs found in modules (using %q, also saw %q); consider explicit parent_id config", parentID, candidate)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if parentID != "" {
|
if parentID != "" {
|
||||||
ctx = context.WithValue(ctx, model.ContextKeyParentID, parentID)
|
ctx = context.WithValue(ctx, model.ContextKeyParentID, parentID)
|
||||||
} else {
|
} else {
|
||||||
log.Warnf(ctx, "Failed to find parent ID in config please add the role and subscriber_id in the handler config ")
|
log.Warnf(ctx, "Failed to find parent ID in config; add role and subscriber_id to the handler config")
|
||||||
}
|
}
|
||||||
return ctx
|
return ctx
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -65,4 +65,3 @@ func newHandlerMetrics() (*HandlerMetrics, error) {
|
|||||||
|
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
||||||
@@ -79,10 +80,10 @@ func RecordHTTPRequest(ctx context.Context, statusCode int, action, role, sender
|
|||||||
|
|
||||||
metric_code := action + "_api_total_count"
|
metric_code := action + "_api_total_count"
|
||||||
category := "NetworkHealth"
|
category := "NetworkHealth"
|
||||||
if action == "/search" || action == "/discovery" {
|
if strings.HasSuffix(action, "/search") || strings.HasSuffix(action, "/discovery") {
|
||||||
category = "Discovery"
|
category = "Discovery"
|
||||||
}
|
}
|
||||||
attributes = append(attributes, specHttpMetricAttr(metric_code, category)...) //TODO: need to update as per the furthur discussion
|
attributes = append(attributes, specHttpMetricAttr(metric_code, category)...)
|
||||||
m.HttpRequestCount.Add(ctx, 1, metric.WithAttributes(attributes...))
|
m.HttpRequestCount.Add(ctx, 1, metric.WithAttributes(attributes...))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -122,19 +122,7 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
record: nil,
|
record: nil,
|
||||||
}
|
}
|
||||||
|
|
||||||
selfID := h.SubscriberID
|
senderID, receiverID := h.resolveDirection(r.Context())
|
||||||
remoteID := ""
|
|
||||||
if v, ok := r.Context().Value(model.ContextKeyRemoteID).(string); ok {
|
|
||||||
remoteID = v
|
|
||||||
}
|
|
||||||
var senderID, receiverID string
|
|
||||||
if strings.Contains(h.moduleName, "Caller") {
|
|
||||||
senderID = selfID
|
|
||||||
receiverID = remoteID
|
|
||||||
} else {
|
|
||||||
senderID = remoteID
|
|
||||||
receiverID = selfID
|
|
||||||
}
|
|
||||||
httpMeter, _ := GetHTTPMetrics(r.Context())
|
httpMeter, _ := GetHTTPMetrics(r.Context())
|
||||||
if httpMeter != nil {
|
if httpMeter != nil {
|
||||||
recordOnce = func() {
|
recordOnce = func() {
|
||||||
@@ -161,7 +149,7 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
body := stepCtx.Body
|
body := stepCtx.Body
|
||||||
go telemetry.EmitAuditLogs(r.Context(), body, auditlog.Int("http.response.status_code", wrapped.statusCode), auditlog.String("http.response.error", errString(err)))
|
telemetry.EmitAuditLogs(r.Context(), body, auditlog.Int("http.response.status_code", wrapped.statusCode), auditlog.String("http.response.error", errString(err)))
|
||||||
span.End()
|
span.End()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@@ -385,21 +373,17 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func setBecknAttr(span trace.Span, r *http.Request, h *stdHandler) {
|
func (h *stdHandler) resolveDirection(ctx context.Context) (senderID, receiverID string) {
|
||||||
selfID := h.SubscriberID
|
selfID := h.SubscriberID
|
||||||
remoteID := ""
|
remoteID, _ := ctx.Value(model.ContextKeyRemoteID).(string)
|
||||||
if v, ok := r.Context().Value(model.ContextKeyRemoteID).(string); ok {
|
|
||||||
remoteID = v
|
|
||||||
}
|
|
||||||
|
|
||||||
var senderID, receiverID string
|
|
||||||
if strings.Contains(h.moduleName, "Caller") {
|
if strings.Contains(h.moduleName, "Caller") {
|
||||||
senderID = selfID
|
return selfID, remoteID
|
||||||
receiverID = remoteID
|
|
||||||
} else {
|
|
||||||
senderID = remoteID
|
|
||||||
receiverID = selfID
|
|
||||||
}
|
}
|
||||||
|
return remoteID, selfID
|
||||||
|
}
|
||||||
|
|
||||||
|
func setBecknAttr(span trace.Span, r *http.Request, h *stdHandler) {
|
||||||
|
senderID, receiverID := h.resolveDirection(r.Context())
|
||||||
attrs := []attribute.KeyValue{
|
attrs := []attribute.KeyValue{
|
||||||
telemetry.AttrRecipientID.String(receiverID),
|
telemetry.AttrRecipientID.String(receiverID),
|
||||||
telemetry.AttrSenderID.String(senderID),
|
telemetry.AttrSenderID.String(senderID),
|
||||||
|
|||||||
@@ -41,28 +41,25 @@ func (s *signStep) Run(ctx *model.StepContext) error {
|
|||||||
return model.NewBadReqErr(fmt.Errorf("subscriberID not set"))
|
return model.NewBadReqErr(fmt.Errorf("subscriberID not set"))
|
||||||
}
|
}
|
||||||
|
|
||||||
tracer := otel.Tracer("beckn-onix")
|
tracer := otel.Tracer(telemetry.ScopeName, trace.WithInstrumentationVersion(telemetry.ScopeVersion))
|
||||||
|
|
||||||
var keySet *model.Keyset
|
var keySet *model.Keyset
|
||||||
{
|
{
|
||||||
// to create span to finding the key set
|
|
||||||
keySetCtx, keySetSpan := tracer.Start(ctx.Context, "keyset")
|
keySetCtx, keySetSpan := tracer.Start(ctx.Context, "keyset")
|
||||||
defer keySetSpan.End()
|
|
||||||
ks, err := s.km.Keyset(keySetCtx, ctx.SubID)
|
ks, err := s.km.Keyset(keySetCtx, ctx.SubID)
|
||||||
|
keySetSpan.End()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get signing key: %w", err)
|
return fmt.Errorf("failed to get signing key: %w", err)
|
||||||
}
|
}
|
||||||
keySet = ks
|
keySet = ks
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
// to create span for the signa
|
|
||||||
signerCtx, signerSpan := tracer.Start(ctx.Context, "sign")
|
signerCtx, signerSpan := tracer.Start(ctx.Context, "sign")
|
||||||
defer signerSpan.End()
|
|
||||||
createdAt := time.Now().Unix()
|
createdAt := time.Now().Unix()
|
||||||
validTill := time.Now().Add(5 * time.Minute).Unix()
|
validTill := time.Now().Add(5 * time.Minute).Unix()
|
||||||
sign, err := s.signer.Sign(signerCtx, ctx.Body, keySet.SigningPrivate, createdAt, validTill)
|
sign, err := s.signer.Sign(signerCtx, ctx.Body, keySet.SigningPrivate, createdAt, validTill)
|
||||||
|
signerSpan.End()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to sign request: %w", err)
|
return fmt.Errorf("failed to sign request: %w", err)
|
||||||
}
|
}
|
||||||
@@ -73,7 +70,6 @@ func (s *signStep) Run(ctx *model.StepContext) error {
|
|||||||
header = model.AuthHeaderGateway
|
header = model.AuthHeaderGateway
|
||||||
}
|
}
|
||||||
ctx.Request.Header.Set(header, authHeader)
|
ctx.Request.Header.Set(header, authHeader)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -196,46 +196,41 @@ services:
|
|||||||
- zipkin
|
- zipkin
|
||||||
- loki
|
- loki
|
||||||
|
|
||||||
bpp-client:
|
sandbox-bap:
|
||||||
image: fidedocker/protocol-server
|
container_name: sandbox-bap
|
||||||
container_name: bpp-client
|
image: fidedocker/sandbox-2.0:latest
|
||||||
platform: linux/amd64
|
platform: linux/amd64
|
||||||
networks:
|
|
||||||
- beckn_network
|
|
||||||
ports:
|
|
||||||
- "6001:6001"
|
|
||||||
restart: unless-stopped
|
|
||||||
volumes:
|
|
||||||
- bpp_client_config_volume:/usr/src/app/config
|
|
||||||
- bpp_client_schemas_volume:/usr/src/app/schemas
|
|
||||||
- bpp_client_logs_volume:/usr/src/app/logs
|
|
||||||
|
|
||||||
bpp-network:
|
|
||||||
image: fidedocker/protocol-server
|
|
||||||
container_name: bpp-network
|
|
||||||
platform: linux/amd64
|
|
||||||
networks:
|
|
||||||
- beckn_network
|
|
||||||
ports:
|
|
||||||
- "6002:6002"
|
|
||||||
restart: unless-stopped
|
|
||||||
volumes:
|
|
||||||
- bpp_network_config_volume:/usr/src/app/config
|
|
||||||
- bpp_network_schemas_volume:/usr/src/app/schemas
|
|
||||||
- bpp_network_logs_volume:/usr/src/app/logs
|
|
||||||
|
|
||||||
sandbox-api:
|
|
||||||
image: fidedocker/sandbox-api
|
|
||||||
container_name: sandbox-api
|
|
||||||
platform: linux/amd64
|
|
||||||
networks:
|
|
||||||
- beckn_network
|
|
||||||
ports:
|
|
||||||
- "4010:4000"
|
|
||||||
restart: unless-stopped
|
|
||||||
environment:
|
environment:
|
||||||
- PORT=4000
|
- NODE_ENV=production
|
||||||
- WEBHOOK_URL=http://host.docker.internal:3001/webhook
|
- PORT=3001
|
||||||
|
ports:
|
||||||
|
- "3001:3001"
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "wget", "-qO-", "http://localhost:3001/api/health"]
|
||||||
|
interval: 10s
|
||||||
|
timeout: 3s
|
||||||
|
retries: 5
|
||||||
|
start_period: 10s
|
||||||
|
networks:
|
||||||
|
- beckn_network
|
||||||
|
|
||||||
|
sandbox-bpp:
|
||||||
|
container_name: sandbox-bpp
|
||||||
|
image: fidedocker/sandbox-2.0:latest
|
||||||
|
platform: linux/amd64
|
||||||
|
environment:
|
||||||
|
- NODE_ENV=production
|
||||||
|
- PORT=3002
|
||||||
|
ports:
|
||||||
|
- "3002:3002"
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "wget", "-qO-", "http://localhost:3002/api/health"]
|
||||||
|
interval: 10s
|
||||||
|
timeout: 3s
|
||||||
|
retries: 5
|
||||||
|
start_period: 10s
|
||||||
|
networks:
|
||||||
|
- beckn_network
|
||||||
|
|
||||||
networks:
|
networks:
|
||||||
observability:
|
observability:
|
||||||
@@ -248,17 +243,3 @@ volumes:
|
|||||||
prometheus_data:
|
prometheus_data:
|
||||||
grafana_data:
|
grafana_data:
|
||||||
loki_data:
|
loki_data:
|
||||||
bpp_client_config_volume:
|
|
||||||
name: bpp_client_config_volume
|
|
||||||
external: true
|
|
||||||
bpp_client_schemas_volume:
|
|
||||||
name: bpp_client_schemas_volume
|
|
||||||
bpp_client_logs_volume:
|
|
||||||
name: bpp_client_logs_volume
|
|
||||||
bpp_network_config_volume:
|
|
||||||
name: bpp_network_config_volume
|
|
||||||
external: true
|
|
||||||
bpp_network_schemas_volume:
|
|
||||||
name: bpp_network_schemas_volume
|
|
||||||
bpp_network_logs_volume:
|
|
||||||
name: bpp_network_logs_volume
|
|
||||||
|
|||||||
@@ -250,11 +250,11 @@ func TestError(t *testing.T) {
|
|||||||
func TestRequest(t *testing.T) {
|
func TestRequest(t *testing.T) {
|
||||||
logPath := setupLogger(t, InfoLevel)
|
logPath := setupLogger(t, InfoLevel)
|
||||||
ctx := context.WithValue(context.Background(), requestID, "abc-123")
|
ctx := context.WithValue(context.Background(), requestID, "abc-123")
|
||||||
ctx = context.WithValue(context.Background(), transaction_id, "transaction-id-123-")
|
ctx = context.WithValue(ctx, transaction_id, "transaction-id-123-")
|
||||||
ctx = context.WithValue(context.Background(), message_id, "message-id-123")
|
ctx = context.WithValue(ctx, message_id, "message-id-123")
|
||||||
ctx = context.WithValue(context.Background(), subscriber_id, "subscriber-id-123")
|
ctx = context.WithValue(ctx, subscriber_id, "subscriber-id-123")
|
||||||
ctx = context.WithValue(context.Background(), module_id, "module-id-123")
|
ctx = context.WithValue(ctx, module_id, "module-id-123")
|
||||||
ctx = context.WithValue(context.Background(), parent_id, "parent-id-123")
|
ctx = context.WithValue(ctx, parent_id, "parent-id-123")
|
||||||
|
|
||||||
req, _ := http.NewRequest("POST", "/api/test", bytes.NewBuffer([]byte(`{"key":"value"}`)))
|
req, _ := http.NewRequest("POST", "/api/test", bytes.NewBuffer([]byte(`{"key":"value"}`)))
|
||||||
req.RemoteAddr = "127.0.0.1:8080"
|
req.RemoteAddr = "127.0.0.1:8080"
|
||||||
|
|||||||
@@ -58,7 +58,7 @@ const (
|
|||||||
ContextKeyParentID ContextKey = "parent_id"
|
ContextKeyParentID ContextKey = "parent_id"
|
||||||
|
|
||||||
// ContextKeyRemoteID is the context key for the caller who is calling the bap/bpp
|
// ContextKeyRemoteID is the context key for the caller who is calling the bap/bpp
|
||||||
ContextKeyRemoteID ContextKey = "caller_id"
|
ContextKeyRemoteID ContextKey = "remote_id"
|
||||||
)
|
)
|
||||||
|
|
||||||
var contextKeys = map[string]ContextKey{
|
var contextKeys = map[string]ContextKey{
|
||||||
@@ -67,7 +67,7 @@ var contextKeys = map[string]ContextKey{
|
|||||||
"subscriber_id": ContextKeySubscriberID,
|
"subscriber_id": ContextKeySubscriberID,
|
||||||
"module_id": ContextKeyModuleID,
|
"module_id": ContextKeyModuleID,
|
||||||
"parent_id": ContextKeyParentID,
|
"parent_id": ContextKeyParentID,
|
||||||
"caller_id": ContextKeyRemoteID,
|
"remote_id": ContextKeyRemoteID,
|
||||||
}
|
}
|
||||||
|
|
||||||
// ParseContextKey converts a string into a valid ContextKey.
|
// ParseContextKey converts a string into a valid ContextKey.
|
||||||
|
|||||||
22
pkg/plugin/implementation/cache/cache.go
vendored
22
pkg/plugin/implementation/cache/cache.go
vendored
@@ -114,22 +114,25 @@ func New(ctx context.Context, cfg *Config) (*Cache, func() error, error) {
|
|||||||
|
|
||||||
// Get retrieves the value for the specified key from Redis.
|
// Get retrieves the value for the specified key from Redis.
|
||||||
func (c *Cache) Get(ctx context.Context, key string) (string, error) {
|
func (c *Cache) Get(ctx context.Context, key string) (string, error) {
|
||||||
result, err := c.Client.Get(ctx, key).Result()
|
tracer := otel.Tracer(telemetry.ScopeName, trace.WithInstrumentationVersion(telemetry.ScopeVersion))
|
||||||
|
spanCtx, span := tracer.Start(ctx, "redis_get")
|
||||||
|
defer span.End()
|
||||||
|
result, err := c.Client.Get(spanCtx, key).Result()
|
||||||
if c.metrics != nil {
|
if c.metrics != nil {
|
||||||
attrs := []attribute.KeyValue{
|
attrs := []attribute.KeyValue{
|
||||||
telemetry.AttrOperation.String("get"),
|
telemetry.AttrOperation.String("get"),
|
||||||
}
|
}
|
||||||
switch {
|
switch {
|
||||||
case err == redis.Nil:
|
case err == redis.Nil:
|
||||||
c.metrics.CacheMissesTotal.Add(ctx, 1, metric.WithAttributes(attrs...))
|
c.metrics.CacheMissesTotal.Add(spanCtx, 1, metric.WithAttributes(attrs...))
|
||||||
c.metrics.CacheOperationsTotal.Add(ctx, 1,
|
c.metrics.CacheOperationsTotal.Add(spanCtx, 1,
|
||||||
metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("miss"))...))
|
metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("miss"))...))
|
||||||
case err != nil:
|
case err != nil:
|
||||||
c.metrics.CacheOperationsTotal.Add(ctx, 1,
|
c.metrics.CacheOperationsTotal.Add(spanCtx, 1,
|
||||||
metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("error"))...))
|
metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("error"))...))
|
||||||
default:
|
default:
|
||||||
c.metrics.CacheHitsTotal.Add(ctx, 1, metric.WithAttributes(attrs...))
|
c.metrics.CacheHitsTotal.Add(spanCtx, 1, metric.WithAttributes(attrs...))
|
||||||
c.metrics.CacheOperationsTotal.Add(ctx, 1,
|
c.metrics.CacheOperationsTotal.Add(spanCtx, 1,
|
||||||
metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("hit"))...))
|
metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("hit"))...))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -149,8 +152,11 @@ func (c *Cache) Set(ctx context.Context, key, value string, ttl time.Duration) e
|
|||||||
|
|
||||||
// Delete removes the specified key from Redis.
|
// Delete removes the specified key from Redis.
|
||||||
func (c *Cache) Delete(ctx context.Context, key string) error {
|
func (c *Cache) Delete(ctx context.Context, key string) error {
|
||||||
err := c.Client.Del(ctx, key).Err()
|
tracer := otel.Tracer(telemetry.ScopeName, trace.WithInstrumentationVersion(telemetry.ScopeVersion))
|
||||||
c.recordOperation(ctx, "delete", err)
|
spanCtx, span := tracer.Start(ctx, "redis_delete")
|
||||||
|
defer span.End()
|
||||||
|
err := c.Client.Del(spanCtx, key).Err()
|
||||||
|
c.recordOperation(spanCtx, "delete", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ type Config struct {
|
|||||||
EnableLogs bool `yaml:"enableLogs"`
|
EnableLogs bool `yaml:"enableLogs"`
|
||||||
OtlpEndpoint string `yaml:"otlpEndpoint"`
|
OtlpEndpoint string `yaml:"otlpEndpoint"`
|
||||||
TimeInterval int64 `yaml:"timeInterval"`
|
TimeInterval int64 `yaml:"timeInterval"`
|
||||||
|
AuditFieldsConfig string `yaml:"auditFieldsConfig"`
|
||||||
Producer string `yaml:"producer"`
|
Producer string `yaml:"producer"`
|
||||||
ProducerType string `yaml:"producerType"`
|
ProducerType string `yaml:"producerType"`
|
||||||
}
|
}
|
||||||
@@ -65,10 +66,14 @@ func ToPluginConfig(cfg *Config) *plugin.Config {
|
|||||||
"serviceName": cfg.ServiceName,
|
"serviceName": cfg.ServiceName,
|
||||||
"serviceVersion": cfg.ServiceVersion,
|
"serviceVersion": cfg.ServiceVersion,
|
||||||
"environment": cfg.Environment,
|
"environment": cfg.Environment,
|
||||||
|
"domain": cfg.Domain,
|
||||||
"enableMetrics": fmt.Sprintf("%t", cfg.EnableMetrics),
|
"enableMetrics": fmt.Sprintf("%t", cfg.EnableMetrics),
|
||||||
"enableTracing": fmt.Sprintf("%t", cfg.EnableTracing),
|
"enableTracing": fmt.Sprintf("%t", cfg.EnableTracing),
|
||||||
"otelEndpoint": cfg.OtlpEndpoint,
|
"enableLogs": fmt.Sprintf("%t", cfg.EnableLogs),
|
||||||
|
"otlpEndpoint": cfg.OtlpEndpoint,
|
||||||
"deviceID": cfg.DeviceID,
|
"deviceID": cfg.DeviceID,
|
||||||
|
"timeInterval": fmt.Sprintf("%d", cfg.TimeInterval),
|
||||||
|
"auditFieldsConfig": cfg.AuditFieldsConfig,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -101,16 +106,13 @@ func (Setup) New(ctx context.Context, cfg *Config) (*telemetry.Provider, error)
|
|||||||
cfg.TimeInterval = DefaultConfig().TimeInterval
|
cfg.TimeInterval = DefaultConfig().TimeInterval
|
||||||
}
|
}
|
||||||
|
|
||||||
if !cfg.EnableMetrics && !cfg.EnableTracing {
|
if !cfg.EnableMetrics && !cfg.EnableTracing && !cfg.EnableLogs {
|
||||||
log.Info(ctx, "OpenTelemetry metrics and tracing are disabled")
|
log.Info(ctx, "OpenTelemetry metrics, tracing, and logs are all disabled")
|
||||||
return &telemetry.Provider{
|
return &telemetry.Provider{
|
||||||
Shutdown: func(context.Context) error { return nil },
|
Shutdown: func(context.Context) error { return nil },
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
//this will be used by both metric and traces
|
|
||||||
|
|
||||||
// to build resource with envelope metadata
|
|
||||||
baseAttrs := []attribute.KeyValue{
|
baseAttrs := []attribute.KeyValue{
|
||||||
attribute.String("service.name", cfg.ServiceName),
|
attribute.String("service.name", cfg.ServiceName),
|
||||||
attribute.String("service.version", cfg.ServiceVersion),
|
attribute.String("service.version", cfg.ServiceVersion),
|
||||||
@@ -121,53 +123,49 @@ func (Setup) New(ctx context.Context, cfg *Config) (*telemetry.Provider, error)
|
|||||||
attribute.String("producer", cfg.Producer),
|
attribute.String("producer", cfg.Producer),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var meterProvider *metric.MeterProvider
|
||||||
|
if cfg.EnableMetrics {
|
||||||
resMetric, err := resource.New(ctx, resource.WithAttributes(buildAtts(baseAttrs, "METRIC")...))
|
resMetric, err := resource.New(ctx, resource.WithAttributes(buildAtts(baseAttrs, "METRIC")...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create telemetry resource for metric: %w", err)
|
return nil, fmt.Errorf("failed to create telemetry resource for metric: %w", err)
|
||||||
}
|
}
|
||||||
|
metricExporter, err := otlpmetricgrpc.New(ctx, otlpmetricgrpc.WithEndpoint(cfg.OtlpEndpoint),
|
||||||
//OTLP metric
|
|
||||||
var meterProvider *metric.MeterProvider
|
|
||||||
if cfg.EnableMetrics {
|
|
||||||
metricExpoter, err := otlpmetricgrpc.New(ctx, otlpmetricgrpc.WithEndpoint(cfg.OtlpEndpoint),
|
|
||||||
otlpmetricgrpc.WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())))
|
otlpmetricgrpc.WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create OTLP metric exporter: %w", err)
|
return nil, fmt.Errorf("failed to create OTLP metric exporter: %w", err)
|
||||||
}
|
}
|
||||||
reader := metric.NewPeriodicReader(metricExpoter, metric.WithInterval(time.Second*time.Duration(cfg.TimeInterval)))
|
reader := metric.NewPeriodicReader(metricExporter, metric.WithInterval(time.Second*time.Duration(cfg.TimeInterval)))
|
||||||
meterProvider = metric.NewMeterProvider(metric.WithReader(reader), metric.WithResource(resMetric))
|
meterProvider = metric.NewMeterProvider(metric.WithReader(reader), metric.WithResource(resMetric))
|
||||||
otel.SetMeterProvider(meterProvider)
|
otel.SetMeterProvider(meterProvider)
|
||||||
log.Infof(ctx, "OpenTelemetry metrics initialized for service=%s version=%s env=%s (OTLP endpoint=%s)",
|
log.Infof(ctx, "OpenTelemetry metrics initialized for service=%s version=%s env=%s (OTLP endpoint=%s)",
|
||||||
cfg.ServiceName, cfg.ServiceVersion, cfg.Environment, cfg.OtlpEndpoint)
|
cfg.ServiceName, cfg.ServiceVersion, cfg.Environment, cfg.OtlpEndpoint)
|
||||||
// for the go runtime metrics
|
|
||||||
if err := runtime.Start(runtime.WithMinimumReadMemStatsInterval(runtime.DefaultMinimumReadMemStatsInterval)); err != nil {
|
if err := runtime.Start(runtime.WithMinimumReadMemStatsInterval(runtime.DefaultMinimumReadMemStatsInterval)); err != nil {
|
||||||
log.Warnf(ctx, "Failed to start Go runtime instrumentation: %v", err)
|
log.Warnf(ctx, "Failed to start Go runtime instrumentation: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//OTLP traces
|
var traceProvider *trace.TracerProvider
|
||||||
restrace, err := resource.New(ctx, resource.WithAttributes(buildAtts(baseAttrs, "API")...))
|
if cfg.EnableTracing {
|
||||||
|
resTrace, err := resource.New(ctx, resource.WithAttributes(buildAtts(baseAttrs, "API")...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create trace resource: %w", err)
|
return nil, fmt.Errorf("failed to create trace resource: %w", err)
|
||||||
}
|
}
|
||||||
var traceProvider *trace.TracerProvider
|
traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithEndpoint(cfg.OtlpEndpoint), otlptracegrpc.WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())))
|
||||||
if cfg.EnableTracing {
|
|
||||||
traceExpoter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithEndpoint(cfg.OtlpEndpoint), otlptracegrpc.WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create OTLP trace exporter: %w", err)
|
return nil, fmt.Errorf("failed to create OTLP trace exporter: %w", err)
|
||||||
}
|
}
|
||||||
traceProvider = trace.NewTracerProvider(trace.WithBatcher(traceExpoter), trace.WithResource(restrace)) //TODO: need to add the trace sampleing rate
|
traceProvider = trace.NewTracerProvider(trace.WithBatcher(traceExporter), trace.WithResource(resTrace))
|
||||||
otel.SetTracerProvider(traceProvider)
|
otel.SetTracerProvider(traceProvider)
|
||||||
log.Infof(ctx, "OpenTelemetry tracing initialized for service=%s (OTLP endpoint=%s)",
|
log.Infof(ctx, "OpenTelemetry tracing initialized for service=%s (OTLP endpoint=%s)",
|
||||||
cfg.ServiceName, cfg.OtlpEndpoint)
|
cfg.ServiceName, cfg.OtlpEndpoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var logProvider *logsdk.LoggerProvider
|
||||||
|
if cfg.EnableLogs {
|
||||||
resAudit, err := resource.New(ctx, resource.WithAttributes(buildAtts(baseAttrs, "AUDIT")...))
|
resAudit, err := resource.New(ctx, resource.WithAttributes(buildAtts(baseAttrs, "AUDIT")...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create audit resource: %w", err)
|
return nil, fmt.Errorf("failed to create audit resource: %w", err)
|
||||||
}
|
}
|
||||||
var logProvider *logsdk.LoggerProvider
|
|
||||||
if cfg.EnableLogs {
|
|
||||||
logExporter, err := otlploggrpc.New(ctx, otlploggrpc.WithEndpoint(cfg.OtlpEndpoint), otlploggrpc.WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())))
|
logExporter, err := otlploggrpc.New(ctx, otlploggrpc.WithEndpoint(cfg.OtlpEndpoint), otlploggrpc.WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create OTLP logs exporter: %w", err)
|
return nil, fmt.Errorf("failed to create OTLP logs exporter: %w", err)
|
||||||
|
|||||||
@@ -169,6 +169,7 @@ func TestToPluginConfig_Success(t *testing.T) {
|
|||||||
ServiceVersion: "1.0.0",
|
ServiceVersion: "1.0.0",
|
||||||
EnableMetrics: true,
|
EnableMetrics: true,
|
||||||
EnableTracing: true,
|
EnableTracing: true,
|
||||||
|
EnableLogs: true,
|
||||||
Environment: "test",
|
Environment: "test",
|
||||||
Domain: "test-domain",
|
Domain: "test-domain",
|
||||||
DeviceID: "test-device",
|
DeviceID: "test-device",
|
||||||
@@ -180,10 +181,14 @@ func TestToPluginConfig_Success(t *testing.T) {
|
|||||||
"serviceName": "test-service",
|
"serviceName": "test-service",
|
||||||
"serviceVersion": "1.0.0",
|
"serviceVersion": "1.0.0",
|
||||||
"environment": "test",
|
"environment": "test",
|
||||||
|
"domain": "test-domain",
|
||||||
"enableMetrics": "true",
|
"enableMetrics": "true",
|
||||||
"enableTracing": "true",
|
"enableTracing": "true",
|
||||||
"otelEndpoint": "localhost:4317",
|
"enableLogs": "true",
|
||||||
|
"otlpEndpoint": "localhost:4317",
|
||||||
"deviceID": "test-device",
|
"deviceID": "test-device",
|
||||||
|
"timeInterval": "5",
|
||||||
|
"auditFieldsConfig": "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@@ -200,10 +205,14 @@ func TestToPluginConfig_Success(t *testing.T) {
|
|||||||
"serviceName": "my-service",
|
"serviceName": "my-service",
|
||||||
"serviceVersion": "2.0.0",
|
"serviceVersion": "2.0.0",
|
||||||
"environment": "production",
|
"environment": "production",
|
||||||
|
"domain": "",
|
||||||
"enableMetrics": "false",
|
"enableMetrics": "false",
|
||||||
"enableTracing": "false",
|
"enableTracing": "false",
|
||||||
"otelEndpoint": "",
|
"enableLogs": "false",
|
||||||
|
"otlpEndpoint": "",
|
||||||
"deviceID": "",
|
"deviceID": "",
|
||||||
|
"timeInterval": "0",
|
||||||
|
"auditFieldsConfig": "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@@ -223,10 +232,14 @@ func TestToPluginConfig_Success(t *testing.T) {
|
|||||||
"serviceName": "",
|
"serviceName": "",
|
||||||
"serviceVersion": "",
|
"serviceVersion": "",
|
||||||
"environment": "",
|
"environment": "",
|
||||||
|
"domain": "",
|
||||||
"enableMetrics": "true",
|
"enableMetrics": "true",
|
||||||
"enableTracing": "false",
|
"enableTracing": "false",
|
||||||
"otelEndpoint": "",
|
"enableLogs": "false",
|
||||||
|
"otlpEndpoint": "",
|
||||||
"deviceID": "",
|
"deviceID": "",
|
||||||
|
"timeInterval": "0",
|
||||||
|
"auditFieldsConfig": "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -298,7 +311,7 @@ func TestToPluginConfig_BooleanConversion(t *testing.T) {
|
|||||||
require.NotNil(t, result)
|
require.NotNil(t, result)
|
||||||
assert.Equal(t, tt.expectedMetric, result.Config["enableMetrics"], "enableMetrics should be converted to string correctly")
|
assert.Equal(t, tt.expectedMetric, result.Config["enableMetrics"], "enableMetrics should be converted to string correctly")
|
||||||
assert.Equal(t, tt.expectedTrace, result.Config["enableTracing"], "enableTracing should be converted to string correctly")
|
assert.Equal(t, tt.expectedTrace, result.Config["enableTracing"], "enableTracing should be converted to string correctly")
|
||||||
assert.Equal(t, "localhost:4317", result.Config["otelEndpoint"], "otelEndpoint should be included")
|
assert.Equal(t, "localhost:4317", result.Config["otlpEndpoint"], "otlpEndpoint should be included")
|
||||||
assert.Equal(t, "test-device", result.Config["deviceID"], "deviceID should be included")
|
assert.Equal(t, "test-device", result.Config["deviceID"], "deviceID should be included")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -23,8 +23,6 @@ func EmitAuditLogs(ctx context.Context, body []byte, attrs ...log.KeyValue) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
//maskedBody := MaskPIIInAuditBody(body)
|
|
||||||
|
|
||||||
sum := sha256.Sum256(body)
|
sum := sha256.Sum256(body)
|
||||||
auditBody := selectAuditPayload(ctx, body)
|
auditBody := selectAuditPayload(ctx, body)
|
||||||
auditlog := provider.Logger(auditLoggerName)
|
auditlog := provider.Logger(auditLoggerName)
|
||||||
|
|||||||
@@ -57,8 +57,8 @@ var (
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
networkMetricsCfgMu sync.RWMutex
|
networkMetricsCfgMu sync.RWMutex
|
||||||
networkMetricsGranularity = "10mim" // default
|
networkMetricsGranularity = "10min" // default
|
||||||
networkMetricsFrequency = "10mim" // default
|
networkMetricsFrequency = "10min" // default
|
||||||
)
|
)
|
||||||
|
|
||||||
func SetNetworkMetricsConfig(granularity, frequency string) {
|
func SetNetworkMetricsConfig(granularity, frequency string) {
|
||||||
|
|||||||
Reference in New Issue
Block a user