add the feature for observability
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
||||
"net/http/httputil"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||
"github.com/beckn-one/beckn-onix/pkg/metrics"
|
||||
"github.com/beckn-one/beckn-onix/pkg/model"
|
||||
"github.com/beckn-one/beckn-onix/pkg/plugin"
|
||||
"github.com/beckn-one/beckn-onix/pkg/plugin/definition"
|
||||
@@ -50,7 +51,11 @@ func newHTTPClient(cfg *HttpClientConfig) *http.Client {
|
||||
if cfg.ResponseHeaderTimeout > 0 {
|
||||
transport.ResponseHeaderTimeout = cfg.ResponseHeaderTimeout
|
||||
}
|
||||
return &http.Client{Transport: transport}
|
||||
|
||||
// Wrap transport with metrics tracking for outbound requests
|
||||
wrappedTransport := metrics.WrapHTTPTransport(transport)
|
||||
|
||||
return &http.Client{Transport: wrappedTransport}
|
||||
}
|
||||
|
||||
// NewStdHandler initializes a new processor with plugins and steps.
|
||||
@@ -74,6 +79,13 @@ func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Ha
|
||||
|
||||
// ServeHTTP processes an incoming HTTP request and executes defined processing steps.
|
||||
func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// Track inbound request
|
||||
host := r.Host
|
||||
if host == "" {
|
||||
host = r.URL.Host
|
||||
}
|
||||
metrics.RecordInboundRequest(r.Context(), host)
|
||||
|
||||
ctx, err := h.stepCtx(r, w.Header())
|
||||
if err != nil {
|
||||
log.Errorf(r.Context(), err, "stepCtx(r):%v", err)
|
||||
@@ -82,14 +94,35 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
log.Request(r.Context(), r, ctx.Body)
|
||||
|
||||
// Track validation steps
|
||||
signValidated := false
|
||||
schemaValidated := false
|
||||
|
||||
// Execute processing steps.
|
||||
for _, step := range h.steps {
|
||||
stepName := fmt.Sprintf("%T", step)
|
||||
// Check if this is a validation step
|
||||
if stepName == "*step.validateSignStep" {
|
||||
signValidated = true
|
||||
}
|
||||
if stepName == "*step.validateSchemaStep" {
|
||||
schemaValidated = true
|
||||
}
|
||||
|
||||
if err := step.Run(ctx); err != nil {
|
||||
log.Errorf(ctx, err, "%T.run(%v):%v", step, ctx, err)
|
||||
response.SendNack(ctx, w, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Record validation metrics after successful execution
|
||||
if signValidated {
|
||||
metrics.RecordInboundSignValidation(ctx, host)
|
||||
}
|
||||
if schemaValidated {
|
||||
metrics.RecordInboundSchemaValidation(ctx, host)
|
||||
}
|
||||
// Restore request body before forwarding or publishing.
|
||||
r.Body = io.NopCloser(bytes.NewReader(ctx.Body))
|
||||
if ctx.Route == nil {
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/beckn-one/beckn-onix/core/module/handler"
|
||||
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||
"github.com/beckn-one/beckn-onix/pkg/metrics"
|
||||
"github.com/beckn-one/beckn-onix/pkg/model"
|
||||
)
|
||||
|
||||
@@ -29,7 +30,7 @@ var handlerProviders = map[handler.Type]Provider{
|
||||
// It iterates over the module configurations, retrieves appropriate handler providers,
|
||||
// and registers the handlers with the HTTP multiplexer.
|
||||
func Register(ctx context.Context, mCfgs []Config, mux *http.ServeMux, mgr handler.PluginManager) error {
|
||||
mux.Handle("/health", http.HandlerFunc(handler.HealthHandler))
|
||||
mux.Handle("/health", metrics.HTTPMiddleware(http.HandlerFunc(handler.HealthHandler), "/health"))
|
||||
|
||||
log.Debugf(ctx, "Registering modules with config: %#v", mCfgs)
|
||||
// Iterate over the handlers in the configuration.
|
||||
@@ -48,6 +49,8 @@ func Register(ctx context.Context, mCfgs []Config, mux *http.ServeMux, mgr handl
|
||||
|
||||
}
|
||||
h = moduleCtxMiddleware(c.Name, h)
|
||||
// Wrap handler with metrics middleware.
|
||||
h = metrics.HTTPMiddleware(h, c.Path)
|
||||
log.Debugf(ctx, "Registering handler %s, of type %s @ %s", c.Name, c.Handler.Type, c.Path)
|
||||
mux.Handle(c.Path, h)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user