diff --git a/pkg/plugin/implementation/registry/cmd/plugin.go b/pkg/plugin/implementation/registry/cmd/plugin.go new file mode 100644 index 0000000..d7a6cfc --- /dev/null +++ b/pkg/plugin/implementation/registry/cmd/plugin.go @@ -0,0 +1,60 @@ +package main + +import ( + "context" + "errors" + "strconv" + "time" + + "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/beckn-one/beckn-onix/pkg/plugin/definition" + "github.com/beckn-one/beckn-onix/pkg/plugin/implementation/registry" +) + +// registryProvider implements the RegistryLookupProvider interface for the registry plugin. +type registryProvider struct{} + +// New creates a new registry plugin instance. +func (r registryProvider) New(ctx context.Context, config map[string]string) (definition.RegistryLookup, func() error, error) { + if ctx == nil { + return nil, nil, errors.New("context cannot be nil") + } + + // Parse configuration from map + registryConfig := ®istry.Config{ + URL: config["url"], + } + + // Parse optional retry settings + if retryMaxStr, exists := config["retry_max"]; exists && retryMaxStr != "" { + if retryMax, err := strconv.Atoi(retryMaxStr); err == nil { + registryConfig.RetryMax = retryMax + } + } + + if retryWaitMinStr, exists := config["retry_wait_min"]; exists && retryWaitMinStr != "" { + if retryWaitMin, err := time.ParseDuration(retryWaitMinStr); err == nil { + registryConfig.RetryWaitMin = retryWaitMin + } + } + + if retryWaitMaxStr, exists := config["retry_wait_max"]; exists && retryWaitMaxStr != "" { + if retryWaitMax, err := time.ParseDuration(retryWaitMaxStr); err == nil { + registryConfig.RetryWaitMax = retryWaitMax + } + } + + log.Debugf(ctx, "Registry config mapped: %+v", registryConfig) + + registryClient, closer, err := registry.New(ctx, registryConfig) + if err != nil { + log.Errorf(ctx, err, "Failed to create registry instance") + return nil, nil, err + } + + log.Infof(ctx, "Registry instance created successfully") + return registryClient, closer, nil +} + +// Provider is the exported plugin instance +var Provider = registryProvider{} diff --git a/pkg/plugin/implementation/registry/registry.go b/pkg/plugin/implementation/registry/registry.go new file mode 100644 index 0000000..a76d9ae --- /dev/null +++ b/pkg/plugin/implementation/registry/registry.go @@ -0,0 +1,156 @@ +package registry + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/beckn-one/beckn-onix/pkg/model" + "github.com/hashicorp/go-retryablehttp" +) + +// Config holds configuration parameters for the registry client. +type Config struct { + URL string + RetryMax int + RetryWaitMin time.Duration + RetryWaitMax time.Duration +} + +// RegistryClient encapsulates the logic for calling the registry endpoints. +type RegistryClient struct { + config *Config + client *retryablehttp.Client +} + +// validate checks if the provided registry configuration is valid. +func validate(cfg *Config) error { + if cfg == nil { + return fmt.Errorf("registry config cannot be nil") + } + if cfg.URL == "" { + return fmt.Errorf("registry URL cannot be empty") + } + return nil +} + +// New creates a new instance of RegistryClient. +func New(ctx context.Context, cfg *Config) (*RegistryClient, func() error, error) { + log.Debugf(ctx, "Initializing Registry client with config: %+v", cfg) + + if err := validate(cfg); err != nil { + return nil, nil, err + } + + retryClient := retryablehttp.NewClient() + + // Configure retry settings if provided + if cfg.RetryMax > 0 { + retryClient.RetryMax = cfg.RetryMax + } + if cfg.RetryWaitMin > 0 { + retryClient.RetryWaitMin = cfg.RetryWaitMin + } + if cfg.RetryWaitMax > 0 { + retryClient.RetryWaitMax = cfg.RetryWaitMax + } + + client := &RegistryClient{ + config: cfg, + client: retryClient, + } + + // Cleanup function + closer := func() error { + log.Debugf(ctx, "Cleaning up Registry client resources") + if client.client != nil { + client.client.HTTPClient.CloseIdleConnections() + } + return nil + } + + log.Infof(ctx, "Registry client connection established successfully") + return client, closer, nil +} + +// Subscribe calls the /subscribe endpoint with retry. +func (c *RegistryClient) Subscribe(ctx context.Context, subscription *model.Subscription) error { + subscribeURL := fmt.Sprintf("%s/subscribe", c.config.URL) + + jsonData, err := json.Marshal(subscription) + if err != nil { + return model.NewBadReqErr(fmt.Errorf("failed to marshal subscription data: %w", err)) + } + + req, err := retryablehttp.NewRequest("POST", subscribeURL, bytes.NewBuffer(jsonData)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req = req.WithContext(ctx) + + log.Debugf(ctx, "Making subscribe request to: %s", subscribeURL) + resp, err := c.client.Do(req) + if err != nil { + return fmt.Errorf("failed to send subscribe request with retry: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + log.Errorf(ctx, nil, "Subscribe request failed with status: %s, response: %s", resp.Status, string(body)) + return fmt.Errorf("subscribe request failed with status: %s", resp.Status) + } + + log.Debugf(ctx, "Subscribe request successful") + return nil +} + +// Lookup calls the /lookup endpoint with retry and returns a slice of Subscription. +func (c *RegistryClient) Lookup(ctx context.Context, subscription *model.Subscription) ([]model.Subscription, error) { + lookupURL := fmt.Sprintf("%s/lookup", c.config.URL) + + jsonData, err := json.Marshal(subscription) + if err != nil { + return nil, model.NewBadReqErr(fmt.Errorf("failed to marshal subscription data: %w", err)) + } + + req, err := retryablehttp.NewRequest("POST", lookupURL, bytes.NewBuffer(jsonData)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req = req.WithContext(ctx) + + log.Debugf(ctx, "Making lookup request to: %s", lookupURL) + resp, err := c.client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send lookup request with retry: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + log.Errorf(ctx, nil, "Lookup request failed with status: %s, response: %s", resp.Status, string(body)) + return nil, fmt.Errorf("lookup request failed with status: %s", resp.Status) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + var results []model.Subscription + err = json.Unmarshal(body, &results) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal response body: %w", err) + } + + log.Debugf(ctx, "Lookup request successful, found %d subscriptions", len(results)) + return results, nil +}