Issue 511 - Initial commit part 2
This commit is contained in:
@@ -19,6 +19,7 @@ type PluginManager interface {
|
||||
Signer(ctx context.Context, cfg *plugin.Config) (definition.Signer, error)
|
||||
Step(ctx context.Context, cfg *plugin.Config) (definition.Step, error)
|
||||
Cache(ctx context.Context, cfg *plugin.Config) (definition.Cache, error)
|
||||
Registry(ctx context.Context, cfg *plugin.Config) (definition.RegistryLookup, error)
|
||||
KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error)
|
||||
SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error)
|
||||
}
|
||||
@@ -39,6 +40,7 @@ type PluginCfg struct {
|
||||
Signer *plugin.Config `yaml:"signer,omitempty"`
|
||||
Router *plugin.Config `yaml:"router,omitempty"`
|
||||
Cache *plugin.Config `yaml:"cache,omitempty"`
|
||||
Registry *plugin.Config `yaml:"registry,omitempty"`
|
||||
KeyManager *plugin.Config `yaml:"keyManager,omitempty"`
|
||||
Middleware []plugin.Config `yaml:"middleware,omitempty"`
|
||||
Steps []plugin.Config
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/core/module/client"
|
||||
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||
"github.com/beckn-one/beckn-onix/pkg/model"
|
||||
"github.com/beckn-one/beckn-onix/pkg/plugin"
|
||||
@@ -22,6 +21,7 @@ type stdHandler struct {
|
||||
steps []definition.Step
|
||||
signValidator definition.SignValidator
|
||||
cache definition.Cache
|
||||
registry definition.RegistryLookup
|
||||
km definition.KeyManager
|
||||
schemaValidator definition.SchemaValidator
|
||||
router definition.Router
|
||||
@@ -38,7 +38,7 @@ func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Ha
|
||||
role: cfg.Role,
|
||||
}
|
||||
// Initialize plugins.
|
||||
if err := h.initPlugins(ctx, mgr, &cfg.Plugins, cfg.RegistryURL); err != nil {
|
||||
if err := h.initPlugins(ctx, mgr, &cfg.Plugins); err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize plugins: %w", err)
|
||||
}
|
||||
// Initialize steps.
|
||||
@@ -169,8 +169,8 @@ func loadPlugin[T any](ctx context.Context, name string, cfg *plugin.Config, mgr
|
||||
return plugin, nil
|
||||
}
|
||||
|
||||
// loadKeyManager loads the KeyManager plugin using the provided PluginManager, cache, and registry URL.
|
||||
func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cache, cfg *plugin.Config, regURL string) (definition.KeyManager, error) {
|
||||
// loadKeyManager loads the KeyManager plugin using the provided PluginManager, cache, and registry.
|
||||
func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cache, registry definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error) {
|
||||
if cfg == nil {
|
||||
log.Debug(ctx, "Skipping KeyManager plugin: not configured")
|
||||
return nil, nil
|
||||
@@ -178,10 +178,12 @@ func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cac
|
||||
if cache == nil {
|
||||
return nil, fmt.Errorf("failed to load KeyManager plugin (%s): Cache plugin not configured", cfg.ID)
|
||||
}
|
||||
rClient := client.NewRegisteryClient(&client.Config{RegisteryURL: regURL})
|
||||
km, err := mgr.KeyManager(ctx, cache, rClient, cfg)
|
||||
if registry == nil {
|
||||
return nil, fmt.Errorf("failed to load KeyManager plugin (%s): Registry plugin not configured", cfg.ID)
|
||||
}
|
||||
km, err := mgr.KeyManager(ctx, cache, registry, cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load cache plugin (%s): %w", cfg.ID, err)
|
||||
return nil, fmt.Errorf("failed to load KeyManager plugin (%s): %w", cfg.ID, err)
|
||||
}
|
||||
|
||||
log.Debugf(ctx, "Loaded Keymanager plugin: %s", cfg.ID)
|
||||
@@ -189,12 +191,15 @@ func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cac
|
||||
}
|
||||
|
||||
// initPlugins initializes required plugins for the processor.
|
||||
func (h *stdHandler) initPlugins(ctx context.Context, mgr PluginManager, cfg *PluginCfg, regURL string) error {
|
||||
func (h *stdHandler) initPlugins(ctx context.Context, mgr PluginManager, cfg *PluginCfg) error {
|
||||
var err error
|
||||
if h.cache, err = loadPlugin(ctx, "Cache", cfg.Cache, mgr.Cache); err != nil {
|
||||
return err
|
||||
}
|
||||
if h.km, err = loadKeyManager(ctx, mgr, h.cache, cfg.KeyManager, regURL); err != nil {
|
||||
if h.registry, err = loadPlugin(ctx, "Registry", cfg.Registry, mgr.Registry); err != nil {
|
||||
return err
|
||||
}
|
||||
if h.km, err = loadKeyManager(ctx, mgr, h.cache, h.registry, cfg.KeyManager); err != nil {
|
||||
return err
|
||||
}
|
||||
if h.signValidator, err = loadPlugin(ctx, "SignValidator", cfg.SignValidator, mgr.SignValidator); err != nil {
|
||||
@@ -259,4 +264,4 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf
|
||||
}
|
||||
log.Infof(ctx, "Processor steps initialized: %v", cfg.Steps)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ plugins=(
|
||||
"keymanager"
|
||||
"simplekeymanager"
|
||||
"publisher"
|
||||
"registry"
|
||||
"reqpreprocessor"
|
||||
"router"
|
||||
"schemavalidator"
|
||||
|
||||
@@ -8,4 +8,10 @@ import (
|
||||
|
||||
type RegistryLookup interface {
|
||||
Lookup(ctx context.Context, req *model.Subscription) ([]model.Subscription, error)
|
||||
Subscribe(ctx context.Context, subscription *model.Subscription) error
|
||||
}
|
||||
|
||||
// RegistryLookupProvider initializes a new registry lookup instance.
|
||||
type RegistryLookupProvider interface {
|
||||
New(context.Context, map[string]string) (RegistryLookup, func() error, error)
|
||||
}
|
||||
|
||||
@@ -361,6 +361,27 @@ func (m *Manager) SimpleKeyManager(ctx context.Context, cache definition.Cache,
|
||||
return km, nil
|
||||
}
|
||||
|
||||
// Registry returns a RegistryLookup instance based on the provided configuration.
|
||||
// It registers a cleanup function for resource management.
|
||||
func (m *Manager) Registry(ctx context.Context, cfg *Config) (definition.RegistryLookup, error) {
|
||||
rp, err := provider[definition.RegistryLookupProvider](m.plugins, cfg.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
|
||||
}
|
||||
registry, closer, err := rp.New(ctx, cfg.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if closer != nil {
|
||||
m.closers = append(m.closers, func() {
|
||||
if err := closer(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
return registry, nil
|
||||
}
|
||||
|
||||
// Validator implements handler.PluginManager.
|
||||
func (m *Manager) Validator(ctx context.Context, cfg *Config) (definition.SchemaValidator, error) {
|
||||
panic("unimplemented")
|
||||
|
||||
Reference in New Issue
Block a user