diff --git a/core/module/handler/config.go b/core/module/handler/config.go index b35eb15..2de4476 100644 --- a/core/module/handler/config.go +++ b/core/module/handler/config.go @@ -19,6 +19,7 @@ type PluginManager interface { Signer(ctx context.Context, cfg *plugin.Config) (definition.Signer, error) Step(ctx context.Context, cfg *plugin.Config) (definition.Step, error) Cache(ctx context.Context, cfg *plugin.Config) (definition.Cache, error) + Registry(ctx context.Context, cfg *plugin.Config) (definition.RegistryLookup, error) KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error) SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) } @@ -39,6 +40,7 @@ type PluginCfg struct { Signer *plugin.Config `yaml:"signer,omitempty"` Router *plugin.Config `yaml:"router,omitempty"` Cache *plugin.Config `yaml:"cache,omitempty"` + Registry *plugin.Config `yaml:"registry,omitempty"` KeyManager *plugin.Config `yaml:"keyManager,omitempty"` Middleware []plugin.Config `yaml:"middleware,omitempty"` Steps []plugin.Config diff --git a/core/module/handler/stdHandler.go b/core/module/handler/stdHandler.go index 8d472ee..c674abe 100644 --- a/core/module/handler/stdHandler.go +++ b/core/module/handler/stdHandler.go @@ -8,7 +8,6 @@ import ( "net/http" "net/http/httputil" - "github.com/beckn-one/beckn-onix/core/module/client" "github.com/beckn-one/beckn-onix/pkg/log" "github.com/beckn-one/beckn-onix/pkg/model" "github.com/beckn-one/beckn-onix/pkg/plugin" @@ -22,6 +21,7 @@ type stdHandler struct { steps []definition.Step signValidator definition.SignValidator cache definition.Cache + registry definition.RegistryLookup km definition.KeyManager schemaValidator definition.SchemaValidator router definition.Router @@ -38,7 +38,7 @@ func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Ha role: cfg.Role, } // Initialize plugins. - if err := h.initPlugins(ctx, mgr, &cfg.Plugins, cfg.RegistryURL); err != nil { + if err := h.initPlugins(ctx, mgr, &cfg.Plugins); err != nil { return nil, fmt.Errorf("failed to initialize plugins: %w", err) } // Initialize steps. @@ -169,8 +169,8 @@ func loadPlugin[T any](ctx context.Context, name string, cfg *plugin.Config, mgr return plugin, nil } -// loadKeyManager loads the KeyManager plugin using the provided PluginManager, cache, and registry URL. -func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cache, cfg *plugin.Config, regURL string) (definition.KeyManager, error) { +// loadKeyManager loads the KeyManager plugin using the provided PluginManager, cache, and registry. +func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cache, registry definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error) { if cfg == nil { log.Debug(ctx, "Skipping KeyManager plugin: not configured") return nil, nil @@ -178,10 +178,12 @@ func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cac if cache == nil { return nil, fmt.Errorf("failed to load KeyManager plugin (%s): Cache plugin not configured", cfg.ID) } - rClient := client.NewRegisteryClient(&client.Config{RegisteryURL: regURL}) - km, err := mgr.KeyManager(ctx, cache, rClient, cfg) + if registry == nil { + return nil, fmt.Errorf("failed to load KeyManager plugin (%s): Registry plugin not configured", cfg.ID) + } + km, err := mgr.KeyManager(ctx, cache, registry, cfg) if err != nil { - return nil, fmt.Errorf("failed to load cache plugin (%s): %w", cfg.ID, err) + return nil, fmt.Errorf("failed to load KeyManager plugin (%s): %w", cfg.ID, err) } log.Debugf(ctx, "Loaded Keymanager plugin: %s", cfg.ID) @@ -189,12 +191,15 @@ func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cac } // initPlugins initializes required plugins for the processor. -func (h *stdHandler) initPlugins(ctx context.Context, mgr PluginManager, cfg *PluginCfg, regURL string) error { +func (h *stdHandler) initPlugins(ctx context.Context, mgr PluginManager, cfg *PluginCfg) error { var err error if h.cache, err = loadPlugin(ctx, "Cache", cfg.Cache, mgr.Cache); err != nil { return err } - if h.km, err = loadKeyManager(ctx, mgr, h.cache, cfg.KeyManager, regURL); err != nil { + if h.registry, err = loadPlugin(ctx, "Registry", cfg.Registry, mgr.Registry); err != nil { + return err + } + if h.km, err = loadKeyManager(ctx, mgr, h.cache, h.registry, cfg.KeyManager); err != nil { return err } if h.signValidator, err = loadPlugin(ctx, "SignValidator", cfg.SignValidator, mgr.SignValidator); err != nil { @@ -259,4 +264,4 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf } log.Infof(ctx, "Processor steps initialized: %v", cfg.Steps) return nil -} \ No newline at end of file +} diff --git a/install/build-plugins.sh b/install/build-plugins.sh index f998e0e..d41b47f 100755 --- a/install/build-plugins.sh +++ b/install/build-plugins.sh @@ -13,6 +13,7 @@ plugins=( "keymanager" "simplekeymanager" "publisher" + "registry" "reqpreprocessor" "router" "schemavalidator" diff --git a/pkg/plugin/definition/registry.go b/pkg/plugin/definition/registry.go index 8684a6a..36f70d7 100644 --- a/pkg/plugin/definition/registry.go +++ b/pkg/plugin/definition/registry.go @@ -8,4 +8,10 @@ import ( type RegistryLookup interface { Lookup(ctx context.Context, req *model.Subscription) ([]model.Subscription, error) + Subscribe(ctx context.Context, subscription *model.Subscription) error +} + +// RegistryLookupProvider initializes a new registry lookup instance. +type RegistryLookupProvider interface { + New(context.Context, map[string]string) (RegistryLookup, func() error, error) } diff --git a/pkg/plugin/manager.go b/pkg/plugin/manager.go index 49140dc..d2aba88 100644 --- a/pkg/plugin/manager.go +++ b/pkg/plugin/manager.go @@ -361,6 +361,27 @@ func (m *Manager) SimpleKeyManager(ctx context.Context, cache definition.Cache, return km, nil } +// Registry returns a RegistryLookup instance based on the provided configuration. +// It registers a cleanup function for resource management. +func (m *Manager) Registry(ctx context.Context, cfg *Config) (definition.RegistryLookup, error) { + rp, err := provider[definition.RegistryLookupProvider](m.plugins, cfg.ID) + if err != nil { + return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) + } + registry, closer, err := rp.New(ctx, cfg.Config) + if err != nil { + return nil, err + } + if closer != nil { + m.closers = append(m.closers, func() { + if err := closer(); err != nil { + panic(err) + } + }) + } + return registry, nil +} + // Validator implements handler.PluginManager. func (m *Manager) Validator(ctx context.Context, cfg *Config) (definition.SchemaValidator, error) { panic("unimplemented")