make changes as per the doc
This commit is contained in:
@@ -9,11 +9,11 @@ import (
|
||||
"net/http/httputil"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||
"github.com/beckn-one/beckn-onix/pkg/metrics"
|
||||
"github.com/beckn-one/beckn-onix/pkg/model"
|
||||
"github.com/beckn-one/beckn-onix/pkg/plugin"
|
||||
"github.com/beckn-one/beckn-onix/pkg/plugin/definition"
|
||||
"github.com/beckn-one/beckn-onix/pkg/response"
|
||||
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
||||
)
|
||||
|
||||
// stdHandler orchestrates the execution of defined processing steps.
|
||||
@@ -30,6 +30,7 @@ type stdHandler struct {
|
||||
SubscriberID string
|
||||
role model.Role
|
||||
httpClient *http.Client
|
||||
moduleName string
|
||||
}
|
||||
|
||||
// newHTTPClient creates a new HTTP client with a custom transport configuration.
|
||||
@@ -52,19 +53,17 @@ func newHTTPClient(cfg *HttpClientConfig) *http.Client {
|
||||
transport.ResponseHeaderTimeout = cfg.ResponseHeaderTimeout
|
||||
}
|
||||
|
||||
// Wrap transport with metrics tracking for outbound requests
|
||||
wrappedTransport := metrics.WrapHTTPTransport(transport)
|
||||
|
||||
return &http.Client{Transport: wrappedTransport}
|
||||
return &http.Client{Transport: transport}
|
||||
}
|
||||
|
||||
// NewStdHandler initializes a new processor with plugins and steps.
|
||||
func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Handler, error) {
|
||||
func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config, moduleName string) (http.Handler, error) {
|
||||
h := &stdHandler{
|
||||
steps: []definition.Step{},
|
||||
SubscriberID: cfg.SubscriberID,
|
||||
role: cfg.Role,
|
||||
httpClient: newHTTPClient(&cfg.HttpClientConfig),
|
||||
moduleName: moduleName,
|
||||
}
|
||||
// Initialize plugins.
|
||||
if err := h.initPlugins(ctx, mgr, &cfg.Plugins); err != nil {
|
||||
@@ -79,12 +78,8 @@ func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Ha
|
||||
|
||||
// ServeHTTP processes an incoming HTTP request and executes defined processing steps.
|
||||
func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// Track inbound request
|
||||
host := r.Host
|
||||
if host == "" {
|
||||
host = r.URL.Host
|
||||
}
|
||||
metrics.RecordInboundRequest(r.Context(), host)
|
||||
r.Header.Set("X-Module-Name", h.moduleName)
|
||||
r.Header.Set("X-Role", string(h.role))
|
||||
|
||||
ctx, err := h.stepCtx(r, w.Header())
|
||||
if err != nil {
|
||||
@@ -94,35 +89,14 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
log.Request(r.Context(), r, ctx.Body)
|
||||
|
||||
// Track validation steps
|
||||
signValidated := false
|
||||
schemaValidated := false
|
||||
|
||||
// Execute processing steps.
|
||||
for _, step := range h.steps {
|
||||
stepName := fmt.Sprintf("%T", step)
|
||||
// Check if this is a validation step
|
||||
if stepName == "*step.validateSignStep" {
|
||||
signValidated = true
|
||||
}
|
||||
if stepName == "*step.validateSchemaStep" {
|
||||
schemaValidated = true
|
||||
}
|
||||
|
||||
if err := step.Run(ctx); err != nil {
|
||||
log.Errorf(ctx, err, "%T.run(%v):%v", step, ctx, err)
|
||||
response.SendNack(ctx, w, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Record validation metrics after successful execution
|
||||
if signValidated {
|
||||
metrics.RecordInboundSignValidation(ctx, host)
|
||||
}
|
||||
if schemaValidated {
|
||||
metrics.RecordInboundSchemaValidation(ctx, host)
|
||||
}
|
||||
// Restore request body before forwarding or publishing.
|
||||
r.Body = io.NopCloser(bytes.NewReader(ctx.Body))
|
||||
if ctx.Route == nil {
|
||||
@@ -130,6 +104,10 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// These headers are only needed for internal instrumentation; avoid leaking them downstream.
|
||||
r.Header.Del("X-Module-Name")
|
||||
r.Header.Del("X-Role")
|
||||
|
||||
// Handle routing based on the defined route type.
|
||||
route(ctx, r, w, h.publisher, h.httpClient)
|
||||
}
|
||||
@@ -320,7 +298,13 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
h.steps = append(h.steps, s)
|
||||
instrumentedStep, wrapErr := telemetry.NewInstrumentedStep(s, step, h.moduleName)
|
||||
if wrapErr != nil {
|
||||
log.Warnf(ctx, "Failed to instrument step %s: %v", step, wrapErr)
|
||||
h.steps = append(h.steps, s)
|
||||
continue
|
||||
}
|
||||
h.steps = append(h.steps, instrumentedStep)
|
||||
}
|
||||
log.Infof(ctx, "Processor steps initialized: %v", cfg.Steps)
|
||||
return nil
|
||||
|
||||
@@ -2,13 +2,17 @@ package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||
"github.com/beckn-one/beckn-onix/pkg/model"
|
||||
"github.com/beckn-one/beckn-onix/pkg/plugin/definition"
|
||||
"github.com/beckn-one/beckn-onix/pkg/telemetry"
|
||||
)
|
||||
|
||||
// signStep represents the signing step in the processing pipeline.
|
||||
@@ -68,6 +72,7 @@ func (s *signStep) generateAuthHeader(subID, keyID string, createdAt, validTill
|
||||
type validateSignStep struct {
|
||||
validator definition.SignValidator
|
||||
km definition.KeyManager
|
||||
metrics *telemetry.Metrics
|
||||
}
|
||||
|
||||
// newValidateSignStep initializes and returns a new validate sign step.
|
||||
@@ -78,11 +83,22 @@ func newValidateSignStep(signValidator definition.SignValidator, km definition.K
|
||||
if km == nil {
|
||||
return nil, fmt.Errorf("invalid config: KeyManager plugin not configured")
|
||||
}
|
||||
return &validateSignStep{validator: signValidator, km: km}, nil
|
||||
metrics, _ := telemetry.GetMetrics(context.Background())
|
||||
return &validateSignStep{
|
||||
validator: signValidator,
|
||||
km: km,
|
||||
metrics: metrics,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run executes the validation step.
|
||||
func (s *validateSignStep) Run(ctx *model.StepContext) error {
|
||||
err := s.validateHeaders(ctx)
|
||||
s.recordMetrics(ctx, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *validateSignStep) validateHeaders(ctx *model.StepContext) error {
|
||||
unauthHeader := fmt.Sprintf("Signature realm=\"%s\",headers=\"(created) (expires) digest\"", ctx.SubID)
|
||||
headerValue := ctx.Request.Header.Get(model.AuthHeaderGateway)
|
||||
if len(headerValue) != 0 {
|
||||
@@ -123,6 +139,18 @@ func (s *validateSignStep) validate(ctx *model.StepContext, value string) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *validateSignStep) recordMetrics(ctx *model.StepContext, err error) {
|
||||
if s.metrics == nil {
|
||||
return
|
||||
}
|
||||
status := "success"
|
||||
if err != nil {
|
||||
status = "failed"
|
||||
}
|
||||
s.metrics.SignatureValidationsTotal.Add(ctx.Context, 1,
|
||||
metric.WithAttributes(telemetry.AttrStatus.String(status)))
|
||||
}
|
||||
|
||||
// ParsedKeyID holds the components from the parsed Authorization header's keyId.
|
||||
type authHeader struct {
|
||||
SubscriberID string
|
||||
@@ -165,6 +193,7 @@ func parseHeader(header string) (*authHeader, error) {
|
||||
// validateSchemaStep represents the schema validation step.
|
||||
type validateSchemaStep struct {
|
||||
validator definition.SchemaValidator
|
||||
metrics *telemetry.Metrics
|
||||
}
|
||||
|
||||
// newValidateSchemaStep creates and returns the validateSchema step after validation.
|
||||
@@ -173,20 +202,43 @@ func newValidateSchemaStep(schemaValidator definition.SchemaValidator) (definiti
|
||||
return nil, fmt.Errorf("invalid config: SchemaValidator plugin not configured")
|
||||
}
|
||||
log.Debug(context.Background(), "adding schema validator")
|
||||
return &validateSchemaStep{validator: schemaValidator}, nil
|
||||
metrics, _ := telemetry.GetMetrics(context.Background())
|
||||
return &validateSchemaStep{
|
||||
validator: schemaValidator,
|
||||
metrics: metrics,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run executes the schema validation step.
|
||||
func (s *validateSchemaStep) Run(ctx *model.StepContext) error {
|
||||
if err := s.validator.Validate(ctx, ctx.Request.URL, ctx.Body); err != nil {
|
||||
return fmt.Errorf("schema validation failed: %w", err)
|
||||
err := s.validator.Validate(ctx, ctx.Request.URL, ctx.Body)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("schema validation failed: %w", err)
|
||||
}
|
||||
return nil
|
||||
s.recordMetrics(ctx, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *validateSchemaStep) recordMetrics(ctx *model.StepContext, err error) {
|
||||
if s.metrics == nil {
|
||||
return
|
||||
}
|
||||
status := "success"
|
||||
if err != nil {
|
||||
status = "failed"
|
||||
}
|
||||
version := extractSchemaVersion(ctx.Body)
|
||||
s.metrics.SchemaValidationsTotal.Add(ctx.Context, 1,
|
||||
metric.WithAttributes(
|
||||
telemetry.AttrSchemaVersion.String(version),
|
||||
telemetry.AttrStatus.String(status),
|
||||
))
|
||||
}
|
||||
|
||||
// addRouteStep represents the route determination step.
|
||||
type addRouteStep struct {
|
||||
router definition.Router
|
||||
router definition.Router
|
||||
metrics *telemetry.Metrics
|
||||
}
|
||||
|
||||
// newAddRouteStep creates and returns the addRoute step after validation.
|
||||
@@ -194,7 +246,11 @@ func newAddRouteStep(router definition.Router) (definition.Step, error) {
|
||||
if router == nil {
|
||||
return nil, fmt.Errorf("invalid config: Router plugin not configured")
|
||||
}
|
||||
return &addRouteStep{router: router}, nil
|
||||
metrics, _ := telemetry.GetMetrics(context.Background())
|
||||
return &addRouteStep{
|
||||
router: router,
|
||||
metrics: metrics,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run executes the routing step.
|
||||
@@ -208,5 +264,31 @@ func (s *addRouteStep) Run(ctx *model.StepContext) error {
|
||||
PublisherID: route.PublisherID,
|
||||
URL: route.URL,
|
||||
}
|
||||
if s.metrics != nil && ctx.Route != nil {
|
||||
s.metrics.RoutingDecisionsTotal.Add(ctx.Context, 1,
|
||||
metric.WithAttributes(
|
||||
telemetry.AttrRouteType.String(ctx.Route.TargetType),
|
||||
telemetry.AttrTargetType.String(ctx.Route.TargetType),
|
||||
))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func extractSchemaVersion(body []byte) string {
|
||||
type contextEnvelope struct {
|
||||
Context struct {
|
||||
Version string `json:"version"`
|
||||
CoreVersion string `json:"core_version"`
|
||||
} `json:"context"`
|
||||
}
|
||||
var payload contextEnvelope
|
||||
if err := json.Unmarshal(body, &payload); err == nil {
|
||||
if payload.Context.CoreVersion != "" {
|
||||
return payload.Context.CoreVersion
|
||||
}
|
||||
if payload.Context.Version != "" {
|
||||
return payload.Context.Version
|
||||
}
|
||||
}
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
|
||||
"github.com/beckn-one/beckn-onix/core/module/handler"
|
||||
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||
"github.com/beckn-one/beckn-onix/pkg/metrics"
|
||||
"github.com/beckn-one/beckn-onix/pkg/model"
|
||||
)
|
||||
|
||||
@@ -19,7 +18,7 @@ type Config struct {
|
||||
}
|
||||
|
||||
// Provider represents a function that initializes an HTTP handler using a PluginManager.
|
||||
type Provider func(ctx context.Context, mgr handler.PluginManager, cfg *handler.Config) (http.Handler, error)
|
||||
type Provider func(ctx context.Context, mgr handler.PluginManager, cfg *handler.Config, moduleName string) (http.Handler, error)
|
||||
|
||||
// handlerProviders maintains a mapping of handler types to their respective providers.
|
||||
var handlerProviders = map[handler.Type]Provider{
|
||||
@@ -30,8 +29,6 @@ var handlerProviders = map[handler.Type]Provider{
|
||||
// It iterates over the module configurations, retrieves appropriate handler providers,
|
||||
// and registers the handlers with the HTTP multiplexer.
|
||||
func Register(ctx context.Context, mCfgs []Config, mux *http.ServeMux, mgr handler.PluginManager) error {
|
||||
mux.Handle("/health", metrics.HTTPMiddleware(http.HandlerFunc(handler.HealthHandler), "/health"))
|
||||
|
||||
log.Debugf(ctx, "Registering modules with config: %#v", mCfgs)
|
||||
// Iterate over the handlers in the configuration.
|
||||
for _, c := range mCfgs {
|
||||
@@ -39,7 +36,7 @@ func Register(ctx context.Context, mCfgs []Config, mux *http.ServeMux, mgr handl
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid module : %s", c.Name)
|
||||
}
|
||||
h, err := rmp(ctx, mgr, &c.Handler)
|
||||
h, err := rmp(ctx, mgr, &c.Handler, c.Name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s : %w", c.Name, err)
|
||||
}
|
||||
@@ -49,8 +46,6 @@ func Register(ctx context.Context, mCfgs []Config, mux *http.ServeMux, mgr handl
|
||||
|
||||
}
|
||||
h = moduleCtxMiddleware(c.Name, h)
|
||||
// Wrap handler with metrics middleware.
|
||||
h = metrics.HTTPMiddleware(h, c.Path)
|
||||
log.Debugf(ctx, "Registering handler %s, of type %s @ %s", c.Name, c.Handler.Type, c.Path)
|
||||
mux.Handle(c.Path, h)
|
||||
}
|
||||
@@ -84,4 +79,4 @@ func moduleCtxMiddleware(moduleName string, next http.Handler) http.Handler {
|
||||
ctx := context.WithValue(r.Context(), model.ContextKeyModuleID, moduleName)
|
||||
next.ServeHTTP(w, r.WithContext(ctx))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,15 +123,6 @@ func TestRegisterSuccess(t *testing.T) {
|
||||
if capturedModuleName != "test-module" {
|
||||
t.Errorf("expected module_id in context to be 'test-module', got %v", capturedModuleName)
|
||||
}
|
||||
// Verifying /health endpoint registration
|
||||
reqHealth := httptest.NewRequest(http.MethodGet, "/health", nil)
|
||||
recHealth := httptest.NewRecorder()
|
||||
mux.ServeHTTP(recHealth, reqHealth)
|
||||
|
||||
if status := recHealth.Code; status != http.StatusOK {
|
||||
t.Errorf("handler for /health returned wrong status code: got %v want %v",
|
||||
status, http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRegisterFailure tests scenarios where the handler registration should fail.
|
||||
|
||||
Reference in New Issue
Block a user