changes for error in request flow
This commit is contained in:
@@ -16,6 +16,7 @@ import (
|
|||||||
"github.com/beckn/beckn-onix/core/module"
|
"github.com/beckn/beckn-onix/core/module"
|
||||||
"github.com/beckn/beckn-onix/core/module/handler"
|
"github.com/beckn/beckn-onix/core/module/handler"
|
||||||
"github.com/beckn/beckn-onix/pkg/log"
|
"github.com/beckn/beckn-onix/pkg/log"
|
||||||
|
"github.com/beckn/beckn-onix/pkg/model"
|
||||||
"github.com/beckn/beckn-onix/pkg/plugin"
|
"github.com/beckn/beckn-onix/pkg/plugin"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -96,7 +97,7 @@ func newServer(ctx context.Context, mgr handler.PluginManager, cfg *Config) (htt
|
|||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
err := module.Register(ctx, cfg.Modules, mux, mgr)
|
err := module.Register(ctx, cfg.Modules, mux, mgr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to register modules: %w", err)
|
return nil, model.NewBadReqErr(fmt.Errorf("failed to register modules: %w", err))
|
||||||
}
|
}
|
||||||
return mux, nil
|
return mux, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -40,23 +40,23 @@ func (c *registryClient) Subscribe(ctx context.Context, subscription *model.Subs
|
|||||||
|
|
||||||
jsonData, err := json.Marshal(subscription)
|
jsonData, err := json.Marshal(subscription)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to marshal subscription data: %w", err)
|
return model.NewBadReqErr(fmt.Errorf("failed to marshal subscription data: %w", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := retryablehttp.NewRequest("POST", subscribeURL, bytes.NewBuffer(jsonData))
|
req, err := retryablehttp.NewRequest("POST", subscribeURL, bytes.NewBuffer(jsonData))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create request: %w", err)
|
return model.NewBadReqErr(fmt.Errorf("failed to create request: %w", err))
|
||||||
}
|
}
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
resp, err := c.client.Do(req)
|
resp, err := c.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to send request with retry: %w", err)
|
return model.NewBadReqErr(fmt.Errorf("failed to send request with retry: %w", err))
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
return fmt.Errorf("subscribe request failed with status: %s", resp.Status)
|
return model.NewBadReqErr(fmt.Errorf("subscribe request failed with status: %s", resp.Status))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -67,28 +67,28 @@ func (c *registryClient) Lookup(ctx context.Context, subscription *model.Subscri
|
|||||||
|
|
||||||
jsonData, err := json.Marshal(subscription)
|
jsonData, err := json.Marshal(subscription)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to marshal subscription data: %w", err)
|
return nil, model.NewBadReqErr(fmt.Errorf("failed to marshal subscription data: %w", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := retryablehttp.NewRequest("POST", lookupURL, bytes.NewBuffer(jsonData))
|
req, err := retryablehttp.NewRequest("POST", lookupURL, bytes.NewBuffer(jsonData))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
return nil, model.NewBadReqErr(fmt.Errorf("failed to create request: %w", err))
|
||||||
}
|
}
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
resp, err := c.client.Do(req)
|
resp, err := c.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to send request with retry: %w", err)
|
return nil, model.NewBadReqErr(fmt.Errorf("failed to send request with retry: %w", err))
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
return nil, fmt.Errorf("lookup request failed with status: %s", resp.Status)
|
return nil, model.NewBadReqErr(fmt.Errorf("lookup request failed with status: %s", resp.Status))
|
||||||
}
|
}
|
||||||
|
|
||||||
body, err := io.ReadAll(resp.Body)
|
body, err := io.ReadAll(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to read response body: %w", err)
|
return nil, model.NewBadReqErr(fmt.Errorf("failed to read response body: %w", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
var results []model.Subscription
|
var results []model.Subscription
|
||||||
|
|||||||
@@ -178,12 +178,12 @@ func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cac
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
if cache == nil {
|
if cache == nil {
|
||||||
return nil, fmt.Errorf("failed to load KeyManager plugin (%s): Cache plugin not configured", cfg.ID)
|
return nil, model.NewBadReqErr(fmt.Errorf("failed to load KeyManager plugin (%s): Cache plugin not configured", cfg.ID))
|
||||||
}
|
}
|
||||||
rClient := client.NewRegisteryClient(&client.Config{RegisteryURL: regURL})
|
rClient := client.NewRegisteryClient(&client.Config{RegisteryURL: regURL})
|
||||||
km, err := mgr.KeyManager(ctx, cache, rClient, cfg)
|
km, err := mgr.KeyManager(ctx, cache, rClient, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to load cache plugin (%s): %w", cfg.ID, err)
|
return nil, model.NewBadReqErr(fmt.Errorf("failed to load cache plugin (%s): %w", cfg.ID, err))
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf(ctx, "Loaded Keymanager plugin: %s", cfg.ID)
|
log.Debugf(ctx, "Loaded Keymanager plugin: %s", cfg.ID)
|
||||||
|
|||||||
@@ -105,7 +105,7 @@ func (s *validateSignStep) validate(ctx *model.StepContext, value string) error
|
|||||||
headerParts := strings.Split(value, "|")
|
headerParts := strings.Split(value, "|")
|
||||||
ids := strings.Split(headerParts[0], "\"")
|
ids := strings.Split(headerParts[0], "\"")
|
||||||
if len(ids) < 2 || len(headerParts) < 3 {
|
if len(ids) < 2 || len(headerParts) < 3 {
|
||||||
return fmt.Errorf("malformed sign header")
|
return model.NewBadReqErr(fmt.Errorf("malformed sign header"))
|
||||||
}
|
}
|
||||||
subID := ids[1]
|
subID := ids[1]
|
||||||
keyID := headerParts[1]
|
keyID := headerParts[1]
|
||||||
@@ -114,7 +114,7 @@ func (s *validateSignStep) validate(ctx *model.StepContext, value string) error
|
|||||||
return fmt.Errorf("failed to get validation key: %w", err)
|
return fmt.Errorf("failed to get validation key: %w", err)
|
||||||
}
|
}
|
||||||
if err := s.validator.Validate(ctx, ctx.Body, value, key); err != nil {
|
if err := s.validator.Validate(ctx, ctx.Body, value, key); err != nil {
|
||||||
return fmt.Errorf("sign validation failed: %w", err)
|
return model.NewSignValidationErr(fmt.Errorf("sign validation failed: %w", err))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -136,7 +136,7 @@ func newValidateSchemaStep(schemaValidator definition.SchemaValidator) (definiti
|
|||||||
// Run executes the schema validation step.
|
// Run executes the schema validation step.
|
||||||
func (s *validateSchemaStep) Run(ctx *model.StepContext) error {
|
func (s *validateSchemaStep) Run(ctx *model.StepContext) error {
|
||||||
if err := s.validator.Validate(ctx, ctx.Request.URL, ctx.Body); err != nil {
|
if err := s.validator.Validate(ctx, ctx.Request.URL, ctx.Body); err != nil {
|
||||||
return fmt.Errorf("schema validation failed: %w", err)
|
return err.(*model.SchemaValidationErr).BecknError()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user