Update on review comments

1. changed implementations from camelcase to lowercase
2. added removed initSteps.
3. updated rout struct for model.go
This commit is contained in:
MohitKatare-protean
2025-03-29 16:08:28 +05:30
parent 98c04b1f9f
commit 9bfb65079e
9 changed files with 77 additions and 17 deletions

View File

@@ -111,7 +111,7 @@ func (h *stdHandler) subID(ctx context.Context) string {
// route handles request forwarding or message publishing based on the routing type.
func route(ctx *model.StepContext, r *http.Request, w http.ResponseWriter, pb definition.Publisher) {
log.Debugf(ctx, "Routing to ctx.Route to %#v", ctx.Route)
switch ctx.Route.Type {
switch ctx.Route.TargetType {
case "url":
log.Infof(ctx.Context, "Forwarding request to URL: %s", ctx.Route.URL)
proxy(r, w, ctx.Route.URL)
@@ -123,7 +123,7 @@ func route(ctx *model.StepContext, r *http.Request, w http.ResponseWriter, pb de
response.SendNack(ctx, w, err)
return
}
log.Infof(ctx.Context, "Publishing message to: %s", ctx.Route.Publisher)
log.Infof(ctx.Context, "Publishing message to: %s", ctx.Route.PublisherID)
if err := pb.Publish(ctx, ctx.Body); err != nil {
log.Errorf(ctx.Context, err, "Failed to publish message")
http.Error(w, "Error publishing message", http.StatusInternalServerError)
@@ -131,7 +131,7 @@ func route(ctx *model.StepContext, r *http.Request, w http.ResponseWriter, pb de
return
}
default:
err := fmt.Errorf("unknown route type: %s", ctx.Route.Type)
err := fmt.Errorf("unknown route type: %s", ctx.Route.TargetType)
log.Errorf(ctx.Context, err, "Invalid configuration:%v", err)
response.SendNack(ctx, w, err)
return
@@ -230,6 +230,35 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf
steps[c.ID] = step
}
// Register processing steps
for _, step := range cfg.Steps {
var s definition.Step
var err error
switch step {
case "sign":
s, err = newSignStep(h.signer, h.km)
case "validateSign":
s, err = newValidateSignStep(h.signValidator, h.km)
case "validateSchema":
s, err = newValidateSchemaStep(h.schemaValidator)
case "addRoute":
s, err = newRouteStep(h.router)
case "broadcast":
s = &broadcastStep{}
default:
if customStep, exists := steps[step]; exists {
s = customStep
} else {
return fmt.Errorf("unrecognized step: %s", step)
}
}
if err != nil {
return err
}
h.steps = append(h.steps, s)
}
log.Infof(ctx, "Processor steps initialized: %v", cfg.Steps)
return nil
}

View File

@@ -1,6 +1,7 @@
package handler
import (
"context"
"fmt"
"strings"
"time"
@@ -112,6 +113,15 @@ type validateSchemaStep struct {
validator definition.SchemaValidator
}
// newValidateSchemaStep creates and returns the validateSchema step after validation.
func newValidateSchemaStep(schemaValidator definition.SchemaValidator) (definition.Step, error) {
if schemaValidator == nil {
return nil, fmt.Errorf("invalid config: SchemaValidator plugin not configured")
}
log.Debug(context.Background(), "adding schema validator")
return &validateSchemaStep{validator: schemaValidator}, nil
}
// Run executes the schema validation step.
func (s *validateSchemaStep) Run(ctx *model.StepContext) error {
if err := s.validator.Validate(ctx, ctx.Request.URL, ctx.Body); err != nil {
@@ -125,6 +135,14 @@ type addRouteStep struct {
router definition.Router
}
// newRouteStep creates and returns the addRoute step after validation.
func newRouteStep(router definition.Router) (definition.Step, error) {
if router == nil {
return nil, fmt.Errorf("invalid config: Router plugin not configured")
}
return &addRouteStep{router: router}, nil
}
// Run executes the routing step.
func (s *addRouteStep) Run(ctx *model.StepContext) error {
route, err := s.router.Route(ctx, ctx.Request.URL, ctx.Body)
@@ -132,5 +150,20 @@ func (s *addRouteStep) Run(ctx *model.StepContext) error {
return fmt.Errorf("failed to determine route: %w", err)
}
log.Debugf(ctx, "Routing to %#v", route)
ctx.Route = &model.Route{
TargetType: route.TargetType,
PublisherID: route.PublisherID,
URL: route.URL,
}
log.Debugf(ctx, "ctx.Route to %#v", ctx.Route)
return nil
}
// broadcastStep is a stub implementation of a step that handles broadcasting messages.
type broadcastStep struct{}
// Run executes the broadcast step.
func (b *broadcastStep) Run(ctx *model.StepContext) error {
// TODO: Implement broadcast logic if needed
return nil
}