updated code.
1. Resolved merge conflicts. 2. Resolved go linting issues.
This commit is contained in:
@@ -24,7 +24,7 @@ type Config struct {
|
||||
// RegisteryClient encapsulates the logic for calling the subscribe and lookup endpoints.
|
||||
type RegisteryClient struct {
|
||||
Config *Config
|
||||
Client *retryablehttp.Client // Retryable HTTP Client
|
||||
Client *retryablehttp.Client
|
||||
}
|
||||
|
||||
// NewRegisteryClient creates a new instance of Client.
|
||||
|
||||
@@ -2,7 +2,6 @@ package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/beckn/beckn-onix/pkg/model"
|
||||
@@ -10,7 +9,7 @@ import (
|
||||
"github.com/beckn/beckn-onix/pkg/plugin/definition"
|
||||
)
|
||||
|
||||
// PluginManager defines the methods required for managing plugins in stdHandler.
|
||||
// PluginManager defines an interface for managing plugins dynamically.
|
||||
type PluginManager interface {
|
||||
Middleware(ctx context.Context, cfg *plugin.Config) (func(http.Handler) http.Handler, error)
|
||||
SignValidator(ctx context.Context, cfg *plugin.Config) (definition.Verifier, error)
|
||||
@@ -53,45 +52,4 @@ type Config struct {
|
||||
RegistryURL string `yaml:"registryUrl"`
|
||||
Role model.Role
|
||||
SubscriberID string `yaml:"subscriberId"`
|
||||
Trace map[string]bool
|
||||
}
|
||||
|
||||
// Step represents a named processing step.
|
||||
type Step string
|
||||
|
||||
const (
|
||||
// StepInitialize represents the initialization phase of the request processing pipeline.
|
||||
StepInitialize Step = "initialize"
|
||||
|
||||
// StepValidate represents the validation phase, where input data is checked for correctness.
|
||||
StepValidate Step = "validate"
|
||||
|
||||
// StepProcess represents the core processing phase of the request.
|
||||
StepProcess Step = "process"
|
||||
|
||||
// StepFinalize represents the finalization phase, where the response is prepared and sent.
|
||||
StepFinalize Step = "finalize"
|
||||
)
|
||||
|
||||
// validSteps ensures only allowed step values are used.
|
||||
var validSteps = map[Step]bool{
|
||||
StepInitialize: true,
|
||||
StepValidate: true,
|
||||
StepProcess: true,
|
||||
StepFinalize: true,
|
||||
}
|
||||
|
||||
// UnmarshalYAML customizes YAML unmarshalling for Step to enforce valid values.
|
||||
func (s *Step) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
||||
var stepName string
|
||||
if err := unmarshal(&stepName); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
step := Step(stepName)
|
||||
if !validSteps[step] {
|
||||
return fmt.Errorf("invalid step: %s", stepName)
|
||||
}
|
||||
*s = step
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -38,11 +38,11 @@ func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Ha
|
||||
SubscriberID: cfg.SubscriberID,
|
||||
role: cfg.Role,
|
||||
}
|
||||
// Initialize plugins
|
||||
// Initialize plugins.
|
||||
if err := h.initPlugins(ctx, mgr, &cfg.Plugins, cfg.RegistryURL); err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize plugins: %w", err)
|
||||
}
|
||||
// Initialize steps
|
||||
// Initialize steps.
|
||||
if err := h.initSteps(ctx, mgr, cfg); err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize steps: %w", err)
|
||||
}
|
||||
@@ -59,7 +59,7 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
log.Request(r.Context(), r, ctx.Body)
|
||||
|
||||
// Execute processing steps
|
||||
// Execute processing steps.
|
||||
for _, step := range h.steps {
|
||||
if err := step.Run(ctx); err != nil {
|
||||
log.Errorf(ctx, err, "%T.run(%v):%v", step, ctx, err)
|
||||
@@ -67,14 +67,14 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
}
|
||||
// Restore request body before forwarding or publishing
|
||||
// Restore request body before forwarding or publishing.
|
||||
r.Body = io.NopCloser(bytes.NewReader(ctx.Body))
|
||||
if ctx.Route == nil {
|
||||
response.SendAck(w)
|
||||
return
|
||||
}
|
||||
|
||||
// Handle routing based on the defined route type
|
||||
// Handle routing based on the defined route type.
|
||||
route(ctx, r, w, h.publisher)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -74,17 +73,17 @@ func (s *validateSignStep) Run(ctx *model.StepContext) error {
|
||||
if len(headerValue) != 0 {
|
||||
if err := s.validate(ctx, headerValue); err != nil {
|
||||
ctx.RespHeader.Set(model.UnaAuthorizedHeaderGateway, unauthHeader)
|
||||
return model.NewSignValidationErrf("failed to validate %s: %w", model.AuthHeaderGateway, err)
|
||||
return model.NewSignValidationErr(fmt.Errorf("failed to validate %s: %w", model.AuthHeaderGateway, err))
|
||||
}
|
||||
}
|
||||
headerValue = ctx.Request.Header.Get(model.AuthHeaderSubscriber)
|
||||
if len(headerValue) == 0 {
|
||||
ctx.RespHeader.Set(model.UnaAuthorizedHeaderSubscriber, unauthHeader)
|
||||
return model.NewSignValidationErrf("%s missing", model.UnaAuthorizedHeaderSubscriber)
|
||||
return model.NewSignValidationErr(fmt.Errorf("%s missing", model.UnaAuthorizedHeaderSubscriber))
|
||||
}
|
||||
if err := s.validate(ctx, headerValue); err != nil {
|
||||
ctx.RespHeader.Set(model.UnaAuthorizedHeaderSubscriber, unauthHeader)
|
||||
return model.NewSignValidationErrf("failed to validate %s: %w", model.AuthHeaderSubscriber, err)
|
||||
return model.NewSignValidationErr(fmt.Errorf("failed to validate %s: %w", model.AuthHeaderSubscriber, err))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -113,15 +112,6 @@ type validateSchemaStep struct {
|
||||
validator definition.SchemaValidator
|
||||
}
|
||||
|
||||
// newValidateSchemaStep creates and returns the validateSchema step after validation
|
||||
func newValidateSchemaStep(schemaValidator definition.SchemaValidator) (definition.Step, error) {
|
||||
if schemaValidator == nil {
|
||||
return nil, fmt.Errorf("invalid config: SchemaValidator plugin not configured")
|
||||
}
|
||||
log.Debug(context.Background(), "adding schema validator")
|
||||
return &validateSchemaStep{validator: schemaValidator}, nil
|
||||
}
|
||||
|
||||
// Run executes the schema validation step.
|
||||
func (s *validateSchemaStep) Run(ctx *model.StepContext) error {
|
||||
if err := s.validator.Validate(ctx, ctx.Request.URL, ctx.Body); err != nil {
|
||||
@@ -135,14 +125,6 @@ type addRouteStep struct {
|
||||
router definition.Router
|
||||
}
|
||||
|
||||
// newRouteStep initializes and returns a new routing step.
|
||||
func newRouteStep(router definition.Router) (definition.Step, error) {
|
||||
if router == nil {
|
||||
return nil, fmt.Errorf("invalid config: Router plugin not configured")
|
||||
}
|
||||
return &addRouteStep{router: router}, nil
|
||||
}
|
||||
|
||||
// Run executes the routing step.
|
||||
func (s *addRouteStep) Run(ctx *model.StepContext) error {
|
||||
route, err := s.router.Route(ctx, ctx.Request.URL, ctx.Body)
|
||||
@@ -150,8 +132,5 @@ func (s *addRouteStep) Run(ctx *model.StepContext) error {
|
||||
return fmt.Errorf("failed to determine route: %w", err)
|
||||
}
|
||||
log.Debugf(ctx, "Routing to %#v", route)
|
||||
ctx.Route = route
|
||||
|
||||
log.Debugf(ctx, "ctx.Route to %#v", ctx.Route)
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user