Merge pull request #519 from Beckn-One/511-registry-as-plugin
[Issue 511] feat: Migrate registry client to a plugin This is a breaking change and requires corresponding changes in downstream ONIX implementation and also the Config files.
This commit is contained in:
@@ -22,8 +22,14 @@ modules:
|
|||||||
handler:
|
handler:
|
||||||
type: std
|
type: std
|
||||||
role: bap
|
role: bap
|
||||||
registryUrl: http://localhost:8080/reg
|
|
||||||
plugins:
|
plugins:
|
||||||
|
registry:
|
||||||
|
id: registry
|
||||||
|
config:
|
||||||
|
url: http://localhost:8080/reg
|
||||||
|
retry_max: 3
|
||||||
|
retry_wait_min: 100ms
|
||||||
|
retry_wait_max: 500ms
|
||||||
keyManager:
|
keyManager:
|
||||||
id: keymanager
|
id: keymanager
|
||||||
config:
|
config:
|
||||||
@@ -59,8 +65,14 @@ modules:
|
|||||||
handler:
|
handler:
|
||||||
type: std
|
type: std
|
||||||
role: bap
|
role: bap
|
||||||
registryUrl: http://localhost:8080/reg
|
|
||||||
plugins:
|
plugins:
|
||||||
|
registry:
|
||||||
|
id: registry
|
||||||
|
config:
|
||||||
|
url: http://localhost:8080/reg
|
||||||
|
retry_max: 3
|
||||||
|
retry_wait_min: 100ms
|
||||||
|
retry_wait_max: 500ms
|
||||||
keyManager:
|
keyManager:
|
||||||
id: keymanager
|
id: keymanager
|
||||||
config:
|
config:
|
||||||
@@ -92,8 +104,14 @@ modules:
|
|||||||
handler:
|
handler:
|
||||||
type: std
|
type: std
|
||||||
role: bpp
|
role: bpp
|
||||||
registryUrl: http://localhost:8080/reg
|
|
||||||
plugins:
|
plugins:
|
||||||
|
registry:
|
||||||
|
id: registry
|
||||||
|
config:
|
||||||
|
url: http://localhost:8080/reg
|
||||||
|
retry_max: 3
|
||||||
|
retry_wait_min: 100ms
|
||||||
|
retry_wait_max: 500ms
|
||||||
keyManager:
|
keyManager:
|
||||||
id: keymanager
|
id: keymanager
|
||||||
config:
|
config:
|
||||||
@@ -125,8 +143,14 @@ modules:
|
|||||||
handler:
|
handler:
|
||||||
type: std
|
type: std
|
||||||
role: bpp
|
role: bpp
|
||||||
registryUrl: http://localhost:8080/reg
|
|
||||||
plugins:
|
plugins:
|
||||||
|
registry:
|
||||||
|
id: registry
|
||||||
|
config:
|
||||||
|
url: http://localhost:8080/reg
|
||||||
|
retry_max: 3
|
||||||
|
retry_wait_min: 100ms
|
||||||
|
retry_wait_max: 500ms
|
||||||
keyManager:
|
keyManager:
|
||||||
id: keymanager
|
id: keymanager
|
||||||
config:
|
config:
|
||||||
|
|||||||
@@ -22,8 +22,14 @@ modules:
|
|||||||
handler:
|
handler:
|
||||||
type: std
|
type: std
|
||||||
role: bap
|
role: bap
|
||||||
registryUrl: http://registry:3030/subscribers
|
|
||||||
plugins:
|
plugins:
|
||||||
|
registry:
|
||||||
|
id: registry
|
||||||
|
config:
|
||||||
|
url: http://registry:3030/subscribers
|
||||||
|
retry_max: 3
|
||||||
|
retry_wait_min: 100ms
|
||||||
|
retry_wait_max: 500ms
|
||||||
keyManager:
|
keyManager:
|
||||||
id: simplekeymanager
|
id: simplekeymanager
|
||||||
config:
|
config:
|
||||||
@@ -36,7 +42,7 @@ modules:
|
|||||||
cache:
|
cache:
|
||||||
id: cache
|
id: cache
|
||||||
config:
|
config:
|
||||||
addr: localhost:6379
|
addr: redis:6379
|
||||||
schemaValidator:
|
schemaValidator:
|
||||||
id: schemavalidator
|
id: schemavalidator
|
||||||
config:
|
config:
|
||||||
@@ -61,8 +67,14 @@ modules:
|
|||||||
handler:
|
handler:
|
||||||
type: std
|
type: std
|
||||||
role: bap
|
role: bap
|
||||||
registryUrl: http://registry:3030/subscribers
|
|
||||||
plugins:
|
plugins:
|
||||||
|
registry:
|
||||||
|
id: registry
|
||||||
|
config:
|
||||||
|
url: http://registry:3030/subscribers
|
||||||
|
retry_max: 3
|
||||||
|
retry_wait_min: 100ms
|
||||||
|
retry_wait_max: 500ms
|
||||||
keyManager:
|
keyManager:
|
||||||
id: simplekeymanager
|
id: simplekeymanager
|
||||||
config:
|
config:
|
||||||
@@ -75,7 +87,7 @@ modules:
|
|||||||
cache:
|
cache:
|
||||||
id: cache
|
id: cache
|
||||||
config:
|
config:
|
||||||
addr: localhost:6379
|
addr: redis:6379
|
||||||
router:
|
router:
|
||||||
id: router
|
id: router
|
||||||
config:
|
config:
|
||||||
@@ -96,8 +108,14 @@ modules:
|
|||||||
handler:
|
handler:
|
||||||
type: std
|
type: std
|
||||||
role: bpp
|
role: bpp
|
||||||
registryUrl: http://registry:3030/subscribers
|
|
||||||
plugins:
|
plugins:
|
||||||
|
registry:
|
||||||
|
id: registry
|
||||||
|
config:
|
||||||
|
url: http://registry:3030/subscribers
|
||||||
|
retry_max: 3
|
||||||
|
retry_wait_min: 100ms
|
||||||
|
retry_wait_max: 500ms
|
||||||
keyManager:
|
keyManager:
|
||||||
id: simplekeymanager
|
id: simplekeymanager
|
||||||
config:
|
config:
|
||||||
@@ -110,7 +128,7 @@ modules:
|
|||||||
cache:
|
cache:
|
||||||
id: cache
|
id: cache
|
||||||
config:
|
config:
|
||||||
addr: localhost:6379
|
addr: redis:6379
|
||||||
schemaValidator:
|
schemaValidator:
|
||||||
id: schemavalidator
|
id: schemavalidator
|
||||||
config:
|
config:
|
||||||
@@ -129,8 +147,14 @@ modules:
|
|||||||
handler:
|
handler:
|
||||||
type: std
|
type: std
|
||||||
role: bpp
|
role: bpp
|
||||||
registryUrl: http://registry:3030/subscribers
|
|
||||||
plugins:
|
plugins:
|
||||||
|
registry:
|
||||||
|
id: registry
|
||||||
|
config:
|
||||||
|
url: http://registry:3030/subscribers
|
||||||
|
retry_max: 3
|
||||||
|
retry_wait_min: 100ms
|
||||||
|
retry_wait_max: 500ms
|
||||||
keyManager:
|
keyManager:
|
||||||
id: simplekeymanager
|
id: simplekeymanager
|
||||||
config:
|
config:
|
||||||
@@ -143,7 +167,7 @@ modules:
|
|||||||
cache:
|
cache:
|
||||||
id: cache
|
id: cache
|
||||||
config:
|
config:
|
||||||
addr: localhost:6379
|
addr: redis:6379
|
||||||
router:
|
router:
|
||||||
id: router
|
id: router
|
||||||
config:
|
config:
|
||||||
@@ -152,4 +176,4 @@ modules:
|
|||||||
id: signer
|
id: signer
|
||||||
steps:
|
steps:
|
||||||
- addRoute
|
- addRoute
|
||||||
- sign
|
- sign
|
||||||
|
|||||||
@@ -23,8 +23,14 @@ modules:
|
|||||||
handler:
|
handler:
|
||||||
type: std
|
type: std
|
||||||
role: bap
|
role: bap
|
||||||
registryUrl: http://localhost:8080/reg
|
|
||||||
plugins:
|
plugins:
|
||||||
|
registry:
|
||||||
|
id: registry
|
||||||
|
config:
|
||||||
|
url: http://localhost:8080/reg
|
||||||
|
retry_max: 3
|
||||||
|
retry_wait_min: 100ms
|
||||||
|
retry_wait_max: 500ms
|
||||||
keyManager:
|
keyManager:
|
||||||
id: secretskeymanager
|
id: secretskeymanager
|
||||||
config:
|
config:
|
||||||
@@ -61,9 +67,15 @@ modules:
|
|||||||
path: /bap/caller/
|
path: /bap/caller/
|
||||||
handler:
|
handler:
|
||||||
type: std
|
type: std
|
||||||
registryUrl: http://localhost:8080/reg
|
|
||||||
role: bap
|
role: bap
|
||||||
plugins:
|
plugins:
|
||||||
|
registry:
|
||||||
|
id: registry
|
||||||
|
config:
|
||||||
|
url: http://localhost:8080/reg
|
||||||
|
retry_max: 3
|
||||||
|
retry_wait_min: 100ms
|
||||||
|
retry_wait_max: 500ms
|
||||||
keyManager:
|
keyManager:
|
||||||
id: secretskeymanager
|
id: secretskeymanager
|
||||||
config:
|
config:
|
||||||
|
|||||||
@@ -24,8 +24,14 @@ modules:
|
|||||||
type: std
|
type: std
|
||||||
role: bpp
|
role: bpp
|
||||||
subscriberId: bpp1
|
subscriberId: bpp1
|
||||||
registryUrl: http://localhost:8080/reg
|
|
||||||
plugins:
|
plugins:
|
||||||
|
registry:
|
||||||
|
id: registry
|
||||||
|
config:
|
||||||
|
url: http://localhost:8080/reg
|
||||||
|
retry_max: 3
|
||||||
|
retry_wait_min: 100ms
|
||||||
|
retry_wait_max: 500ms
|
||||||
keyManager:
|
keyManager:
|
||||||
id: secretskeymanager
|
id: secretskeymanager
|
||||||
config:
|
config:
|
||||||
@@ -63,8 +69,14 @@ modules:
|
|||||||
handler:
|
handler:
|
||||||
type: std
|
type: std
|
||||||
role: bpp
|
role: bpp
|
||||||
registryUrl: http://localhost:8080/reg
|
|
||||||
plugins:
|
plugins:
|
||||||
|
registry:
|
||||||
|
id: registry
|
||||||
|
config:
|
||||||
|
url: http://localhost:8080/reg
|
||||||
|
retry_max: 3
|
||||||
|
retry_wait_min: 100ms
|
||||||
|
retry_wait_max: 500ms
|
||||||
keyManager:
|
keyManager:
|
||||||
id: secretskeymanager
|
id: secretskeymanager
|
||||||
config:
|
config:
|
||||||
|
|||||||
@@ -23,8 +23,14 @@ modules:
|
|||||||
handler:
|
handler:
|
||||||
type: std
|
type: std
|
||||||
role: bap
|
role: bap
|
||||||
registryUrl: http://localhost:8080/reg
|
|
||||||
plugins:
|
plugins:
|
||||||
|
registry:
|
||||||
|
id: registry
|
||||||
|
config:
|
||||||
|
url: http://localhost:8080/reg
|
||||||
|
retry_max: 3
|
||||||
|
retry_wait_min: 100ms
|
||||||
|
retry_wait_max: 500ms
|
||||||
keyManager:
|
keyManager:
|
||||||
id: secretskeymanager
|
id: secretskeymanager
|
||||||
config:
|
config:
|
||||||
@@ -61,9 +67,15 @@ modules:
|
|||||||
path: /bap/caller/
|
path: /bap/caller/
|
||||||
handler:
|
handler:
|
||||||
type: std
|
type: std
|
||||||
registryUrl: http://localhost:8080/reg
|
|
||||||
role: bap
|
role: bap
|
||||||
plugins:
|
plugins:
|
||||||
|
registry:
|
||||||
|
id: registry
|
||||||
|
config:
|
||||||
|
url: http://localhost:8080/reg
|
||||||
|
retry_max: 3
|
||||||
|
retry_wait_min: 100ms
|
||||||
|
retry_wait_max: 500ms
|
||||||
keyManager:
|
keyManager:
|
||||||
id: secretskeymanager
|
id: secretskeymanager
|
||||||
config:
|
config:
|
||||||
@@ -102,8 +114,14 @@ modules:
|
|||||||
type: std
|
type: std
|
||||||
role: bpp
|
role: bpp
|
||||||
subscriberId: bpp1
|
subscriberId: bpp1
|
||||||
registryUrl: http://localhost:8080/reg
|
|
||||||
plugins:
|
plugins:
|
||||||
|
registry:
|
||||||
|
id: registry
|
||||||
|
config:
|
||||||
|
url: http://localhost:8080/reg
|
||||||
|
retry_max: 3
|
||||||
|
retry_wait_min: 100ms
|
||||||
|
retry_wait_max: 500ms
|
||||||
keyManager:
|
keyManager:
|
||||||
id: secretskeymanager
|
id: secretskeymanager
|
||||||
config:
|
config:
|
||||||
@@ -141,8 +159,14 @@ modules:
|
|||||||
handler:
|
handler:
|
||||||
type: std
|
type: std
|
||||||
role: bpp
|
role: bpp
|
||||||
registryUrl: http://localhost:8080/reg
|
|
||||||
plugins:
|
plugins:
|
||||||
|
registry:
|
||||||
|
id: registry
|
||||||
|
config:
|
||||||
|
url: http://localhost:8080/reg
|
||||||
|
retry_max: 3
|
||||||
|
retry_wait_min: 100ms
|
||||||
|
retry_wait_max: 500ms
|
||||||
keyManager:
|
keyManager:
|
||||||
id: secretskeymanager
|
id: secretskeymanager
|
||||||
config:
|
config:
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ type PluginManager interface {
|
|||||||
Signer(ctx context.Context, cfg *plugin.Config) (definition.Signer, error)
|
Signer(ctx context.Context, cfg *plugin.Config) (definition.Signer, error)
|
||||||
Step(ctx context.Context, cfg *plugin.Config) (definition.Step, error)
|
Step(ctx context.Context, cfg *plugin.Config) (definition.Step, error)
|
||||||
Cache(ctx context.Context, cfg *plugin.Config) (definition.Cache, error)
|
Cache(ctx context.Context, cfg *plugin.Config) (definition.Cache, error)
|
||||||
|
Registry(ctx context.Context, cfg *plugin.Config) (definition.RegistryLookup, error)
|
||||||
KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error)
|
KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error)
|
||||||
SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error)
|
SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error)
|
||||||
}
|
}
|
||||||
@@ -39,6 +40,7 @@ type PluginCfg struct {
|
|||||||
Signer *plugin.Config `yaml:"signer,omitempty"`
|
Signer *plugin.Config `yaml:"signer,omitempty"`
|
||||||
Router *plugin.Config `yaml:"router,omitempty"`
|
Router *plugin.Config `yaml:"router,omitempty"`
|
||||||
Cache *plugin.Config `yaml:"cache,omitempty"`
|
Cache *plugin.Config `yaml:"cache,omitempty"`
|
||||||
|
Registry *plugin.Config `yaml:"registry,omitempty"`
|
||||||
KeyManager *plugin.Config `yaml:"keyManager,omitempty"`
|
KeyManager *plugin.Config `yaml:"keyManager,omitempty"`
|
||||||
Middleware []plugin.Config `yaml:"middleware,omitempty"`
|
Middleware []plugin.Config `yaml:"middleware,omitempty"`
|
||||||
Steps []plugin.Config
|
Steps []plugin.Config
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httputil"
|
"net/http/httputil"
|
||||||
|
|
||||||
"github.com/beckn-one/beckn-onix/core/module/client"
|
|
||||||
"github.com/beckn-one/beckn-onix/pkg/log"
|
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||||
"github.com/beckn-one/beckn-onix/pkg/model"
|
"github.com/beckn-one/beckn-onix/pkg/model"
|
||||||
"github.com/beckn-one/beckn-onix/pkg/plugin"
|
"github.com/beckn-one/beckn-onix/pkg/plugin"
|
||||||
@@ -22,6 +21,7 @@ type stdHandler struct {
|
|||||||
steps []definition.Step
|
steps []definition.Step
|
||||||
signValidator definition.SignValidator
|
signValidator definition.SignValidator
|
||||||
cache definition.Cache
|
cache definition.Cache
|
||||||
|
registry definition.RegistryLookup
|
||||||
km definition.KeyManager
|
km definition.KeyManager
|
||||||
schemaValidator definition.SchemaValidator
|
schemaValidator definition.SchemaValidator
|
||||||
router definition.Router
|
router definition.Router
|
||||||
@@ -38,7 +38,7 @@ func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Ha
|
|||||||
role: cfg.Role,
|
role: cfg.Role,
|
||||||
}
|
}
|
||||||
// Initialize plugins.
|
// Initialize plugins.
|
||||||
if err := h.initPlugins(ctx, mgr, &cfg.Plugins, cfg.RegistryURL); err != nil {
|
if err := h.initPlugins(ctx, mgr, &cfg.Plugins); err != nil {
|
||||||
return nil, fmt.Errorf("failed to initialize plugins: %w", err)
|
return nil, fmt.Errorf("failed to initialize plugins: %w", err)
|
||||||
}
|
}
|
||||||
// Initialize steps.
|
// Initialize steps.
|
||||||
@@ -169,8 +169,8 @@ func loadPlugin[T any](ctx context.Context, name string, cfg *plugin.Config, mgr
|
|||||||
return plugin, nil
|
return plugin, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// loadKeyManager loads the KeyManager plugin using the provided PluginManager, cache, and registry URL.
|
// loadKeyManager loads the KeyManager plugin using the provided PluginManager, cache, and registry.
|
||||||
func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cache, cfg *plugin.Config, regURL string) (definition.KeyManager, error) {
|
func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cache, registry definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error) {
|
||||||
if cfg == nil {
|
if cfg == nil {
|
||||||
log.Debug(ctx, "Skipping KeyManager plugin: not configured")
|
log.Debug(ctx, "Skipping KeyManager plugin: not configured")
|
||||||
return nil, nil
|
return nil, nil
|
||||||
@@ -178,10 +178,12 @@ func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cac
|
|||||||
if cache == nil {
|
if cache == nil {
|
||||||
return nil, fmt.Errorf("failed to load KeyManager plugin (%s): Cache plugin not configured", cfg.ID)
|
return nil, fmt.Errorf("failed to load KeyManager plugin (%s): Cache plugin not configured", cfg.ID)
|
||||||
}
|
}
|
||||||
rClient := client.NewRegisteryClient(&client.Config{RegisteryURL: regURL})
|
if registry == nil {
|
||||||
km, err := mgr.KeyManager(ctx, cache, rClient, cfg)
|
return nil, fmt.Errorf("failed to load KeyManager plugin (%s): Registry plugin not configured", cfg.ID)
|
||||||
|
}
|
||||||
|
km, err := mgr.KeyManager(ctx, cache, registry, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to load cache plugin (%s): %w", cfg.ID, err)
|
return nil, fmt.Errorf("failed to load KeyManager plugin (%s): %w", cfg.ID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf(ctx, "Loaded Keymanager plugin: %s", cfg.ID)
|
log.Debugf(ctx, "Loaded Keymanager plugin: %s", cfg.ID)
|
||||||
@@ -189,12 +191,15 @@ func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cac
|
|||||||
}
|
}
|
||||||
|
|
||||||
// initPlugins initializes required plugins for the processor.
|
// initPlugins initializes required plugins for the processor.
|
||||||
func (h *stdHandler) initPlugins(ctx context.Context, mgr PluginManager, cfg *PluginCfg, regURL string) error {
|
func (h *stdHandler) initPlugins(ctx context.Context, mgr PluginManager, cfg *PluginCfg) error {
|
||||||
var err error
|
var err error
|
||||||
if h.cache, err = loadPlugin(ctx, "Cache", cfg.Cache, mgr.Cache); err != nil {
|
if h.cache, err = loadPlugin(ctx, "Cache", cfg.Cache, mgr.Cache); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if h.km, err = loadKeyManager(ctx, mgr, h.cache, cfg.KeyManager, regURL); err != nil {
|
if h.registry, err = loadPlugin(ctx, "Registry", cfg.Registry, mgr.Registry); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if h.km, err = loadKeyManager(ctx, mgr, h.cache, h.registry, cfg.KeyManager); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if h.signValidator, err = loadPlugin(ctx, "SignValidator", cfg.SignValidator, mgr.SignValidator); err != nil {
|
if h.signValidator, err = loadPlugin(ctx, "SignValidator", cfg.SignValidator, mgr.SignValidator); err != nil {
|
||||||
@@ -259,4 +264,4 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf
|
|||||||
}
|
}
|
||||||
log.Infof(ctx, "Processor steps initialized: %v", cfg.Steps)
|
log.Infof(ctx, "Processor steps initialized: %v", cfg.Steps)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ plugins=(
|
|||||||
"keymanager"
|
"keymanager"
|
||||||
"simplekeymanager"
|
"simplekeymanager"
|
||||||
"publisher"
|
"publisher"
|
||||||
|
"registry"
|
||||||
"reqpreprocessor"
|
"reqpreprocessor"
|
||||||
"router"
|
"router"
|
||||||
"schemavalidator"
|
"schemavalidator"
|
||||||
|
|||||||
@@ -7,5 +7,11 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type RegistryLookup interface {
|
type RegistryLookup interface {
|
||||||
|
// looks up Registry entry to obtain public keys to validate signature of the incoming message
|
||||||
Lookup(ctx context.Context, req *model.Subscription) ([]model.Subscription, error)
|
Lookup(ctx context.Context, req *model.Subscription) ([]model.Subscription, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegistryLookupProvider initializes a new registry lookup instance.
|
||||||
|
type RegistryLookupProvider interface {
|
||||||
|
New(context.Context, map[string]string) (RegistryLookup, func() error, error)
|
||||||
|
}
|
||||||
|
|||||||
83
pkg/plugin/implementation/registry/cmd/plugin.go
Normal file
83
pkg/plugin/implementation/registry/cmd/plugin.go
Normal file
@@ -0,0 +1,83 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||||
|
"github.com/beckn-one/beckn-onix/pkg/plugin/definition"
|
||||||
|
"github.com/beckn-one/beckn-onix/pkg/plugin/implementation/registry"
|
||||||
|
)
|
||||||
|
|
||||||
|
// registryProvider implements the RegistryLookupProvider interface for the registry plugin.
|
||||||
|
type registryProvider struct{}
|
||||||
|
|
||||||
|
// newRegistryFunc is a function type that creates a new Registry instance.
|
||||||
|
var newRegistryFunc = registry.New
|
||||||
|
|
||||||
|
// parseConfig parses the configuration map and returns a registry.Config with optional parameters.
|
||||||
|
func (r registryProvider) parseConfig(config map[string]string) (*registry.Config, error) {
|
||||||
|
registryConfig := ®istry.Config{
|
||||||
|
URL: config["url"],
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse retry_max
|
||||||
|
if retryMaxStr, exists := config["retry_max"]; exists && retryMaxStr != "" {
|
||||||
|
retryMax, err := strconv.Atoi(retryMaxStr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid retry_max value '%s': %w", retryMaxStr, err)
|
||||||
|
}
|
||||||
|
registryConfig.RetryMax = retryMax
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse retry_wait_min
|
||||||
|
if retryWaitMinStr, exists := config["retry_wait_min"]; exists && retryWaitMinStr != "" {
|
||||||
|
retryWaitMin, err := time.ParseDuration(retryWaitMinStr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid retry_wait_min value '%s': %w", retryWaitMinStr, err)
|
||||||
|
}
|
||||||
|
registryConfig.RetryWaitMin = retryWaitMin
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse retry_wait_max
|
||||||
|
if retryWaitMaxStr, exists := config["retry_wait_max"]; exists && retryWaitMaxStr != "" {
|
||||||
|
retryWaitMax, err := time.ParseDuration(retryWaitMaxStr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid retry_wait_max value '%s': %w", retryWaitMaxStr, err)
|
||||||
|
}
|
||||||
|
registryConfig.RetryWaitMax = retryWaitMax
|
||||||
|
}
|
||||||
|
|
||||||
|
return registryConfig, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a new registry plugin instance.
|
||||||
|
func (r registryProvider) New(ctx context.Context, config map[string]string) (definition.RegistryLookup, func() error, error) {
|
||||||
|
if ctx == nil {
|
||||||
|
return nil, nil, errors.New("context cannot be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse configuration from map using the dedicated method
|
||||||
|
registryConfig, err := r.parseConfig(config)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf(ctx, err, "Failed to parse registry configuration")
|
||||||
|
return nil, nil, fmt.Errorf("failed to parse registry configuration: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf(ctx, "Registry config mapped: %+v", registryConfig)
|
||||||
|
|
||||||
|
registryClient, closer, err := newRegistryFunc(ctx, registryConfig)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf(ctx, err, "Failed to create registry instance")
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof(ctx, "Registry instance created successfully")
|
||||||
|
return registryClient, closer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Provider is the exported plugin instance
|
||||||
|
var Provider = registryProvider{}
|
||||||
189
pkg/plugin/implementation/registry/cmd/plugin_test.go
Normal file
189
pkg/plugin/implementation/registry/cmd/plugin_test.go
Normal file
@@ -0,0 +1,189 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/beckn-one/beckn-onix/pkg/plugin/implementation/registry"
|
||||||
|
)
|
||||||
|
|
||||||
|
// mockRegistryClient is a mock implementation of the RegistryLookup interface
|
||||||
|
// for testing purposes.
|
||||||
|
type mockRegistryClient struct{}
|
||||||
|
|
||||||
|
func (m *mockRegistryClient) Subscribe(ctx context.Context, subscription interface{}) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (m *mockRegistryClient) Lookup(ctx context.Context, subscription interface{}) ([]interface{}, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestRegistryProvider_ParseConfig tests the configuration parsing logic.
|
||||||
|
func TestRegistryProvider_ParseConfig(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
provider := registryProvider{}
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
config map[string]string
|
||||||
|
expected *registry.Config
|
||||||
|
expectedErr string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "should parse a full, valid config",
|
||||||
|
config: map[string]string{
|
||||||
|
"url": "http://test.com",
|
||||||
|
"retry_max": "5",
|
||||||
|
"retry_wait_min": "100ms",
|
||||||
|
"retry_wait_max": "2s",
|
||||||
|
},
|
||||||
|
expected: ®istry.Config{
|
||||||
|
URL: "http://test.com",
|
||||||
|
RetryMax: 5,
|
||||||
|
RetryWaitMin: 100 * time.Millisecond,
|
||||||
|
RetryWaitMax: 2 * time.Second,
|
||||||
|
},
|
||||||
|
expectedErr: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "should handle missing optional values",
|
||||||
|
config: map[string]string{
|
||||||
|
"url": "http://test.com",
|
||||||
|
},
|
||||||
|
expected: ®istry.Config{
|
||||||
|
URL: "http://test.com",
|
||||||
|
},
|
||||||
|
expectedErr: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "should return error for invalid retry_max",
|
||||||
|
config: map[string]string{
|
||||||
|
"url": "http://test.com",
|
||||||
|
"retry_max": "not-a-number",
|
||||||
|
},
|
||||||
|
expected: nil,
|
||||||
|
expectedErr: "invalid retry_max value 'not-a-number'",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "should return error for invalid retry_wait_min",
|
||||||
|
config: map[string]string{
|
||||||
|
"url": "http://test.com",
|
||||||
|
"retry_wait_min": "bad-duration",
|
||||||
|
},
|
||||||
|
expected: nil,
|
||||||
|
expectedErr: "invalid retry_wait_min value 'bad-duration'",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "should return error for invalid retry_wait_max",
|
||||||
|
config: map[string]string{
|
||||||
|
"url": "http://test.com",
|
||||||
|
"retry_wait_max": "30parsecs",
|
||||||
|
},
|
||||||
|
expected: nil,
|
||||||
|
expectedErr: "invalid retry_wait_max value '30parsecs'",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
parsedConfig, err := provider.parseConfig(tc.config)
|
||||||
|
|
||||||
|
if tc.expectedErr != "" {
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected an error containing '%s' but got none", tc.expectedErr)
|
||||||
|
}
|
||||||
|
if e, a := tc.expectedErr, err.Error(); !(a == e || (len(a) > len(e) && a[:len(e)] == e)) {
|
||||||
|
t.Errorf("expected error message to contain '%s', but got '%s'", e, a)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expected no error, but got: %v", err)
|
||||||
|
}
|
||||||
|
if parsedConfig.URL != tc.expected.URL {
|
||||||
|
t.Errorf("expected URL '%s', got '%s'", tc.expected.URL, parsedConfig.URL)
|
||||||
|
}
|
||||||
|
if parsedConfig.RetryMax != tc.expected.RetryMax {
|
||||||
|
t.Errorf("expected RetryMax %d, got %d", tc.expected.RetryMax, parsedConfig.RetryMax)
|
||||||
|
}
|
||||||
|
if parsedConfig.RetryWaitMin != tc.expected.RetryWaitMin {
|
||||||
|
t.Errorf("expected RetryWaitMin %v, got %v", tc.expected.RetryWaitMin, parsedConfig.RetryWaitMin)
|
||||||
|
}
|
||||||
|
if parsedConfig.RetryWaitMax != tc.expected.RetryWaitMax {
|
||||||
|
t.Errorf("expected RetryWaitMax %v, got %v", tc.expected.RetryWaitMax, parsedConfig.RetryWaitMax)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestRegistryProvider_New tests the plugin's main constructor.
|
||||||
|
func TestRegistryProvider_New(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
provider := registryProvider{}
|
||||||
|
originalNewRegistryFunc := newRegistryFunc
|
||||||
|
|
||||||
|
// Cleanup to restore the original function after the test
|
||||||
|
t.Cleanup(func() {
|
||||||
|
newRegistryFunc = originalNewRegistryFunc
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("should return error if context is nil", func(t *testing.T) {
|
||||||
|
_, _, err := provider.New(nil, map[string]string{})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected an error for nil context but got none")
|
||||||
|
}
|
||||||
|
if err.Error() != "context cannot be nil" {
|
||||||
|
t.Errorf("expected 'context cannot be nil' error, got '%s'", err.Error())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("should return error if config parsing fails", func(t *testing.T) {
|
||||||
|
config := map[string]string{"retry_max": "invalid"}
|
||||||
|
_, _, err := provider.New(context.Background(), config)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected an error for bad config but got none")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("should return error if registry.New fails", func(t *testing.T) {
|
||||||
|
// Mock the newRegistryFunc to return an error
|
||||||
|
expectedErr := errors.New("registry creation failed")
|
||||||
|
newRegistryFunc = func(ctx context.Context, cfg *registry.Config) (*registry.RegistryClient, func() error, error) {
|
||||||
|
return nil, nil, expectedErr
|
||||||
|
}
|
||||||
|
|
||||||
|
config := map[string]string{"url": "http://test.com"}
|
||||||
|
_, _, err := provider.New(context.Background(), config)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected an error from registry.New but got none")
|
||||||
|
}
|
||||||
|
if !errors.Is(err, expectedErr) {
|
||||||
|
t.Errorf("expected error '%v', got '%v'", expectedErr, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("should succeed and return a valid instance", func(t *testing.T) {
|
||||||
|
// Mock the newRegistryFunc for a successful case
|
||||||
|
mockCloser := func() error { fmt.Println("closed"); return nil }
|
||||||
|
newRegistryFunc = func(ctx context.Context, cfg *registry.Config) (*registry.RegistryClient, func() error, error) {
|
||||||
|
// Return a non-nil client of th correct concrete type
|
||||||
|
return new(registry.RegistryClient), mockCloser, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
config := map[string]string{"url": "http://test.com"}
|
||||||
|
instance, closer, err := provider.New(context.Background(), config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expected no error, but got: %v", err)
|
||||||
|
}
|
||||||
|
if instance == nil {
|
||||||
|
t.Fatal("expected a non-nil instance")
|
||||||
|
}
|
||||||
|
if closer == nil {
|
||||||
|
t.Fatal("expected a non-nil closer function")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
156
pkg/plugin/implementation/registry/registry.go
Normal file
156
pkg/plugin/implementation/registry/registry.go
Normal file
@@ -0,0 +1,156 @@
|
|||||||
|
package registry
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||||
|
"github.com/beckn-one/beckn-onix/pkg/model"
|
||||||
|
"github.com/hashicorp/go-retryablehttp"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Config holds configuration parameters for the registry client.
|
||||||
|
type Config struct {
|
||||||
|
URL string `yaml:"url" json:"url"`
|
||||||
|
RetryMax int `yaml:"retry_max" json:"retry_max"`
|
||||||
|
RetryWaitMin time.Duration `yaml:"retry_wait_min" json:"retry_wait_min"`
|
||||||
|
RetryWaitMax time.Duration `yaml:"retry_wait_max" json:"retry_wait_max"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegistryClient encapsulates the logic for calling the registry endpoints.
|
||||||
|
type RegistryClient struct {
|
||||||
|
config *Config
|
||||||
|
client *retryablehttp.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// validate checks if the provided registry configuration is valid.
|
||||||
|
func validate(cfg *Config) error {
|
||||||
|
if cfg == nil {
|
||||||
|
return fmt.Errorf("registry config cannot be nil")
|
||||||
|
}
|
||||||
|
if cfg.URL == "" {
|
||||||
|
return fmt.Errorf("registry URL cannot be empty")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a new instance of RegistryClient.
|
||||||
|
func New(ctx context.Context, cfg *Config) (*RegistryClient, func() error, error) {
|
||||||
|
log.Debugf(ctx, "Initializing Registry client with config: %+v", cfg)
|
||||||
|
|
||||||
|
if err := validate(cfg); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
rc := retryablehttp.NewClient()
|
||||||
|
|
||||||
|
// Configure retry settings if provided
|
||||||
|
if cfg.RetryMax > 0 {
|
||||||
|
rc.RetryMax = cfg.RetryMax
|
||||||
|
}
|
||||||
|
if cfg.RetryWaitMin > 0 {
|
||||||
|
rc.RetryWaitMin = cfg.RetryWaitMin
|
||||||
|
}
|
||||||
|
if cfg.RetryWaitMax > 0 {
|
||||||
|
rc.RetryWaitMax = cfg.RetryWaitMax
|
||||||
|
}
|
||||||
|
|
||||||
|
client := &RegistryClient{
|
||||||
|
config: cfg,
|
||||||
|
client: rc,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cleanup function
|
||||||
|
closer := func() error {
|
||||||
|
log.Debugf(ctx, "Cleaning up Registry client resources")
|
||||||
|
if client.client != nil {
|
||||||
|
client.client.HTTPClient.CloseIdleConnections()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof(ctx, "Registry client is created successfully")
|
||||||
|
return client, closer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe calls the /subscribe endpoint with retry.
|
||||||
|
func (c *RegistryClient) Subscribe(ctx context.Context, subscription *model.Subscription) error {
|
||||||
|
subscribeURL := fmt.Sprintf("%s/subscribe", c.config.URL)
|
||||||
|
|
||||||
|
jsonData, err := json.Marshal(subscription)
|
||||||
|
if err != nil {
|
||||||
|
return model.NewBadReqErr(fmt.Errorf("failed to marshal subscription data: %w", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := retryablehttp.NewRequest("POST", subscribeURL, bytes.NewBuffer(jsonData))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create request: %w", err)
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
|
||||||
|
log.Debugf(ctx, "Making subscribe request to: %s", subscribeURL)
|
||||||
|
resp, err := c.client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to send subscribe request with retry: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
log.Errorf(ctx, nil, "Subscribe request failed with status: %s, response: %s", resp.Status, string(body))
|
||||||
|
return fmt.Errorf("subscribe request failed with status: %s", resp.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf(ctx, "Subscribe request is initiated successfully")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lookup calls the /lookup endpoint with retry and returns a slice of Subscription.
|
||||||
|
func (c *RegistryClient) Lookup(ctx context.Context, subscription *model.Subscription) ([]model.Subscription, error) {
|
||||||
|
lookupURL := fmt.Sprintf("%s/lookup", c.config.URL)
|
||||||
|
|
||||||
|
jsonData, err := json.Marshal(subscription)
|
||||||
|
if err != nil {
|
||||||
|
return nil, model.NewBadReqErr(fmt.Errorf("failed to marshal subscription data: %w", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := retryablehttp.NewRequest("POST", lookupURL, bytes.NewBuffer(jsonData))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
|
||||||
|
log.Debugf(ctx, "Making lookup request to: %s", lookupURL)
|
||||||
|
resp, err := c.client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to send lookup request with retry: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
log.Errorf(ctx, nil, "Lookup request failed with status: %s, response: %s", resp.Status, string(body))
|
||||||
|
return nil, fmt.Errorf("lookup request failed with status: %s", resp.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to read response body: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var results []model.Subscription
|
||||||
|
err = json.Unmarshal(body, &results)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf(ctx, "Lookup request successful, found %d subscriptions", len(results))
|
||||||
|
return results, nil
|
||||||
|
}
|
||||||
204
pkg/plugin/implementation/registry/registry_test.go
Normal file
204
pkg/plugin/implementation/registry/registry_test.go
Normal file
@@ -0,0 +1,204 @@
|
|||||||
|
package registry
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/beckn-one/beckn-onix/pkg/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestValidate ensures the config validation logic works correctly.
|
||||||
|
func TestValidate(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
config *Config
|
||||||
|
expectedErr string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "should return error for nil config",
|
||||||
|
config: nil,
|
||||||
|
expectedErr: "registry config cannot be nil",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "should return error for empty URL",
|
||||||
|
config: &Config{URL: ""},
|
||||||
|
expectedErr: "registry URL cannot be empty",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "should succeed for valid config",
|
||||||
|
config: &Config{URL: "http://localhost:8080"},
|
||||||
|
expectedErr: "",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
err := validate(tc.config)
|
||||||
|
if tc.expectedErr != "" {
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected an error but got none")
|
||||||
|
}
|
||||||
|
if err.Error() != tc.expectedErr {
|
||||||
|
t.Errorf("expected error message '%s', but got '%s'", tc.expectedErr, err.Error())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expected no error, but got: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNew tests the constructor for the RegistryClient.
|
||||||
|
func TestNew(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
t.Run("should fail with invalid config", func(t *testing.T) {
|
||||||
|
_, _, err := New(context.Background(), &Config{URL: ""})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected an error for invalid config but got none")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("should succeed with valid config and set defaults", func(t *testing.T) {
|
||||||
|
cfg := &Config{URL: "http://test.com"}
|
||||||
|
client, closer, err := New(context.Background(), cfg)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expected no error, but got: %v", err)
|
||||||
|
}
|
||||||
|
if client == nil {
|
||||||
|
t.Fatal("expected client to be non-nil")
|
||||||
|
}
|
||||||
|
if closer == nil {
|
||||||
|
t.Fatal("expected closer to be non-nil")
|
||||||
|
}
|
||||||
|
// Check if default retry settings are applied (go-retryablehttp defaults)
|
||||||
|
if client.client.RetryMax != 4 {
|
||||||
|
t.Errorf("expected default RetryMax of 4, but got %d", client.client.RetryMax)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("should apply custom retry settings", func(t *testing.T) {
|
||||||
|
cfg := &Config{
|
||||||
|
URL: "http://test.com",
|
||||||
|
RetryMax: 10,
|
||||||
|
RetryWaitMin: 100 * time.Millisecond,
|
||||||
|
RetryWaitMax: 1 * time.Second,
|
||||||
|
}
|
||||||
|
client, _, err := New(context.Background(), cfg)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expected no error, but got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if client.client.RetryMax != cfg.RetryMax {
|
||||||
|
t.Errorf("expected RetryMax to be %d, but got %d", cfg.RetryMax, client.client.RetryMax)
|
||||||
|
}
|
||||||
|
if client.client.RetryWaitMin != cfg.RetryWaitMin {
|
||||||
|
t.Errorf("expected RetryWaitMin to be %v, but got %v", cfg.RetryWaitMin, client.client.RetryWaitMin)
|
||||||
|
}
|
||||||
|
if client.client.RetryWaitMax != cfg.RetryWaitMax {
|
||||||
|
t.Errorf("expected RetryWaitMax to be %v, but got %v", cfg.RetryWaitMax, client.client.RetryWaitMax)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestRegistryClient_Lookup tests the Lookup method.
|
||||||
|
func TestRegistryClient_Lookup(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
t.Run("should succeed and unmarshal response", func(t *testing.T) {
|
||||||
|
expectedSubs := []model.Subscription{
|
||||||
|
{
|
||||||
|
KeyID: "test-key",
|
||||||
|
SigningPublicKey: "test-signing-key",
|
||||||
|
EncrPublicKey: "test-encryption-key",
|
||||||
|
ValidFrom: time.Now(),
|
||||||
|
ValidUntil: time.Now().Add(24 * time.Hour),
|
||||||
|
Status: "SUBSCRIBED",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
KeyID: "test-key-2",
|
||||||
|
SigningPublicKey: "test-signing-key-2",
|
||||||
|
EncrPublicKey: "test-encryption-key-2",
|
||||||
|
ValidFrom: time.Now(),
|
||||||
|
ValidUntil: time.Now().Add(48 * time.Hour),
|
||||||
|
Status: "SUBSCRIBED",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.URL.Path != "/lookup" {
|
||||||
|
t.Errorf("expected path '/lookup', got '%s'", r.URL.Path)
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
if err := json.NewEncoder(w).Encode(expectedSubs); err != nil {
|
||||||
|
t.Fatalf("failed to write response: %v", err)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client, closer, err := New(context.Background(), &Config{URL: server.URL})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create client: %v", err)
|
||||||
|
}
|
||||||
|
defer closer()
|
||||||
|
|
||||||
|
results, err := client.Lookup(context.Background(), &model.Subscription{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("lookup failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(results) != len(expectedSubs) {
|
||||||
|
t.Fatalf("expected %d results, but got %d", len(expectedSubs), len(results))
|
||||||
|
}
|
||||||
|
|
||||||
|
if results[0].SubscriberID != expectedSubs[0].SubscriberID {
|
||||||
|
t.Errorf("expected subscriber ID '%s', got '%s'", expectedSubs[0].SubscriberID, results[0].SubscriberID)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("should fail on non-200 status", func(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client, closer, err := New(context.Background(), &Config{URL: server.URL})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create client: %v", err)
|
||||||
|
}
|
||||||
|
defer closer()
|
||||||
|
|
||||||
|
_, err = client.Lookup(context.Background(), &model.Subscription{})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected an error but got none")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("should fail on bad JSON response", func(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
fmt.Fprint(w, `[{"subscriber_id": "bad-json"`) // Malformed JSON
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client, closer, err := New(context.Background(), &Config{URL: server.URL, RetryMax: 1})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create client: %v", err)
|
||||||
|
}
|
||||||
|
defer closer()
|
||||||
|
|
||||||
|
_, err = client.Lookup(context.Background(), &model.Subscription{})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected an unmarshaling error but got none")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -361,6 +361,27 @@ func (m *Manager) SimpleKeyManager(ctx context.Context, cache definition.Cache,
|
|||||||
return km, nil
|
return km, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Registry returns a RegistryLookup instance based on the provided configuration.
|
||||||
|
// It registers a cleanup function for resource management.
|
||||||
|
func (m *Manager) Registry(ctx context.Context, cfg *Config) (definition.RegistryLookup, error) {
|
||||||
|
rp, err := provider[definition.RegistryLookupProvider](m.plugins, cfg.ID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
|
||||||
|
}
|
||||||
|
registry, closer, err := rp.New(ctx, cfg.Config)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if closer != nil {
|
||||||
|
m.closers = append(m.closers, func() {
|
||||||
|
if err := closer(); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return registry, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Validator implements handler.PluginManager.
|
// Validator implements handler.PluginManager.
|
||||||
func (m *Manager) Validator(ctx context.Context, cfg *Config) (definition.SchemaValidator, error) {
|
func (m *Manager) Validator(ctx context.Context, cfg *Config) (definition.SchemaValidator, error) {
|
||||||
panic("unimplemented")
|
panic("unimplemented")
|
||||||
|
|||||||
Reference in New Issue
Block a user