diff --git a/config/local-dev.yaml b/config/local-dev.yaml index 0520f90..abe11f8 100644 --- a/config/local-dev.yaml +++ b/config/local-dev.yaml @@ -22,8 +22,14 @@ modules: handler: type: std role: bap - registryUrl: http://localhost:8080/reg plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms keyManager: id: keymanager config: @@ -59,8 +65,14 @@ modules: handler: type: std role: bap - registryUrl: http://localhost:8080/reg plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms keyManager: id: keymanager config: @@ -92,8 +104,14 @@ modules: handler: type: std role: bpp - registryUrl: http://localhost:8080/reg plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms keyManager: id: keymanager config: @@ -125,8 +143,14 @@ modules: handler: type: std role: bpp - registryUrl: http://localhost:8080/reg plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms keyManager: id: keymanager config: diff --git a/config/local-simple.yaml b/config/local-simple.yaml index 466ce78..e4a29f2 100644 --- a/config/local-simple.yaml +++ b/config/local-simple.yaml @@ -22,8 +22,14 @@ modules: handler: type: std role: bap - registryUrl: http://registry:3030/subscribers plugins: + registry: + id: registry + config: + url: http://registry:3030/subscribers + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms keyManager: id: simplekeymanager config: @@ -36,7 +42,7 @@ modules: cache: id: cache config: - addr: localhost:6379 + addr: redis:6379 schemaValidator: id: schemavalidator config: @@ -61,8 +67,14 @@ modules: handler: type: std role: bap - registryUrl: http://registry:3030/subscribers plugins: + registry: + id: registry + config: + url: http://registry:3030/subscribers + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms keyManager: id: simplekeymanager config: @@ -75,7 +87,7 @@ modules: cache: id: cache config: - addr: localhost:6379 + addr: redis:6379 router: id: router config: @@ -96,8 +108,14 @@ modules: handler: type: std role: bpp - registryUrl: http://registry:3030/subscribers plugins: + registry: + id: registry + config: + url: http://registry:3030/subscribers + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms keyManager: id: simplekeymanager config: @@ -110,7 +128,7 @@ modules: cache: id: cache config: - addr: localhost:6379 + addr: redis:6379 schemaValidator: id: schemavalidator config: @@ -129,8 +147,14 @@ modules: handler: type: std role: bpp - registryUrl: http://registry:3030/subscribers plugins: + registry: + id: registry + config: + url: http://registry:3030/subscribers + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms keyManager: id: simplekeymanager config: @@ -143,7 +167,7 @@ modules: cache: id: cache config: - addr: localhost:6379 + addr: redis:6379 router: id: router config: @@ -152,4 +176,4 @@ modules: id: signer steps: - addRoute - - sign \ No newline at end of file + - sign diff --git a/config/onix-bap/adapter.yaml b/config/onix-bap/adapter.yaml index dda19fc..598fc9b 100644 --- a/config/onix-bap/adapter.yaml +++ b/config/onix-bap/adapter.yaml @@ -23,8 +23,14 @@ modules: handler: type: std role: bap - registryUrl: http://localhost:8080/reg plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms keyManager: id: secretskeymanager config: @@ -61,9 +67,15 @@ modules: path: /bap/caller/ handler: type: std - registryUrl: http://localhost:8080/reg role: bap plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms keyManager: id: secretskeymanager config: diff --git a/config/onix-bpp/adapter.yaml b/config/onix-bpp/adapter.yaml index aa3d242..6036752 100644 --- a/config/onix-bpp/adapter.yaml +++ b/config/onix-bpp/adapter.yaml @@ -24,8 +24,14 @@ modules: type: std role: bpp subscriberId: bpp1 - registryUrl: http://localhost:8080/reg plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms keyManager: id: secretskeymanager config: @@ -63,8 +69,14 @@ modules: handler: type: std role: bpp - registryUrl: http://localhost:8080/reg plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms keyManager: id: secretskeymanager config: diff --git a/config/onix/adapter.yaml b/config/onix/adapter.yaml index e3a785b..20ccbab 100644 --- a/config/onix/adapter.yaml +++ b/config/onix/adapter.yaml @@ -23,8 +23,14 @@ modules: handler: type: std role: bap - registryUrl: http://localhost:8080/reg plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms keyManager: id: secretskeymanager config: @@ -61,9 +67,15 @@ modules: path: /bap/caller/ handler: type: std - registryUrl: http://localhost:8080/reg role: bap plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms keyManager: id: secretskeymanager config: @@ -102,8 +114,14 @@ modules: type: std role: bpp subscriberId: bpp1 - registryUrl: http://localhost:8080/reg plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms keyManager: id: secretskeymanager config: @@ -141,8 +159,14 @@ modules: handler: type: std role: bpp - registryUrl: http://localhost:8080/reg plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: 3 + retry_wait_min: 100ms + retry_wait_max: 500ms keyManager: id: secretskeymanager config: diff --git a/core/module/handler/config.go b/core/module/handler/config.go index b35eb15..2de4476 100644 --- a/core/module/handler/config.go +++ b/core/module/handler/config.go @@ -19,6 +19,7 @@ type PluginManager interface { Signer(ctx context.Context, cfg *plugin.Config) (definition.Signer, error) Step(ctx context.Context, cfg *plugin.Config) (definition.Step, error) Cache(ctx context.Context, cfg *plugin.Config) (definition.Cache, error) + Registry(ctx context.Context, cfg *plugin.Config) (definition.RegistryLookup, error) KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error) SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) } @@ -39,6 +40,7 @@ type PluginCfg struct { Signer *plugin.Config `yaml:"signer,omitempty"` Router *plugin.Config `yaml:"router,omitempty"` Cache *plugin.Config `yaml:"cache,omitempty"` + Registry *plugin.Config `yaml:"registry,omitempty"` KeyManager *plugin.Config `yaml:"keyManager,omitempty"` Middleware []plugin.Config `yaml:"middleware,omitempty"` Steps []plugin.Config diff --git a/core/module/handler/stdHandler.go b/core/module/handler/stdHandler.go index 8d472ee..c674abe 100644 --- a/core/module/handler/stdHandler.go +++ b/core/module/handler/stdHandler.go @@ -8,7 +8,6 @@ import ( "net/http" "net/http/httputil" - "github.com/beckn-one/beckn-onix/core/module/client" "github.com/beckn-one/beckn-onix/pkg/log" "github.com/beckn-one/beckn-onix/pkg/model" "github.com/beckn-one/beckn-onix/pkg/plugin" @@ -22,6 +21,7 @@ type stdHandler struct { steps []definition.Step signValidator definition.SignValidator cache definition.Cache + registry definition.RegistryLookup km definition.KeyManager schemaValidator definition.SchemaValidator router definition.Router @@ -38,7 +38,7 @@ func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Ha role: cfg.Role, } // Initialize plugins. - if err := h.initPlugins(ctx, mgr, &cfg.Plugins, cfg.RegistryURL); err != nil { + if err := h.initPlugins(ctx, mgr, &cfg.Plugins); err != nil { return nil, fmt.Errorf("failed to initialize plugins: %w", err) } // Initialize steps. @@ -169,8 +169,8 @@ func loadPlugin[T any](ctx context.Context, name string, cfg *plugin.Config, mgr return plugin, nil } -// loadKeyManager loads the KeyManager plugin using the provided PluginManager, cache, and registry URL. -func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cache, cfg *plugin.Config, regURL string) (definition.KeyManager, error) { +// loadKeyManager loads the KeyManager plugin using the provided PluginManager, cache, and registry. +func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cache, registry definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error) { if cfg == nil { log.Debug(ctx, "Skipping KeyManager plugin: not configured") return nil, nil @@ -178,10 +178,12 @@ func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cac if cache == nil { return nil, fmt.Errorf("failed to load KeyManager plugin (%s): Cache plugin not configured", cfg.ID) } - rClient := client.NewRegisteryClient(&client.Config{RegisteryURL: regURL}) - km, err := mgr.KeyManager(ctx, cache, rClient, cfg) + if registry == nil { + return nil, fmt.Errorf("failed to load KeyManager plugin (%s): Registry plugin not configured", cfg.ID) + } + km, err := mgr.KeyManager(ctx, cache, registry, cfg) if err != nil { - return nil, fmt.Errorf("failed to load cache plugin (%s): %w", cfg.ID, err) + return nil, fmt.Errorf("failed to load KeyManager plugin (%s): %w", cfg.ID, err) } log.Debugf(ctx, "Loaded Keymanager plugin: %s", cfg.ID) @@ -189,12 +191,15 @@ func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cac } // initPlugins initializes required plugins for the processor. -func (h *stdHandler) initPlugins(ctx context.Context, mgr PluginManager, cfg *PluginCfg, regURL string) error { +func (h *stdHandler) initPlugins(ctx context.Context, mgr PluginManager, cfg *PluginCfg) error { var err error if h.cache, err = loadPlugin(ctx, "Cache", cfg.Cache, mgr.Cache); err != nil { return err } - if h.km, err = loadKeyManager(ctx, mgr, h.cache, cfg.KeyManager, regURL); err != nil { + if h.registry, err = loadPlugin(ctx, "Registry", cfg.Registry, mgr.Registry); err != nil { + return err + } + if h.km, err = loadKeyManager(ctx, mgr, h.cache, h.registry, cfg.KeyManager); err != nil { return err } if h.signValidator, err = loadPlugin(ctx, "SignValidator", cfg.SignValidator, mgr.SignValidator); err != nil { @@ -259,4 +264,4 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf } log.Infof(ctx, "Processor steps initialized: %v", cfg.Steps) return nil -} \ No newline at end of file +} diff --git a/install/build-plugins.sh b/install/build-plugins.sh index f998e0e..d41b47f 100755 --- a/install/build-plugins.sh +++ b/install/build-plugins.sh @@ -13,6 +13,7 @@ plugins=( "keymanager" "simplekeymanager" "publisher" + "registry" "reqpreprocessor" "router" "schemavalidator" diff --git a/pkg/plugin/definition/registry.go b/pkg/plugin/definition/registry.go index 8684a6a..65d5176 100644 --- a/pkg/plugin/definition/registry.go +++ b/pkg/plugin/definition/registry.go @@ -7,5 +7,11 @@ import ( ) type RegistryLookup interface { + // looks up Registry entry to obtain public keys to validate signature of the incoming message Lookup(ctx context.Context, req *model.Subscription) ([]model.Subscription, error) } + +// RegistryLookupProvider initializes a new registry lookup instance. +type RegistryLookupProvider interface { + New(context.Context, map[string]string) (RegistryLookup, func() error, error) +} diff --git a/pkg/plugin/implementation/registry/cmd/plugin.go b/pkg/plugin/implementation/registry/cmd/plugin.go new file mode 100644 index 0000000..52b7766 --- /dev/null +++ b/pkg/plugin/implementation/registry/cmd/plugin.go @@ -0,0 +1,83 @@ +package main + +import ( + "context" + "errors" + "fmt" + "strconv" + "time" + + "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/beckn-one/beckn-onix/pkg/plugin/definition" + "github.com/beckn-one/beckn-onix/pkg/plugin/implementation/registry" +) + +// registryProvider implements the RegistryLookupProvider interface for the registry plugin. +type registryProvider struct{} + +// newRegistryFunc is a function type that creates a new Registry instance. +var newRegistryFunc = registry.New + +// parseConfig parses the configuration map and returns a registry.Config with optional parameters. +func (r registryProvider) parseConfig(config map[string]string) (*registry.Config, error) { + registryConfig := ®istry.Config{ + URL: config["url"], + } + + // Parse retry_max + if retryMaxStr, exists := config["retry_max"]; exists && retryMaxStr != "" { + retryMax, err := strconv.Atoi(retryMaxStr) + if err != nil { + return nil, fmt.Errorf("invalid retry_max value '%s': %w", retryMaxStr, err) + } + registryConfig.RetryMax = retryMax + } + + // Parse retry_wait_min + if retryWaitMinStr, exists := config["retry_wait_min"]; exists && retryWaitMinStr != "" { + retryWaitMin, err := time.ParseDuration(retryWaitMinStr) + if err != nil { + return nil, fmt.Errorf("invalid retry_wait_min value '%s': %w", retryWaitMinStr, err) + } + registryConfig.RetryWaitMin = retryWaitMin + } + + // Parse retry_wait_max + if retryWaitMaxStr, exists := config["retry_wait_max"]; exists && retryWaitMaxStr != "" { + retryWaitMax, err := time.ParseDuration(retryWaitMaxStr) + if err != nil { + return nil, fmt.Errorf("invalid retry_wait_max value '%s': %w", retryWaitMaxStr, err) + } + registryConfig.RetryWaitMax = retryWaitMax + } + + return registryConfig, nil +} + +// New creates a new registry plugin instance. +func (r registryProvider) New(ctx context.Context, config map[string]string) (definition.RegistryLookup, func() error, error) { + if ctx == nil { + return nil, nil, errors.New("context cannot be nil") + } + + // Parse configuration from map using the dedicated method + registryConfig, err := r.parseConfig(config) + if err != nil { + log.Errorf(ctx, err, "Failed to parse registry configuration") + return nil, nil, fmt.Errorf("failed to parse registry configuration: %w", err) + } + + log.Debugf(ctx, "Registry config mapped: %+v", registryConfig) + + registryClient, closer, err := newRegistryFunc(ctx, registryConfig) + if err != nil { + log.Errorf(ctx, err, "Failed to create registry instance") + return nil, nil, err + } + + log.Infof(ctx, "Registry instance created successfully") + return registryClient, closer, nil +} + +// Provider is the exported plugin instance +var Provider = registryProvider{} diff --git a/pkg/plugin/implementation/registry/cmd/plugin_test.go b/pkg/plugin/implementation/registry/cmd/plugin_test.go new file mode 100644 index 0000000..c460c0a --- /dev/null +++ b/pkg/plugin/implementation/registry/cmd/plugin_test.go @@ -0,0 +1,189 @@ +package main + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/beckn-one/beckn-onix/pkg/plugin/implementation/registry" +) + +// mockRegistryClient is a mock implementation of the RegistryLookup interface +// for testing purposes. +type mockRegistryClient struct{} + +func (m *mockRegistryClient) Subscribe(ctx context.Context, subscription interface{}) error { + return nil +} +func (m *mockRegistryClient) Lookup(ctx context.Context, subscription interface{}) ([]interface{}, error) { + return nil, nil +} + +// TestRegistryProvider_ParseConfig tests the configuration parsing logic. +func TestRegistryProvider_ParseConfig(t *testing.T) { + t.Parallel() + provider := registryProvider{} + + testCases := []struct { + name string + config map[string]string + expected *registry.Config + expectedErr string + }{ + { + name: "should parse a full, valid config", + config: map[string]string{ + "url": "http://test.com", + "retry_max": "5", + "retry_wait_min": "100ms", + "retry_wait_max": "2s", + }, + expected: ®istry.Config{ + URL: "http://test.com", + RetryMax: 5, + RetryWaitMin: 100 * time.Millisecond, + RetryWaitMax: 2 * time.Second, + }, + expectedErr: "", + }, + { + name: "should handle missing optional values", + config: map[string]string{ + "url": "http://test.com", + }, + expected: ®istry.Config{ + URL: "http://test.com", + }, + expectedErr: "", + }, + { + name: "should return error for invalid retry_max", + config: map[string]string{ + "url": "http://test.com", + "retry_max": "not-a-number", + }, + expected: nil, + expectedErr: "invalid retry_max value 'not-a-number'", + }, + { + name: "should return error for invalid retry_wait_min", + config: map[string]string{ + "url": "http://test.com", + "retry_wait_min": "bad-duration", + }, + expected: nil, + expectedErr: "invalid retry_wait_min value 'bad-duration'", + }, + { + name: "should return error for invalid retry_wait_max", + config: map[string]string{ + "url": "http://test.com", + "retry_wait_max": "30parsecs", + }, + expected: nil, + expectedErr: "invalid retry_wait_max value '30parsecs'", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + parsedConfig, err := provider.parseConfig(tc.config) + + if tc.expectedErr != "" { + if err == nil { + t.Fatalf("expected an error containing '%s' but got none", tc.expectedErr) + } + if e, a := tc.expectedErr, err.Error(); !(a == e || (len(a) > len(e) && a[:len(e)] == e)) { + t.Errorf("expected error message to contain '%s', but got '%s'", e, a) + } + return + } + + if err != nil { + t.Fatalf("expected no error, but got: %v", err) + } + if parsedConfig.URL != tc.expected.URL { + t.Errorf("expected URL '%s', got '%s'", tc.expected.URL, parsedConfig.URL) + } + if parsedConfig.RetryMax != tc.expected.RetryMax { + t.Errorf("expected RetryMax %d, got %d", tc.expected.RetryMax, parsedConfig.RetryMax) + } + if parsedConfig.RetryWaitMin != tc.expected.RetryWaitMin { + t.Errorf("expected RetryWaitMin %v, got %v", tc.expected.RetryWaitMin, parsedConfig.RetryWaitMin) + } + if parsedConfig.RetryWaitMax != tc.expected.RetryWaitMax { + t.Errorf("expected RetryWaitMax %v, got %v", tc.expected.RetryWaitMax, parsedConfig.RetryWaitMax) + } + }) + } +} + +// TestRegistryProvider_New tests the plugin's main constructor. +func TestRegistryProvider_New(t *testing.T) { + t.Parallel() + provider := registryProvider{} + originalNewRegistryFunc := newRegistryFunc + + // Cleanup to restore the original function after the test + t.Cleanup(func() { + newRegistryFunc = originalNewRegistryFunc + }) + + t.Run("should return error if context is nil", func(t *testing.T) { + _, _, err := provider.New(nil, map[string]string{}) + if err == nil { + t.Fatal("expected an error for nil context but got none") + } + if err.Error() != "context cannot be nil" { + t.Errorf("expected 'context cannot be nil' error, got '%s'", err.Error()) + } + }) + + t.Run("should return error if config parsing fails", func(t *testing.T) { + config := map[string]string{"retry_max": "invalid"} + _, _, err := provider.New(context.Background(), config) + if err == nil { + t.Fatal("expected an error for bad config but got none") + } + }) + + t.Run("should return error if registry.New fails", func(t *testing.T) { + // Mock the newRegistryFunc to return an error + expectedErr := errors.New("registry creation failed") + newRegistryFunc = func(ctx context.Context, cfg *registry.Config) (*registry.RegistryClient, func() error, error) { + return nil, nil, expectedErr + } + + config := map[string]string{"url": "http://test.com"} + _, _, err := provider.New(context.Background(), config) + if err == nil { + t.Fatal("expected an error from registry.New but got none") + } + if !errors.Is(err, expectedErr) { + t.Errorf("expected error '%v', got '%v'", expectedErr, err) + } + }) + + t.Run("should succeed and return a valid instance", func(t *testing.T) { + // Mock the newRegistryFunc for a successful case + mockCloser := func() error { fmt.Println("closed"); return nil } + newRegistryFunc = func(ctx context.Context, cfg *registry.Config) (*registry.RegistryClient, func() error, error) { + // Return a non-nil client of th correct concrete type + return new(registry.RegistryClient), mockCloser, nil + } + + config := map[string]string{"url": "http://test.com"} + instance, closer, err := provider.New(context.Background(), config) + if err != nil { + t.Fatalf("expected no error, but got: %v", err) + } + if instance == nil { + t.Fatal("expected a non-nil instance") + } + if closer == nil { + t.Fatal("expected a non-nil closer function") + } + }) +} diff --git a/pkg/plugin/implementation/registry/registry.go b/pkg/plugin/implementation/registry/registry.go new file mode 100644 index 0000000..455360c --- /dev/null +++ b/pkg/plugin/implementation/registry/registry.go @@ -0,0 +1,156 @@ +package registry + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/beckn-one/beckn-onix/pkg/model" + "github.com/hashicorp/go-retryablehttp" +) + +// Config holds configuration parameters for the registry client. +type Config struct { + URL string `yaml:"url" json:"url"` + RetryMax int `yaml:"retry_max" json:"retry_max"` + RetryWaitMin time.Duration `yaml:"retry_wait_min" json:"retry_wait_min"` + RetryWaitMax time.Duration `yaml:"retry_wait_max" json:"retry_wait_max"` +} + +// RegistryClient encapsulates the logic for calling the registry endpoints. +type RegistryClient struct { + config *Config + client *retryablehttp.Client +} + +// validate checks if the provided registry configuration is valid. +func validate(cfg *Config) error { + if cfg == nil { + return fmt.Errorf("registry config cannot be nil") + } + if cfg.URL == "" { + return fmt.Errorf("registry URL cannot be empty") + } + return nil +} + +// New creates a new instance of RegistryClient. +func New(ctx context.Context, cfg *Config) (*RegistryClient, func() error, error) { + log.Debugf(ctx, "Initializing Registry client with config: %+v", cfg) + + if err := validate(cfg); err != nil { + return nil, nil, err + } + + rc := retryablehttp.NewClient() + + // Configure retry settings if provided + if cfg.RetryMax > 0 { + rc.RetryMax = cfg.RetryMax + } + if cfg.RetryWaitMin > 0 { + rc.RetryWaitMin = cfg.RetryWaitMin + } + if cfg.RetryWaitMax > 0 { + rc.RetryWaitMax = cfg.RetryWaitMax + } + + client := &RegistryClient{ + config: cfg, + client: rc, + } + + // Cleanup function + closer := func() error { + log.Debugf(ctx, "Cleaning up Registry client resources") + if client.client != nil { + client.client.HTTPClient.CloseIdleConnections() + } + return nil + } + + log.Infof(ctx, "Registry client is created successfully") + return client, closer, nil +} + +// Subscribe calls the /subscribe endpoint with retry. +func (c *RegistryClient) Subscribe(ctx context.Context, subscription *model.Subscription) error { + subscribeURL := fmt.Sprintf("%s/subscribe", c.config.URL) + + jsonData, err := json.Marshal(subscription) + if err != nil { + return model.NewBadReqErr(fmt.Errorf("failed to marshal subscription data: %w", err)) + } + + req, err := retryablehttp.NewRequest("POST", subscribeURL, bytes.NewBuffer(jsonData)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req = req.WithContext(ctx) + + log.Debugf(ctx, "Making subscribe request to: %s", subscribeURL) + resp, err := c.client.Do(req) + if err != nil { + return fmt.Errorf("failed to send subscribe request with retry: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + log.Errorf(ctx, nil, "Subscribe request failed with status: %s, response: %s", resp.Status, string(body)) + return fmt.Errorf("subscribe request failed with status: %s", resp.Status) + } + + log.Debugf(ctx, "Subscribe request is initiated successfully") + return nil +} + +// Lookup calls the /lookup endpoint with retry and returns a slice of Subscription. +func (c *RegistryClient) Lookup(ctx context.Context, subscription *model.Subscription) ([]model.Subscription, error) { + lookupURL := fmt.Sprintf("%s/lookup", c.config.URL) + + jsonData, err := json.Marshal(subscription) + if err != nil { + return nil, model.NewBadReqErr(fmt.Errorf("failed to marshal subscription data: %w", err)) + } + + req, err := retryablehttp.NewRequest("POST", lookupURL, bytes.NewBuffer(jsonData)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req = req.WithContext(ctx) + + log.Debugf(ctx, "Making lookup request to: %s", lookupURL) + resp, err := c.client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send lookup request with retry: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + log.Errorf(ctx, nil, "Lookup request failed with status: %s, response: %s", resp.Status, string(body)) + return nil, fmt.Errorf("lookup request failed with status: %s", resp.Status) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + var results []model.Subscription + err = json.Unmarshal(body, &results) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal response body: %w", err) + } + + log.Debugf(ctx, "Lookup request successful, found %d subscriptions", len(results)) + return results, nil +} diff --git a/pkg/plugin/implementation/registry/registry_test.go b/pkg/plugin/implementation/registry/registry_test.go new file mode 100644 index 0000000..1498973 --- /dev/null +++ b/pkg/plugin/implementation/registry/registry_test.go @@ -0,0 +1,204 @@ +package registry + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/beckn-one/beckn-onix/pkg/model" +) + +// TestValidate ensures the config validation logic works correctly. +func TestValidate(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + config *Config + expectedErr string + }{ + { + name: "should return error for nil config", + config: nil, + expectedErr: "registry config cannot be nil", + }, + { + name: "should return error for empty URL", + config: &Config{URL: ""}, + expectedErr: "registry URL cannot be empty", + }, + { + name: "should succeed for valid config", + config: &Config{URL: "http://localhost:8080"}, + expectedErr: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := validate(tc.config) + if tc.expectedErr != "" { + if err == nil { + t.Fatalf("expected an error but got none") + } + if err.Error() != tc.expectedErr { + t.Errorf("expected error message '%s', but got '%s'", tc.expectedErr, err.Error()) + } + } else { + if err != nil { + t.Fatalf("expected no error, but got: %v", err) + } + } + }) + } +} + +// TestNew tests the constructor for the RegistryClient. +func TestNew(t *testing.T) { + t.Parallel() + + t.Run("should fail with invalid config", func(t *testing.T) { + _, _, err := New(context.Background(), &Config{URL: ""}) + if err == nil { + t.Fatal("expected an error for invalid config but got none") + } + }) + + t.Run("should succeed with valid config and set defaults", func(t *testing.T) { + cfg := &Config{URL: "http://test.com"} + client, closer, err := New(context.Background(), cfg) + if err != nil { + t.Fatalf("expected no error, but got: %v", err) + } + if client == nil { + t.Fatal("expected client to be non-nil") + } + if closer == nil { + t.Fatal("expected closer to be non-nil") + } + // Check if default retry settings are applied (go-retryablehttp defaults) + if client.client.RetryMax != 4 { + t.Errorf("expected default RetryMax of 4, but got %d", client.client.RetryMax) + } + }) + + t.Run("should apply custom retry settings", func(t *testing.T) { + cfg := &Config{ + URL: "http://test.com", + RetryMax: 10, + RetryWaitMin: 100 * time.Millisecond, + RetryWaitMax: 1 * time.Second, + } + client, _, err := New(context.Background(), cfg) + if err != nil { + t.Fatalf("expected no error, but got: %v", err) + } + + if client.client.RetryMax != cfg.RetryMax { + t.Errorf("expected RetryMax to be %d, but got %d", cfg.RetryMax, client.client.RetryMax) + } + if client.client.RetryWaitMin != cfg.RetryWaitMin { + t.Errorf("expected RetryWaitMin to be %v, but got %v", cfg.RetryWaitMin, client.client.RetryWaitMin) + } + if client.client.RetryWaitMax != cfg.RetryWaitMax { + t.Errorf("expected RetryWaitMax to be %v, but got %v", cfg.RetryWaitMax, client.client.RetryWaitMax) + } + }) +} + +// TestRegistryClient_Lookup tests the Lookup method. +func TestRegistryClient_Lookup(t *testing.T) { + t.Parallel() + + t.Run("should succeed and unmarshal response", func(t *testing.T) { + expectedSubs := []model.Subscription{ + { + KeyID: "test-key", + SigningPublicKey: "test-signing-key", + EncrPublicKey: "test-encryption-key", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(24 * time.Hour), + Status: "SUBSCRIBED", + }, + { + KeyID: "test-key-2", + SigningPublicKey: "test-signing-key-2", + EncrPublicKey: "test-encryption-key-2", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(48 * time.Hour), + Status: "SUBSCRIBED", + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/lookup" { + t.Errorf("expected path '/lookup', got '%s'", r.URL.Path) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(expectedSubs); err != nil { + t.Fatalf("failed to write response: %v", err) + } + })) + defer server.Close() + + client, closer, err := New(context.Background(), &Config{URL: server.URL}) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + defer closer() + + results, err := client.Lookup(context.Background(), &model.Subscription{}) + if err != nil { + t.Fatalf("lookup failed: %v", err) + } + + if len(results) != len(expectedSubs) { + t.Fatalf("expected %d results, but got %d", len(expectedSubs), len(results)) + } + + if results[0].SubscriberID != expectedSubs[0].SubscriberID { + t.Errorf("expected subscriber ID '%s', got '%s'", expectedSubs[0].SubscriberID, results[0].SubscriberID) + } + }) + + t.Run("should fail on non-200 status", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + })) + defer server.Close() + + client, closer, err := New(context.Background(), &Config{URL: server.URL}) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + defer closer() + + _, err = client.Lookup(context.Background(), &model.Subscription{}) + if err == nil { + t.Fatal("expected an error but got none") + } + }) + + t.Run("should fail on bad JSON response", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `[{"subscriber_id": "bad-json"`) // Malformed JSON + })) + defer server.Close() + + client, closer, err := New(context.Background(), &Config{URL: server.URL, RetryMax: 1}) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + defer closer() + + _, err = client.Lookup(context.Background(), &model.Subscription{}) + if err == nil { + t.Fatal("expected an unmarshaling error but got none") + } + }) +} diff --git a/pkg/plugin/manager.go b/pkg/plugin/manager.go index 49140dc..d2aba88 100644 --- a/pkg/plugin/manager.go +++ b/pkg/plugin/manager.go @@ -361,6 +361,27 @@ func (m *Manager) SimpleKeyManager(ctx context.Context, cache definition.Cache, return km, nil } +// Registry returns a RegistryLookup instance based on the provided configuration. +// It registers a cleanup function for resource management. +func (m *Manager) Registry(ctx context.Context, cfg *Config) (definition.RegistryLookup, error) { + rp, err := provider[definition.RegistryLookupProvider](m.plugins, cfg.ID) + if err != nil { + return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) + } + registry, closer, err := rp.New(ctx, cfg.Config) + if err != nil { + return nil, err + } + if closer != nil { + m.closers = append(m.closers, func() { + if err := closer(); err != nil { + panic(err) + } + }) + } + return registry, nil +} + // Validator implements handler.PluginManager. func (m *Manager) Validator(ctx context.Context, cfg *Config) (definition.SchemaValidator, error) { panic("unimplemented")