From bbb1dbc843d508c856a50de40891a181ec7c8c5e Mon Sep 17 00:00:00 2001 From: MohitKatare-protean Date: Mon, 31 Mar 2025 22:40:59 +0530 Subject: [PATCH] Update on the review comments --- core/module/client/registery.go | 24 ++-- core/module/client/registry_test.go | 132 +++++++----------- core/module/handler/stdHandler.go | 2 +- core/module/handler/step.go | 19 ++- go.mod | 2 +- pkg/model/model.go | 13 +- .../reqpreprocessor/reqpreprocessor.go | 8 +- pkg/plugin/implementation/router/router.go | 13 +- pkg/plugin/manager.go | 55 ++++++-- 9 files changed, 139 insertions(+), 129 deletions(-) diff --git a/core/module/client/registery.go b/core/module/client/registery.go index 57b8e71..3045ebb 100644 --- a/core/module/client/registery.go +++ b/core/module/client/registery.go @@ -21,22 +21,22 @@ type Config struct { RetryWaitMax time.Duration } -// RegisteryClient encapsulates the logic for calling the subscribe and lookup endpoints. -type RegisteryClient struct { - Config *Config - Client *retryablehttp.Client +// registryClient encapsulates the logic for calling the subscribe and lookup endpoints. +type registryClient struct { + config *Config + client *retryablehttp.Client } // NewRegisteryClient creates a new instance of Client. -func NewRegisteryClient(config *Config) *RegisteryClient { +func NewRegisteryClient(config *Config) *registryClient { retryClient := retryablehttp.NewClient() - return &RegisteryClient{Config: config, Client: retryClient} + return ®istryClient{config: config, client: retryClient} } // Subscribe calls the /subscribe endpoint with retry. -func (c *RegisteryClient) Subscribe(ctx context.Context, subscription *model.Subscription) error { - subscribeURL := fmt.Sprintf("%s/subscribe", c.Config.RegisteryURL) +func (c *registryClient) Subscribe(ctx context.Context, subscription *model.Subscription) error { + subscribeURL := fmt.Sprintf("%s/subscribe", c.config.RegisteryURL) jsonData, err := json.Marshal(subscription) if err != nil { @@ -49,7 +49,7 @@ func (c *RegisteryClient) Subscribe(ctx context.Context, subscription *model.Sub } req.Header.Set("Content-Type", "application/json") - resp, err := c.Client.Do(req) + resp, err := c.client.Do(req) if err != nil { return fmt.Errorf("failed to send request with retry: %w", err) } @@ -62,8 +62,8 @@ func (c *RegisteryClient) Subscribe(ctx context.Context, subscription *model.Sub } // Lookup calls the /lookup endpoint with retry and returns a slice of Subscription. -func (c *RegisteryClient) Lookup(ctx context.Context, subscription *model.Subscription) ([]model.Subscription, error) { - lookupURL := fmt.Sprintf("%s/lookUp", c.Config.RegisteryURL) +func (c *registryClient) Lookup(ctx context.Context, subscription *model.Subscription) ([]model.Subscription, error) { + lookupURL := fmt.Sprintf("%s/lookUp", c.config.RegisteryURL) jsonData, err := json.Marshal(subscription) if err != nil { @@ -76,7 +76,7 @@ func (c *RegisteryClient) Lookup(ctx context.Context, subscription *model.Subscr } req.Header.Set("Content-Type", "application/json") - resp, err := c.Client.Do(req) + resp, err := c.client.Do(req) if err != nil { return nil, fmt.Errorf("failed to send request with retry: %w", err) } diff --git a/core/module/client/registry_test.go b/core/module/client/registry_test.go index f8e7eaf..5185597 100644 --- a/core/module/client/registry_test.go +++ b/core/module/client/registry_test.go @@ -46,42 +46,37 @@ func TestSubscribeSuccess(t *testing.T) { ValidUntil: time.Now().Add(24 * time.Hour), Status: "SUBSCRIBED", } - err := client.Subscribe(context.Background(), subscription) - require.NoError(t, err) + if err != nil { + t.Fatalf("Subscribe() failed with error: %v", err) + } } -// TestSubscribeFailureWithMock tests different failure scenarios using a mock client. -func TestSubscribeFailureWithMock(t *testing.T) { +// TestSubscribeFailure tests different failure scenarios using a mock client. +func TestSubscribeFailure(t *testing.T) { tests := []struct { - name string - mockError error - expectError bool + name string + mockError error }{ { - name: "Failed subscription - Internal Server Error", - mockError: errors.New("internal server error"), - expectError: true, + name: "Failed subscription - Internal Server Error", + mockError: errors.New("internal server error"), }, { - name: "Failed subscription - Bad Request", - mockError: errors.New("bad request"), - expectError: true, + name: "Failed subscription - Bad Request", + mockError: errors.New("bad request"), }, { - name: "Request Timeout", - mockError: context.DeadlineExceeded, - expectError: true, + name: "Request Timeout", + mockError: context.DeadlineExceeded, }, { - name: "Network Failure", - mockError: errors.New("network failure"), - expectError: true, + name: "Network Failure", + mockError: errors.New("network failure"), }, { - name: "JSON Marshalling Failure", - mockError: errors.New("json marshalling failure"), - expectError: true, + name: "JSON Marshalling Failure", + mockError: errors.New("json marshalling failure"), }, } @@ -103,67 +98,21 @@ func TestSubscribeFailureWithMock(t *testing.T) { } if tt.name == "JSON Marshalling Failure" { - invalidSubscription := &model.Subscription{} - invalidSubscription.ValidFrom = time.Unix(0, 0) // Invalid zero timestamp - subscription = invalidSubscription + subscription = &model.Subscription{} // Example of an invalid object } err := mockClient.Subscribe(context.Background(), subscription) - if tt.expectError { - require.Error(t, err) - } else { - require.NoError(t, err) - } + require.Error(t, err) // Directly checking for an error since all cases should fail }) } } // TestLookupSuccess tests successful lookup scenarios. func TestLookupSuccess(t *testing.T) { - tests := []struct { - name string - responseBody interface{} - responseCode int - }{ - { - name: "Successful lookup", - responseBody: []model.Subscription{ - { - Subscriber: model.Subscriber{ - SubscriberID: "123", - }, - KeyID: "test-key", - SigningPublicKey: "test-signing-key", - EncrPublicKey: "test-encryption-key", - ValidFrom: time.Now(), - ValidUntil: time.Now().Add(24 * time.Hour), - Status: "SUBSCRIBED", - }, - }, - responseCode: http.StatusOK, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(tc.responseCode) - if tc.responseBody != nil { - bodyBytes, _ := json.Marshal(tc.responseBody) - w.Write(bodyBytes) - } - })) - defer server.Close() - - config := &Config{ - RegisteryURL: server.URL, - RetryMax: 1, - RetryWaitMin: 1 * time.Millisecond, - RetryWaitMax: 2 * time.Millisecond, - } - rClient := NewRegisteryClient(config) - ctx := context.Background() - subscription := &model.Subscription{ + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + response := []model.Subscription{ + { Subscriber: model.Subscriber{ SubscriberID: "123", }, @@ -173,14 +122,37 @@ func TestLookupSuccess(t *testing.T) { ValidFrom: time.Now(), ValidUntil: time.Now().Add(24 * time.Hour), Status: "SUBSCRIBED", - } + }, + } + bodyBytes, _ := json.Marshal(response) + w.Write(bodyBytes) + })) + defer server.Close() - result, err := rClient.Lookup(ctx, subscription) - require.NoError(t, err) - require.NotEmpty(t, result) - require.Equal(t, subscription.Subscriber.SubscriberID, result[0].Subscriber.SubscriberID) - }) + config := &Config{ + RegisteryURL: server.URL, + RetryMax: 1, + RetryWaitMin: 1 * time.Millisecond, + RetryWaitMax: 2 * time.Millisecond, } + rClient := NewRegisteryClient(config) + ctx := context.Background() + subscription := &model.Subscription{ + Subscriber: model.Subscriber{ + SubscriberID: "123", + }, + KeyID: "test-key", + SigningPublicKey: "test-signing-key", + EncrPublicKey: "test-encryption-key", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(24 * time.Hour), + Status: "SUBSCRIBED", + } + + result, err := rClient.Lookup(ctx, subscription) + require.NoError(t, err) + require.NotEmpty(t, result) + require.Equal(t, subscription.Subscriber.SubscriberID, result[0].Subscriber.SubscriberID) } // TestLookupFailure tests failure scenarios for the Lookup function. diff --git a/core/module/handler/stdHandler.go b/core/module/handler/stdHandler.go index 1a39d51..712fa2d 100644 --- a/core/module/handler/stdHandler.go +++ b/core/module/handler/stdHandler.go @@ -245,7 +245,7 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf case "validateSchema": s, err = newValidateSchemaStep(h.schemaValidator) case "addRoute": - s, err = newRouteStep(h.router) + s, err = newAddRouteStep(h.router) case "broadcast": s = &broadcastStep{} default: diff --git a/core/module/handler/step.go b/core/module/handler/step.go index 9074843..cd44b1a 100644 --- a/core/module/handler/step.go +++ b/core/module/handler/step.go @@ -41,7 +41,9 @@ func (s *signStep) Run(ctx *model.StepContext) error { if err != nil { return fmt.Errorf("failed to sign request: %w", err) } - authHeader := fmt.Sprintf("Signature keyId=\"%s|%s|ed25519\",algorithm=\"ed25519\",created=\"%d\",expires=\"%d\",headers=\"(created) (expires) digest\",signature=\"%s\"", ctx.SubID, keyID, createdAt, validTill, sign) + + authHeader := s.generateAuthHeader(ctx.SubID, keyID, createdAt, validTill, sign) + header := model.AuthHeaderSubscriber if ctx.Role == model.RoleGateway { header = model.AuthHeaderGateway @@ -50,6 +52,15 @@ func (s *signStep) Run(ctx *model.StepContext) error { return nil } +// generateAuthHeader constructs the authorization header for the signed request. +// It includes key ID, algorithm, creation time, expiration time, required headers, and signature. +func (s *signStep) generateAuthHeader(subID, keyID string, createdAt, validTill int64, signature string) string { + return fmt.Sprintf( + "Signature keyId=\"%s|%s|ed25519\",algorithm=\"ed25519\",created=\"%d\",expires=\"%d\",headers=\"(created) (expires) digest\",signature=\"%s\"", + subID, keyID, createdAt, validTill, signature, + ) +} + // validateSignStep represents the signature validation step. type validateSignStep struct { validator definition.SignValidator @@ -135,8 +146,8 @@ type addRouteStep struct { router definition.Router } -// newRouteStep creates and returns the addRoute step after validation. -func newRouteStep(router definition.Router) (definition.Step, error) { +// newAddRouteStep creates and returns the addRoute step after validation. +func newAddRouteStep(router definition.Router) (definition.Step, error) { if router == nil { return nil, fmt.Errorf("invalid config: Router plugin not configured") } @@ -149,13 +160,11 @@ func (s *addRouteStep) Run(ctx *model.StepContext) error { if err != nil { return fmt.Errorf("failed to determine route: %w", err) } - log.Debugf(ctx, "Routing to %#v", route) ctx.Route = &model.Route{ TargetType: route.TargetType, PublisherID: route.PublisherID, URL: route.URL, } - log.Debugf(ctx, "ctx.Route to %#v", ctx.Route) return nil } diff --git a/go.mod b/go.mod index 044bb06..3ec4911 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/beckn/beckn-onix -go 1.24.1 +go 1.24 require ( github.com/kr/pretty v0.3.1 // indirect diff --git a/pkg/model/model.go b/pkg/model/model.go index ec5e29c..621bdeb 100644 --- a/pkg/model/model.go +++ b/pkg/model/model.go @@ -81,17 +81,18 @@ func (r *Role) UnmarshalYAML(unmarshal func(interface{}) error) error { // Route represents a network route for message processing. type Route struct { - TargetType string // "url" or "msgq" or "bap" or "bpp" + TargetType string // "url" or "publisher" PublisherID string // For message queues URL *url.URL // For API calls } +// Keyset represents a collection of cryptographic keys used for signing and encryption. type Keyset struct { - UniqueKeyID string - SigningPrivate string - SigningPublic string - EncrPrivate string - EncrPublic string + UniqueKeyID string // UniqueKeyID is the identifier for the key pair. + SigningPrivate string // SigningPrivate is the private key used for signing operations. + SigningPublic string // SigningPublic is the public key corresponding to the signing private key. + EncrPrivate string // EncrPrivate is the private key used for encryption operations. + EncrPublic string // EncrPublic is the public key corresponding to the encryption private key. } // StepContext holds context information for a request processing step. diff --git a/pkg/plugin/implementation/reqpreprocessor/reqpreprocessor.go b/pkg/plugin/implementation/reqpreprocessor/reqpreprocessor.go index 020df4d..24ffa67 100644 --- a/pkg/plugin/implementation/reqpreprocessor/reqpreprocessor.go +++ b/pkg/plugin/implementation/reqpreprocessor/reqpreprocessor.go @@ -13,9 +13,10 @@ import ( "github.com/google/uuid" ) +// Config holds the configuration settings for the application. type Config struct { - ContextKeys []string - Role string + ContextKeys []string // ContextKeys is a list of context keys used for request processing. + Role string // Role specifies the role of the entity (e.g., subscriber, gateway). } type becknRequest struct { @@ -25,6 +26,8 @@ type becknRequest struct { const contextKey = "context" const subscriberIDKey = "subscriber_id" +// NewPreProcessor creates a middleware that processes incoming HTTP requests by extracting +// and modifying the request context based on the provided configuration. func NewPreProcessor(cfg *Config) (func(http.Handler) http.Handler, error) { if err := validateConfig(cfg); err != nil { return nil, err @@ -51,6 +54,7 @@ func NewPreProcessor(cfg *Config) (func(http.Handler) http.Handler, error) { } if subID != nil { log.Debugf(ctx, "adding subscriberId to request:%s, %v", subscriberIDKey, subID) + // TODO: Add a ContextKey type in model and use it here instead of raw context key. ctx = context.WithValue(ctx, subscriberIDKey, subID) } for _, key := range cfg.ContextKeys { diff --git a/pkg/plugin/implementation/router/router.go b/pkg/plugin/implementation/router/router.go index ba36715..beaac8b 100644 --- a/pkg/plugin/implementation/router/router.go +++ b/pkg/plugin/implementation/router/router.go @@ -201,11 +201,6 @@ func validateRules(rules []routingRule) error { // Route determines the routing destination based on the request context. func (r *Router) Route(ctx context.Context, url *url.URL, body []byte) (*model.Route, error) { - if r == nil { - - log.Debug(ctx, "In Router :Router not set") - } - log.Debugf(ctx, "In Router: Routing request with url %v and body: %s", url, string(body)) // Parse the body to extract domain and version var requestBody struct { Context struct { @@ -218,17 +213,11 @@ func (r *Router) Route(ctx context.Context, url *url.URL, body []byte) (*model.R if err := json.Unmarshal(body, &requestBody); err != nil { return nil, fmt.Errorf("error parsing request body: %w", err) } - log.Debugf(ctx, "In Router: Routing request with %v and body: %#s", url, requestBody) + log.Debugf(ctx, "In Router: Routing request with %v and body: %v", url, requestBody) // Extract the endpoint from the URL endpoint := path.Base(url.Path) - if r.rules == nil { - - log.Debug(ctx, "In Router :Routing rules not set") - } - log.Debugf(ctx, "In Router :Routing rules len :%d", len(r.rules)) - // Lookup route in the optimized map domainRules, ok := r.rules[requestBody.Context.Domain] if !ok { diff --git a/pkg/plugin/manager.go b/pkg/plugin/manager.go index 23b517e..bd969c4 100644 --- a/pkg/plugin/manager.go +++ b/pkg/plugin/manager.go @@ -17,15 +17,19 @@ import ( "github.com/beckn/beckn-onix/pkg/plugin/definition" ) +// TODO: Add unit tests for the plugin manager functions to ensure proper functionality and error handling. + +// Manager is responsible for managing dynamically loaded plugins. type Manager struct { - plugins map[string]*plugin.Plugin - closers []func() + plugins map[string]*plugin.Plugin // plugins holds the dynamically loaded plugins. + closers []func() // closers contains functions to release resources when the manager is closed. } func validateMgrCfg(cfg *ManagerConfig) error { return nil } +// NewManager initializes a new Manager instance by loading plugins from the specified configuration. func NewManager(ctx context.Context, cfg *ManagerConfig) (*Manager, func(), error) { if err := validateMgrCfg(cfg); err != nil { return nil, nil, fmt.Errorf("Invalid config: %w", err) @@ -64,14 +68,10 @@ func plugins(ctx context.Context, cfg *ManagerConfig) (map[string]*plugin.Plugin if strings.HasSuffix(d.Name(), ".so") { id := strings.TrimSuffix(d.Name(), ".so") // Extract plugin ID - - log.Debugf(ctx, "Loading plugin: %s", id) - start := time.Now() - p, err := plugin.Open(path) // Use the full path + p, elapsed, err := loadPlugin(ctx, path, id) if err != nil { - return fmt.Errorf("failed to open plugin %s: %w", id, err) + return err } - elapsed := time.Since(start) plugins[id] = p log.Debugf(ctx, "Loaded plugin: %s in %s", id, elapsed) } @@ -85,6 +85,20 @@ func plugins(ctx context.Context, cfg *ManagerConfig) (map[string]*plugin.Plugin return plugins, nil } +// loadPlugin attempts to load a plugin from the given path and logs the execution time. +func loadPlugin(ctx context.Context, path, id string) (*plugin.Plugin, time.Duration, error) { + log.Debugf(ctx, "Loading plugin: %s", id) + start := time.Now() + + p, err := plugin.Open(path) + if err != nil { + return nil, 0, fmt.Errorf("failed to open plugin %s: %w", id, err) + } + + elapsed := time.Since(start) + return p, elapsed, nil +} + func provider[T any](plugins map[string]*plugin.Plugin, id string) (T, error) { var zero T pgn, ok := plugins[id] @@ -105,8 +119,8 @@ func provider[T any](plugins map[string]*plugin.Plugin, id string) (T, error) { return pp, nil } -// GetPublisher returns a Publisher instance based on the provided configuration. -// It reuses the loaded provider. +// Publisher returns a Publisher instance based on the provided configuration. +// It reuses the loaded provider and registers a cleanup function. func (m *Manager) Publisher(ctx context.Context, cfg *Config) (definition.Publisher, error) { pp, err := provider[definition.PublisherProvider](m.plugins, cfg.ID) if err != nil { @@ -120,12 +134,15 @@ func (m *Manager) Publisher(ctx context.Context, cfg *Config) (definition.Publis return p, nil } +// addCloser appends a cleanup function to the Manager's closers list. func (m *Manager) addCloser(closer func()) { if closer != nil { m.closers = append(m.closers, closer) } } +// SchemaValidator returns a SchemaValidator instance based on the provided configuration. +// It registers a cleanup function for resource management. func (m *Manager) SchemaValidator(ctx context.Context, cfg *Config) (definition.SchemaValidator, error) { vp, err := provider[definition.SchemaValidatorProvider](m.plugins, cfg.ID) if err != nil { @@ -145,12 +162,17 @@ func (m *Manager) SchemaValidator(ctx context.Context, cfg *Config) (definition. return v, nil } +// Router returns a Router instance based on the provided configuration. +// It registers a cleanup function for resource management. func (m *Manager) Router(ctx context.Context, cfg *Config) (definition.Router, error) { rp, err := provider[definition.RouterProvider](m.plugins, cfg.ID) if err != nil { return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) } router, closer, err := rp.New(ctx, cfg.Config) + if err != nil { + return nil, err + } if closer != nil { m.addCloser(func() { if err := closer(); err != nil { @@ -161,6 +183,7 @@ func (m *Manager) Router(ctx context.Context, cfg *Config) (definition.Router, e return router, nil } +// Middleware returns an HTTP middleware function based on the provided configuration. func (m *Manager) Middleware(ctx context.Context, cfg *Config) (func(http.Handler) http.Handler, error) { mwp, err := provider[definition.MiddlewareProvider](m.plugins, cfg.ID) if err != nil { @@ -169,6 +192,7 @@ func (m *Manager) Middleware(ctx context.Context, cfg *Config) (func(http.Handle return mwp.New(ctx, cfg.Config) } +// Step returns a Step instance based on the provided configuration. func (m *Manager) Step(ctx context.Context, cfg *Config) (definition.Step, error) { sp, err := provider[definition.StepProvider](m.plugins, cfg.ID) if err != nil { @@ -181,6 +205,8 @@ func (m *Manager) Step(ctx context.Context, cfg *Config) (definition.Step, error return step, error } +// Cache returns a Cache instance based on the provided configuration. +// It registers a cleanup function for resource management. func (m *Manager) Cache(ctx context.Context, cfg *Config) (definition.Cache, error) { cp, err := provider[definition.CacheProvider](m.plugins, cfg.ID) if err != nil { @@ -198,6 +224,8 @@ func (m *Manager) Cache(ctx context.Context, cfg *Config) (definition.Cache, err return c, nil } +// Signer returns a Signer instance based on the provided configuration. +// It registers a cleanup function for resource management. func (m *Manager) Signer(ctx context.Context, cfg *Config) (definition.Signer, error) { sp, err := provider[definition.SignerProvider](m.plugins, cfg.ID) if err != nil { @@ -216,6 +244,9 @@ func (m *Manager) Signer(ctx context.Context, cfg *Config) (definition.Signer, e } return s, nil } + +// Encryptor returns an Encrypter instance based on the provided configuration. +// It registers a cleanup function for resource management. func (m *Manager) Encryptor(ctx context.Context, cfg *Config) (definition.Encrypter, error) { ep, err := provider[definition.EncrypterProvider](m.plugins, cfg.ID) if err != nil { @@ -235,6 +266,8 @@ func (m *Manager) Encryptor(ctx context.Context, cfg *Config) (definition.Encryp return encrypter, nil } +// Decryptor returns a Decrypter instance based on the provided configuration. +// It registers a cleanup function for resource management. func (m *Manager) Decryptor(ctx context.Context, cfg *Config) (definition.Decrypter, error) { dp, err := provider[definition.DecrypterProvider](m.plugins, cfg.ID) if err != nil { @@ -257,6 +290,8 @@ func (m *Manager) Decryptor(ctx context.Context, cfg *Config) (definition.Decryp return decrypter, nil } +// SignValidator returns a SignValidator instance based on the provided configuration. +// It registers a cleanup function for resource management. func (m *Manager) SignValidator(ctx context.Context, cfg *Config) (definition.SignValidator, error) { svp, err := provider[definition.SignValidatorProvider](m.plugins, cfg.ID) if err != nil {