Files
onix/core/module/handler/stdHandler.go
2026-02-27 19:05:09 +05:30

421 lines
13 KiB
Go

package handler
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httputil"
"strconv"
"strings"
"time"
"github.com/beckn-one/beckn-onix/pkg/log"
"github.com/beckn-one/beckn-onix/pkg/model"
"github.com/beckn-one/beckn-onix/pkg/plugin"
"github.com/beckn-one/beckn-onix/pkg/plugin/definition"
"github.com/beckn-one/beckn-onix/pkg/response"
"github.com/beckn-one/beckn-onix/pkg/telemetry"
"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
auditlog "go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
// stdHandler orchestrates the execution of defined processing steps.
type stdHandler struct {
signer definition.Signer
steps []definition.Step
signValidator definition.SignValidator
cache definition.Cache
registry definition.RegistryLookup
km definition.KeyManager
schemaValidator definition.SchemaValidator
router definition.Router
publisher definition.Publisher
transportWrapper definition.TransportWrapper
SubscriberID string
role model.Role
httpClient *http.Client
moduleName string
}
// newHTTPClient creates a new HTTP client with a custom transport configuration.
func newHTTPClient(cfg *HttpClientConfig, wrapper definition.TransportWrapper) *http.Client {
// Clone the default transport to inherit its sensible defaults.
transport := http.DefaultTransport.(*http.Transport).Clone()
// Only override the defaults if a value is explicitly provided in the config.
// A zero value in the config means we stick with the default values.
if cfg.MaxIdleConns > 0 {
transport.MaxIdleConns = cfg.MaxIdleConns
}
if cfg.MaxIdleConnsPerHost > 0 {
transport.MaxIdleConnsPerHost = cfg.MaxIdleConnsPerHost
}
if cfg.IdleConnTimeout > 0 {
transport.IdleConnTimeout = cfg.IdleConnTimeout
}
if cfg.ResponseHeaderTimeout > 0 {
transport.ResponseHeaderTimeout = cfg.ResponseHeaderTimeout
}
var finalTransport http.RoundTripper = transport
if wrapper != nil {
log.Debugf(context.Background(), "Applying custom transport wrapper")
finalTransport = wrapper.Wrap(transport)
}
return &http.Client{Transport: finalTransport}
}
// NewStdHandler initializes a new processor with plugins and steps.
func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config, moduleName string) (http.Handler, error) {
h := &stdHandler{
steps: []definition.Step{},
SubscriberID: cfg.SubscriberID,
role: cfg.Role,
moduleName: moduleName,
}
// Initialize plugins.
if err := h.initPlugins(ctx, mgr, &cfg.Plugins); err != nil {
return nil, fmt.Errorf("failed to initialize plugins: %w", err)
}
// Initialize HTTP client after plugins so transport wrapper can be applied.
h.httpClient = newHTTPClient(&cfg.HttpClientConfig, h.transportWrapper)
// Initialize steps.
if err := h.initSteps(ctx, mgr, cfg); err != nil {
return nil, fmt.Errorf("failed to initialize steps: %w", err)
}
return h, nil
}
// ServeHTTP processes an incoming HTTP request and executes defined processing steps.
func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Header.Set("X-Module-Name", h.moduleName)
r.Header.Set("X-Role", string(h.role))
// These headers are only needed for internal instrumentation; avoid leaking them downstream.
// Use defer to ensure cleanup regardless of return path.
defer func() {
r.Header.Del("X-Module-Name")
r.Header.Del("X-Role")
}()
// to start a new trace
propagator := otel.GetTextMapPropagator()
traceCtx := propagator.Extract(r.Context(), propagation.HeaderCarrier(r.Header))
tracer := otel.Tracer(telemetry.ScopeName, trace.WithInstrumentationVersion(telemetry.ScopeVersion))
spanName := r.URL.Path
traceCtx, span := tracer.Start(traceCtx, spanName, trace.WithSpanKind(trace.SpanKindServer))
//to build the request with trace
r = r.WithContext(traceCtx)
var recordOnce func()
wrapped := &responseRecorder{
ResponseWriter: w,
statusCode: http.StatusOK,
record: nil,
}
senderID, receiverID := h.resolveDirection(r.Context())
httpMeter, _ := GetHTTPMetrics(r.Context())
if httpMeter != nil {
recordOnce = func() {
RecordHTTPRequest(r.Context(), wrapped.statusCode, r.URL.Path, string(h.role), senderID, receiverID)
}
wrapped.record = recordOnce
}
// set beckn attribute
setBecknAttr(span, r, h)
stepCtx, err := h.stepCtx(r, w.Header())
if err != nil {
log.Errorf(r.Context(), err, "stepCtx(r):%v", err)
response.SendNack(r.Context(), wrapped, err)
return
}
log.Request(r.Context(), r, stepCtx.Body)
defer func() {
span.SetAttributes(attribute.Int("http.response.status_code", wrapped.statusCode), attribute.String("observedTimeUnixNano", strconv.FormatInt(time.Now().UnixNano(), 10)))
if wrapped.statusCode < 200 || wrapped.statusCode >= 400 {
span.SetStatus(codes.Error, "status code is invalid")
}
body := stepCtx.Body
telemetry.EmitAuditLogs(r.Context(), body, auditlog.Int("http.response.status_code", wrapped.statusCode), auditlog.String("http.response.error", errString(err)))
span.End()
}()
// Execute processing steps.
for _, step := range h.steps {
if err := step.Run(stepCtx); err != nil {
log.Errorf(stepCtx, err, "%T.run():%v", step, err)
response.SendNack(stepCtx, wrapped, err)
return
}
}
// Restore request body before forwarding or publishing.
r.Body = io.NopCloser(bytes.NewReader(stepCtx.Body))
if stepCtx.Route == nil {
response.SendAck(wrapped)
return
}
// Handle routing based on the defined route type.
route(stepCtx, r, wrapped, h.publisher, h.httpClient)
}
// stepCtx creates a new StepContext for processing an HTTP request.
func (h *stdHandler) stepCtx(r *http.Request, rh http.Header) (*model.StepContext, error) {
var bodyBuffer bytes.Buffer
if _, err := io.Copy(&bodyBuffer, r.Body); err != nil {
return nil, model.NewBadReqErr(err)
}
r.Body.Close()
subID := h.subID(r.Context())
return &model.StepContext{
Context: r.Context(),
Request: r,
Body: bodyBuffer.Bytes(),
Role: h.role,
SubID: subID,
RespHeader: rh,
}, nil
}
// subID retrieves the subscriber ID from the request context.
func (h *stdHandler) subID(ctx context.Context) string {
rSubID, ok := ctx.Value(model.ContextKeySubscriberID).(string)
if ok {
return rSubID
}
return h.SubscriberID
}
var proxyFunc = proxy
// route handles request forwarding or message publishing based on the routing type.
func route(ctx *model.StepContext, r *http.Request, w http.ResponseWriter, pb definition.Publisher, httpClient *http.Client) {
log.Debugf(ctx, "Routing to ctx.Route to %#v", ctx.Route)
switch ctx.Route.TargetType {
case "url":
log.Infof(ctx.Context, "Forwarding request to URL: %s", ctx.Route.URL)
proxyFunc(ctx, r, w, httpClient)
return
case "publisher":
if pb == nil {
err := fmt.Errorf("publisher plugin not configured")
log.Errorf(ctx.Context, err, "Invalid configuration:%v", err)
response.SendNack(ctx, w, err)
return
}
log.Infof(ctx.Context, "Publishing message to: %s", ctx.Route.PublisherID)
if err := pb.Publish(ctx, ctx.Route.PublisherID, ctx.Body); err != nil {
log.Errorf(ctx.Context, err, "Failed to publish message")
http.Error(w, "Error publishing message", http.StatusInternalServerError)
response.SendNack(ctx, w, err)
return
}
default:
err := fmt.Errorf("unknown route type: %s", ctx.Route.TargetType)
log.Errorf(ctx.Context, err, "Invalid configuration:%v", err)
response.SendNack(ctx, w, err)
return
}
response.SendAck(w)
}
func proxy(ctx *model.StepContext, r *http.Request, w http.ResponseWriter, httpClient *http.Client) {
target := ctx.Route.URL
r.Header.Set("X-Forwarded-Host", r.Host)
director := func(req *http.Request) {
req.URL = target
req.Host = target.Host
log.Request(req.Context(), req, ctx.Body)
}
proxy := &httputil.ReverseProxy{
Director: director,
Transport: httpClient.Transport,
}
proxy.ServeHTTP(w, r)
}
// loadPlugin is a generic function to load and validate plugins.
func loadPlugin[T any](ctx context.Context, name string, cfg *plugin.Config, mgrFunc func(context.Context, *plugin.Config) (T, error)) (T, error) {
var zero T
if cfg == nil {
log.Debugf(ctx, "Skipping %s plugin: not configured", name)
return zero, nil
}
plugin, err := mgrFunc(ctx, cfg)
if err != nil {
return zero, fmt.Errorf("failed to load %s plugin (%s): %w", name, cfg.ID, err)
}
log.Debugf(ctx, "Loaded %s plugin: %s", name, cfg.ID)
return plugin, nil
}
// loadKeyManager loads the KeyManager plugin using the provided PluginManager, cache, and registry.
func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cache, registry definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error) {
if cfg == nil {
log.Debug(ctx, "Skipping KeyManager plugin: not configured")
return nil, nil
}
if cache == nil {
return nil, fmt.Errorf("failed to load KeyManager plugin (%s): Cache plugin not configured", cfg.ID)
}
if registry == nil {
return nil, fmt.Errorf("failed to load KeyManager plugin (%s): Registry plugin not configured", cfg.ID)
}
km, err := mgr.KeyManager(ctx, cache, registry, cfg)
if err != nil {
return nil, fmt.Errorf("failed to load KeyManager plugin (%s): %w", cfg.ID, err)
}
log.Debugf(ctx, "Loaded Keymanager plugin: %s", cfg.ID)
return km, nil
}
// initPlugins initializes required plugins for the processor.
func (h *stdHandler) initPlugins(ctx context.Context, mgr PluginManager, cfg *PluginCfg) error {
var err error
if h.cache, err = loadPlugin(ctx, "Cache", cfg.Cache, mgr.Cache); err != nil {
return err
}
if h.registry, err = loadPlugin(ctx, "Registry", cfg.Registry, mgr.Registry); err != nil {
return err
}
if h.km, err = loadKeyManager(ctx, mgr, h.cache, h.registry, cfg.KeyManager); err != nil {
return err
}
if h.signValidator, err = loadPlugin(ctx, "SignValidator", cfg.SignValidator, mgr.SignValidator); err != nil {
return err
}
if h.schemaValidator, err = loadPlugin(ctx, "SchemaValidator", cfg.SchemaValidator, mgr.SchemaValidator); err != nil {
return err
}
if h.router, err = loadPlugin(ctx, "Router", cfg.Router, mgr.Router); err != nil {
return err
}
if h.publisher, err = loadPlugin(ctx, "Publisher", cfg.Publisher, mgr.Publisher); err != nil {
return err
}
if h.signer, err = loadPlugin(ctx, "Signer", cfg.Signer, mgr.Signer); err != nil {
return err
}
if h.transportWrapper, err = loadPlugin(ctx, "TransportWrapper", cfg.TransportWrapper, mgr.TransportWrapper); err != nil {
return err
}
log.Debugf(ctx, "All required plugins successfully loaded for stdHandler")
return nil
}
// initSteps initializes and validates processing steps for the processor.
func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Config) error {
steps := make(map[string]definition.Step)
// Load plugin-based steps
for _, c := range cfg.Plugins.Steps {
step, err := mgr.Step(ctx, &c)
if err != nil {
return fmt.Errorf("failed to initialize plugin step %s: %w", c.ID, err)
}
steps[c.ID] = step
}
// Register processing steps
for _, step := range cfg.Steps {
var s definition.Step
var err error
switch step {
case "sign":
s, err = newSignStep(h.signer, h.km)
case "validateSign":
s, err = newValidateSignStep(h.signValidator, h.km)
case "validateSchema":
s, err = newValidateSchemaStep(h.schemaValidator)
case "addRoute":
s, err = newAddRouteStep(h.router)
default:
if customStep, exists := steps[step]; exists {
s = customStep
} else {
return fmt.Errorf("unrecognized step: %s", step)
}
}
if err != nil {
return err
}
instrumentedStep, wrapErr := NewInstrumentedStep(s, step, h.moduleName)
if wrapErr != nil {
log.Warnf(ctx, "Failed to instrument step %s: %v", step, wrapErr)
h.steps = append(h.steps, s)
continue
}
h.steps = append(h.steps, instrumentedStep)
}
log.Infof(ctx, "Processor steps initialized: %v", cfg.Steps)
return nil
}
func (h *stdHandler) resolveDirection(ctx context.Context) (senderID, receiverID string) {
selfID := h.SubscriberID
remoteID, _ := ctx.Value(model.ContextKeyRemoteID).(string)
if strings.Contains(h.moduleName, "Caller") {
return selfID, remoteID
}
return remoteID, selfID
}
func setBecknAttr(span trace.Span, r *http.Request, h *stdHandler) {
senderID, receiverID := h.resolveDirection(r.Context())
attrs := []attribute.KeyValue{
telemetry.AttrRecipientID.String(receiverID),
telemetry.AttrSenderID.String(senderID),
attribute.String("span_uuid", uuid.New().String()),
attribute.String("http.request.method", r.Method),
attribute.String("http.route", r.URL.Path),
}
if trxID, ok := r.Context().Value(model.ContextKeyTxnID).(string); ok {
attrs = append(attrs, attribute.String("transaction_id", trxID))
}
if mesID, ok := r.Context().Value(model.ContextKeyMsgID).(string); ok {
attrs = append(attrs, attribute.String("message_id", mesID))
}
if parentID, ok := r.Context().Value(model.ContextKeyParentID).(string); ok && parentID != "" {
attrs = append(attrs, attribute.String("parentSpanId", parentID))
}
if r.Host != "" {
attrs = append(attrs, attribute.String("server.address", r.Host))
}
if ua := r.UserAgent(); ua != "" {
attrs = append(attrs, attribute.String("user_agent.original", ua))
}
span.SetAttributes(attrs...)
}
func errString(e error) string {
if e == nil {
return ""
}
return e.Error()
}