diff --git a/Dockerfile.adapter b/Dockerfile.adapter new file mode 100644 index 0000000..a8eb006 --- /dev/null +++ b/Dockerfile.adapter @@ -0,0 +1,25 @@ +FROM golang:1.24-bullseye AS builder + +WORKDIR /app +COPY cmd/adapter ./cmd/adapter +COPY core/ ./core +COPY pkg/ ./pkg +COPY go.mod . +COPY go.sum . +RUN go mod download + +RUN go build -o server cmd/adapter/main.go + +# Create a minimal runtime image +FROM cgr.dev/chainguard/wolfi-base +# ✅ Alpine is removed; using minimal Debian +WORKDIR /app + +# Copy only the built binary and plugin +COPY --from=builder /app/server . + +# Expose port 8080 +EXPOSE 8080 + +# Run the Go server with the config flag from environment variable. +CMD ["sh", "-c", "./server --config=${CONFIG_FILE}"] \ No newline at end of file diff --git a/cmd/adapter/main.go b/cmd/adapter/main.go new file mode 100644 index 0000000..1c88c60 --- /dev/null +++ b/cmd/adapter/main.go @@ -0,0 +1,181 @@ +package main + +import ( + "context" + "flag" + "fmt" + "net" + "net/http" + "os" + "strings" + "sync" + "time" + + "gopkg.in/yaml.v2" + + "github.com/beckn/beckn-onix/core/module" + "github.com/beckn/beckn-onix/core/module/handler" + "github.com/beckn/beckn-onix/pkg/log" + "github.com/beckn/beckn-onix/pkg/plugin" +) + +// Config struct holds all configurations. +type Config struct { + AppName string `yaml:"appName"` + Log log.Config `yaml:"log"` + PluginManager *plugin.ManagerConfig `yaml:"pluginManager"` + Modules []module.Config `yaml:"modules"` + HTTP httpConfig `yaml:"http"` +} + +type httpConfig struct { + Port string `yaml:"port"` + Timeouts timeoutConfig `yaml:"timeout"` +} + +type timeoutConfig struct { + Read time.Duration `yaml:"read"` + Write time.Duration `yaml:"write"` + Idle time.Duration `yaml:"idle"` +} + +var configPath string +var runFunc = run + +func main() { + // Define and parse command-line flags. + flag.StringVar(&configPath, "config", "../../config/onix/adapter.yaml", "Path to the configuration file") + flag.Parse() + + // Use custom log for initial setup messages. + log.Infof(context.Background(), "Starting application with config: %s", configPath) + + // Run the application within a context. + if err := runFunc(context.Background(), configPath); err != nil { + log.Fatalf(context.Background(), err, "Application failed: %v", err) + } + log.Infof(context.Background(), "Application finished") +} + +// initConfig loads and validates the configuration. +func initConfig(ctx context.Context, path string) (*Config, error) { + // Open the configuration file. + file, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf("could not open config file: %w", err) + } + defer file.Close() + + // Decode the YAML configuration. + var cfg Config + if err := yaml.NewDecoder(file).Decode(&cfg); err != nil { + return nil, fmt.Errorf("could not decode config: %w", err) + } + log.Debugf(ctx, "Read config: %#v", cfg) + // Validate the configuration. + if err := validateConfig(&cfg); err != nil { + return nil, fmt.Errorf("invalid config: %w", err) + } + + return &cfg, nil +} + +// validateConfig validates the configuration. +func validateConfig(cfg *Config) error { + if strings.TrimSpace(cfg.AppName) == "" { + return fmt.Errorf("missing app name") + } + if strings.TrimSpace(cfg.HTTP.Port) == "" { + return fmt.Errorf("missing port") + } + return nil +} + +// newServer creates and initializes the HTTP server. +func newServer(ctx context.Context, mgr handler.PluginManager, cfg *Config) (http.Handler, error) { + mux := http.NewServeMux() + err := module.Register(ctx, cfg.Modules, mux, mgr) + if err != nil { + return nil, fmt.Errorf("failed to register modules: %w", err) + } + return mux, nil +} + +var newManagerFunc = plugin.NewManager +var newServerFunc = newServer + +// run encapsulates the application logic. +func run(ctx context.Context, configPath string) error { + closers := []func(){} + // Initialize configuration and logger. + cfg, err := initConfig(ctx, configPath) + if err != nil { + return fmt.Errorf("failed to initialize config: %w", err) + } + log.Infof(ctx, "Initializing logger with config: %+v", cfg.Log) + if err := log.InitLogger(cfg.Log); err != nil { + return fmt.Errorf("failed to initialize logger: %w", err) + } + + // Initialize plugin manager. + log.Infof(ctx, "Initializing plugin manager") + mgr, closer, err := newManagerFunc(ctx, cfg.PluginManager) + if err != nil { + return fmt.Errorf("failed to create plugin manager: %w", err) + } + closers = append(closers, closer) + log.Debug(ctx, "Plugin manager loaded.") + + // Initialize HTTP server. + log.Infof(ctx, "Initializing HTTP server") + srv, err := newServerFunc(ctx, mgr, cfg) + if err != nil { + return fmt.Errorf("failed to initialize server: %w", err) + } + + // Configure HTTP server. + httpServer := &http.Server{ + Addr: net.JoinHostPort("", cfg.HTTP.Port), + Handler: srv, + ReadTimeout: cfg.HTTP.Timeouts.Read * time.Second, + WriteTimeout: cfg.HTTP.Timeouts.Write * time.Second, + IdleTimeout: cfg.HTTP.Timeouts.Idle * time.Second, + } + + // Start HTTP server. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + log.Infof(ctx, "Server listening on %s", httpServer.Addr) + if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Errorf(ctx, fmt.Errorf("http server ListenAndServe: %w", err), "error listening and serving") + } + }() + + // Handle shutdown. + shutdown(ctx, httpServer, &wg, closers) + wg.Wait() + log.Infof(ctx, "Server shutdown complete") + return nil +} + +// shutdown handles server shutdown. +func shutdown(ctx context.Context, httpServer *http.Server, wg *sync.WaitGroup, closers []func()) { + wg.Add(1) + go func() { + defer wg.Done() + <-ctx.Done() + log.Infof(ctx, "Shutting down server...") + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := httpServer.Shutdown(shutdownCtx); err != nil { + log.Errorf(ctx, fmt.Errorf("http server Shutdown: %w", err), "error shutting down http server") + } + + // Call all closer functions. + for _, closer := range closers { + closer() + } + }() +} diff --git a/cmd/adapter/main_test.go b/cmd/adapter/main_test.go new file mode 100644 index 0000000..8bfd24e --- /dev/null +++ b/cmd/adapter/main_test.go @@ -0,0 +1,500 @@ +package main + +import ( + "context" + "errors" + "flag" + "net/http" + "os" + "strings" + "testing" + "time" + + "github.com/beckn/beckn-onix/core/module" + "github.com/beckn/beckn-onix/core/module/handler" + "github.com/beckn/beckn-onix/pkg/plugin" + "github.com/beckn/beckn-onix/pkg/plugin/definition" + "github.com/stretchr/testify/mock" +) + +// MockPluginManager implements handler.PluginManager for testing. +type MockPluginManager struct { + mock.Mock +} + +// Middleware returns a middleware function based on the provided configuration. +func (m *MockPluginManager) Middleware(ctx context.Context, cfg *plugin.Config) (func(http.Handler) http.Handler, error) { + return nil, nil +} + +// SignValidator returns a mock implementation of the Verifier interface. +func (m *MockPluginManager) SignValidator(ctx context.Context, cfg *plugin.Config) (definition.SignValidator, error) { + return nil, nil +} + +// Validator returns a mock implementation of the SchemaValidator interface. +func (m *MockPluginManager) Validator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) { + return nil, nil +} + +// Router returns a mock implementation of the Router interface. +func (m *MockPluginManager) Router(ctx context.Context, cfg *plugin.Config) (definition.Router, error) { + return nil, nil +} + +// Publisher returns a mock implementation of the Publisher interface. +func (m *MockPluginManager) Publisher(ctx context.Context, cfg *plugin.Config) (definition.Publisher, error) { + return nil, nil +} + +// Signer returns a mock implementation of the Signer interface. +func (m *MockPluginManager) Signer(ctx context.Context, cfg *plugin.Config) (definition.Signer, error) { + return nil, nil +} + +// Step returns a mock implementation of the Step interface. +func (m *MockPluginManager) Step(ctx context.Context, cfg *plugin.Config) (definition.Step, error) { + return nil, nil +} + +// Cache returns a mock implementation of the Cache interface. +func (m *MockPluginManager) Cache(ctx context.Context, cfg *plugin.Config) (definition.Cache, error) { + return nil, nil +} + +// KeyManager returns a mock implementation of the KeyManager interface. +func (m *MockPluginManager) KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error) { + return nil, nil +} + +// SchemaValidator returns a mock implementation of the SchemaValidator interface. +func (m *MockPluginManager) SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) { + return nil, nil +} + +// mockRun is a mock implementation of the `run` function, simulating a successful run. +func mockRun(ctx context.Context, configPath string) error { + return nil // Simulate a successful run +} + +// TestMainFunction tests the main function execution, including command-line argument parsing. +func TestMainFunction(t *testing.T) { + // Backup original run function and restore it after test + origRun := runFunc + defer func() { runFunc = origRun }() + runFunc = mockRun + + origArgs := os.Args + defer func() { os.Args = origArgs }() + + // Set mock command-line arguments + os.Args = []string{"cmd", "-config=../../config/test-config.yaml"} + + fs := flag.NewFlagSet("test", flag.ExitOnError) + fs.StringVar(&configPath, "config", "../../config/clientSideHandler-config.yaml", "Path to the configuration file") + + if err := fs.Parse(os.Args[1:]); err != nil { + t.Fatalf("Failed to parse flags: %v", err) + } + main() +} + +// TestRunSuccess tests the successful execution of the run function with different configurations. +func TestRunSuccess(t *testing.T) { + tests := []struct { + name string + configData string + mockMgr func() (*plugin.Manager, func(), error) + mockLogger func(cfg *Config) error + mockServer func(ctx context.Context, mgr handler.PluginManager, cfg *Config) (http.Handler, error) + }{ + { + name: "Valid Config", + configData: "valid_config.yaml", + mockMgr: func() (*plugin.Manager, func(), error) { + return &plugin.Manager{}, func() {}, nil + }, + mockLogger: func(cfg *Config) error { + return nil + }, + mockServer: func(ctx context.Context, mgr handler.PluginManager, cfg *Config) (http.Handler, error) { + return http.NewServeMux(), nil + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + testFilePath := tt.configData + mockConfig := `appName: "testAdapter" +log: + level: debug + destinations: + - type: stdout + context_keys: + - transaction_id + - message_id +http: + port: 8080 + timeout: + read: 30 + write: 30 + idle: 30 +plugin: + root: "/mock/plugins" + pluginZipPath: "/mock/plugins/plugins_bundle.zip" + plugins: + - testPlugin1 + - testPlugin2 +modules: + - name: testModule + type: transaction + path: /testPath + targetType: msgQ + plugin: + schemaValidator: + id: testValidator + publisher: + id: testPublisher + config: + project: test-project + topic: test-topic + router: + id: testRouter + config: + routingConfigPath: "/mock/configs/testRouting-config.yaml"` + + err := os.WriteFile(testFilePath, []byte(mockConfig), 0644) + if err != nil { + t.Errorf("Failed to create test config file: %v", err) + } + defer os.Remove(testFilePath) + + // Mock dependencies + originalNewManager := newManagerFunc + newManagerFunc = func(ctx context.Context, cfg *plugin.ManagerConfig) (*plugin.Manager, func(), error) { + return tt.mockMgr() + } + defer func() { newManagerFunc = originalNewManager }() + + originalNewServer := newServerFunc + newServerFunc = func(ctx context.Context, mgr handler.PluginManager, cfg *Config) (http.Handler, error) { + return tt.mockServer(ctx, mgr, cfg) + } + defer func() { newServerFunc = originalNewServer }() + + // Run function + err = run(ctx, testFilePath) + if err != nil { + t.Errorf("Expected no error, but got: %v", err) + } + }) + } +} + +// TestRunFailure validates failure scenarios for the run function. +func TestRunFailure(t *testing.T) { + tests := []struct { + name string + configData string + mockMgr func() (*MockPluginManager, func(), error) + mockLogger func(cfg *Config) error + mockServer func(ctx context.Context, mgr handler.PluginManager, cfg *Config) (http.Handler, error) + expectedErr string + }{ + { + name: "Invalid Config File", + configData: "invalid_config.yaml", + mockMgr: func() (*MockPluginManager, func(), error) { + return &MockPluginManager{}, func() {}, nil + }, + mockLogger: func(cfg *Config) error { + return nil + }, + mockServer: func(ctx context.Context, mgr handler.PluginManager, cfg *Config) (http.Handler, error) { + return nil, errors.New("failed to start server") + }, + expectedErr: "failed to initialize config: invalid config: missing app name", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + testFilePath := tt.configData + mockConfig := `invalid: "config"` + err := os.WriteFile(testFilePath, []byte(mockConfig), 0644) + if err != nil { + t.Errorf("Failed to create test config file: %v", err) + } + defer os.Remove(testFilePath) + + // Mock dependencies + originalNewManager := newManagerFunc + // newManagerFunc = func(ctx context.Context, cfg *plugin.ManagerConfig) (*plugin.Manager, func(), error) { + // return tt.mockMgr() + // } + newManagerFunc = nil + defer func() { newManagerFunc = originalNewManager }() + + originalNewServer := newServerFunc + newServerFunc = func(ctx context.Context, mgr handler.PluginManager, cfg *Config) (http.Handler, error) { + return tt.mockServer(ctx, mgr, cfg) + } + defer func() { newServerFunc = originalNewServer }() + + // Run function + err = run(ctx, testFilePath) + if err == nil { + t.Errorf("Expected error, but got nil") + } else if err.Error() != tt.expectedErr { + t.Errorf("Expected error '%s', but got '%s'", tt.expectedErr, err.Error()) + } + }) + } +} + +// TestInitConfigSuccess tests the successful initialization of the config. +func TestInitConfigSuccess(t *testing.T) { + tests := []struct { + name string + configData string + }{ + { + name: "Valid Config", + configData: ` +appName: "TestApp" +http: + port: "8080" + timeout: + read: 5 + write: 5 + idle: 10 +`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + configPath := "test_config_success.yaml" + defer os.Remove(configPath) + + err := os.WriteFile(configPath, []byte(tt.configData), 0644) + if err != nil { + t.Errorf("Failed to create test config file: %v", err) + } + + _, err = initConfig(context.Background(), configPath) + if err != nil { + t.Errorf("Expected no error, but got: %v", err) + } + }) + } +} + +// TestInitConfigFailure tests failure scenarios for config initialization. +func TestInitConfigFailure(t *testing.T) { + tests := []struct { + name string + configData string + expectedErr string + }{ + { + name: "Invalid YAML Format", + configData: `appName: "TestApp"\nhttp: { invalid_yaml }`, + expectedErr: "could not decode config", + }, + { + name: "Missing Required Fields", + configData: `appName: ""\nhttp:\n timeout:\n read: 5\n`, + expectedErr: "could not decode config: yaml: did not find expected key", + }, + { + name: "Non-Existent File", + configData: "", + expectedErr: "could not open config file", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + configPath := "test_config_failure.yaml" + + if tt.configData != "" { + err := os.WriteFile(configPath, []byte(tt.configData), 0644) + if err != nil { + t.Errorf("Failed to create test config file: %v", err) + } + defer os.Remove(configPath) + } else { + // Ensure file does not exist for non-existent file test + os.Remove(configPath) + } + + _, err := initConfig(context.Background(), configPath) + if err == nil { + t.Errorf("Expected error but got nil") + } else if !strings.Contains(err.Error(), tt.expectedErr) { + t.Errorf("Expected error containing '%s', but got '%s'", tt.expectedErr, err.Error()) + } + }) + } +} + +// TestNewServerSuccess tests successful server creation. +func TestNewServerSuccess(t *testing.T) { + tests := []struct { + name string + modules []module.Config + }{ + { + name: "Successful server creation with no modules", + modules: []module.Config{}, // No modules to simplify the test + }, + } + + mockMgr := new(MockPluginManager) // Mocking PluginManager + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := &Config{ + Modules: tt.modules, + HTTP: httpConfig{ + Port: "8080", + Timeouts: timeoutConfig{ + Read: 5, + Write: 5, + Idle: 10, + }, + }, + } + + handler, err := newServer(context.Background(), mockMgr, cfg) + + if err != nil { + t.Errorf("Expected no error, but got: %v", err) + } + if handler == nil { + t.Errorf("Expected handler to be non-nil, but got nil") + } + }) + } +} + +// TestNewServerFailure tests failure scenarios when creating a server. +func TestNewServerFailure(t *testing.T) { + tests := []struct { + name string + modules []module.Config + }{ + { + name: "Module registration failure", + modules: []module.Config{ + { + Name: "InvalidModule", + Path: "/invalid", + }, + }, + }, + } + + mockMgr := new(MockPluginManager) // Mocking PluginManager + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := &Config{ + Modules: tt.modules, + HTTP: httpConfig{ + Port: "8080", + Timeouts: timeoutConfig{ + Read: 5, + Write: 5, + Idle: 10, + }, + }, + } + + handler, err := newServer(context.Background(), mockMgr, cfg) + + if err == nil { + t.Errorf("Expected an error, but got nil") + } + if handler != nil { + t.Errorf("Expected handler to be nil, but got a non-nil value") + } + }) + } +} + +// TestValidateConfigSuccess tests validation of a correct config. +func TestValidateConfigSuccess(t *testing.T) { + tests := []struct { + name string + cfg Config + }{ + { + name: "Valid Config", + cfg: Config{ + AppName: "TestApp", + HTTP: httpConfig{ + Port: "8080", + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateConfig(&tt.cfg) + if err != nil { + t.Errorf("Expected no error, but got: %v", err) + } + }) + } +} + +// TestValidateConfigFailure tests validation failures for incorrect config. +func TestValidateConfigFailure(t *testing.T) { + tests := []struct { + name string + cfg Config + expectedErr string + }{ + { + name: "Missing AppName", + cfg: Config{ + AppName: "", + HTTP: httpConfig{ + Port: "8080", + }, + }, + expectedErr: "missing app name", + }, + { + name: "Missing Port", + cfg: Config{ + AppName: "TestApp", + HTTP: httpConfig{ + Port: "", + }, + }, + expectedErr: "missing port", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateConfig(&tt.cfg) + if err == nil { + t.Errorf("Expected error '%s', but got nil", tt.expectedErr) + } else if err.Error() != tt.expectedErr { + t.Errorf("Expected error '%s', but got '%s'", tt.expectedErr, err.Error()) + } + }) + } +} diff --git a/core/module/client/registery.go b/core/module/client/registery.go new file mode 100644 index 0000000..3045ebb --- /dev/null +++ b/core/module/client/registery.go @@ -0,0 +1,101 @@ +package client + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/beckn/beckn-onix/pkg/model" + "github.com/hashicorp/go-retryablehttp" +) + +// Config struct to hold configuration parameters. +type Config struct { + RegisteryURL string + RetryMax int + RetryWaitMin time.Duration + RetryWaitMax time.Duration +} + +// registryClient encapsulates the logic for calling the subscribe and lookup endpoints. +type registryClient struct { + config *Config + client *retryablehttp.Client +} + +// NewRegisteryClient creates a new instance of Client. +func NewRegisteryClient(config *Config) *registryClient { + retryClient := retryablehttp.NewClient() + + return ®istryClient{config: config, client: retryClient} +} + +// Subscribe calls the /subscribe endpoint with retry. +func (c *registryClient) Subscribe(ctx context.Context, subscription *model.Subscription) error { + subscribeURL := fmt.Sprintf("%s/subscribe", c.config.RegisteryURL) + + jsonData, err := json.Marshal(subscription) + if err != nil { + return fmt.Errorf("failed to marshal subscription data: %w", err) + } + + req, err := retryablehttp.NewRequest("POST", subscribeURL, bytes.NewBuffer(jsonData)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := c.client.Do(req) + if err != nil { + return fmt.Errorf("failed to send request with retry: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("subscribe request failed with status: %s", resp.Status) + } + return nil +} + +// Lookup calls the /lookup endpoint with retry and returns a slice of Subscription. +func (c *registryClient) Lookup(ctx context.Context, subscription *model.Subscription) ([]model.Subscription, error) { + lookupURL := fmt.Sprintf("%s/lookUp", c.config.RegisteryURL) + + jsonData, err := json.Marshal(subscription) + if err != nil { + return nil, fmt.Errorf("failed to marshal subscription data: %w", err) + } + + req, err := retryablehttp.NewRequest("POST", lookupURL, bytes.NewBuffer(jsonData)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := c.client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send request with retry: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("lookup request failed with status: %s", resp.Status) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + var results []model.Subscription + err = json.Unmarshal(body, &results) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal response body: %w", err) + } + + return results, nil +} diff --git a/core/module/client/registry_test.go b/core/module/client/registry_test.go new file mode 100644 index 0000000..ac05c14 --- /dev/null +++ b/core/module/client/registry_test.go @@ -0,0 +1,231 @@ +package client + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/beckn/beckn-onix/pkg/model" + "github.com/stretchr/testify/require" +) + +// TestSubscribeSuccess verifies that the Subscribe function succeeds when the server responds with HTTP 200. +func TestSubscribeSuccess(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + if _, err := w.Write([]byte("{}")); err != nil { + t.Errorf("failed to write response: %v", err) + } + })) + defer server.Close() + + client := NewRegisteryClient(&Config{ + RegisteryURL: server.URL, + RetryMax: 3, + RetryWaitMin: time.Millisecond * 100, + RetryWaitMax: time.Millisecond * 500, + }) + + subscription := &model.Subscription{ + KeyID: "test-key", + SigningPublicKey: "test-signing-key", + EncrPublicKey: "test-encryption-key", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(24 * time.Hour), + Status: "SUBSCRIBED", + } + err := client.Subscribe(context.Background(), subscription) + if err != nil { + t.Fatalf("Subscribe() failed with error: %v", err) + } +} + +// TestSubscribeFailure tests different failure scenarios using a mock client. +func TestSubscribeFailure(t *testing.T) { + tests := []struct { + name string + mockError error + }{ + { + name: "Failed subscription - Internal Server Error", + mockError: errors.New("internal server error"), + }, + { + name: "Failed subscription - Bad Request", + mockError: errors.New("bad request"), + }, + { + name: "Request Timeout", + mockError: context.DeadlineExceeded, + }, + { + name: "Network Failure", + mockError: errors.New("network failure"), + }, + { + name: "JSON Marshalling Failure", + mockError: errors.New("json marshalling failure"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client := NewRegisteryClient(&Config{ + RetryMax: 1, + RetryWaitMin: 1 * time.Millisecond, + RetryWaitMax: 2 * time.Millisecond, + }) + + subscription := &model.Subscription{ + KeyID: "test-key", + SigningPublicKey: "test-signing-key", + EncrPublicKey: "test-encryption-key", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(24 * time.Hour), + Status: "SUBSCRIBED", + } + + if tt.name == "JSON Marshalling Failure" { + subscription = &model.Subscription{} // Example of an invalid object + } + + err := client.Subscribe(context.Background(), subscription) + require.Error(t, err) // Directly checking for an error since all cases should fail + }) + } +} + +// TestLookupSuccess tests successful lookup scenarios. +func TestLookupSuccess(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + response := []model.Subscription{ + { + Subscriber: model.Subscriber{ + SubscriberID: "123", + }, + KeyID: "test-key", + SigningPublicKey: "test-signing-key", + EncrPublicKey: "test-encryption-key", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(24 * time.Hour), + Status: "SUBSCRIBED", + }, + } + bodyBytes, _ := json.Marshal(response) + if _, err := w.Write(bodyBytes); err != nil { + t.Errorf("failed to write response: %v", err) + } + })) + defer server.Close() + + config := &Config{ + RegisteryURL: server.URL, + RetryMax: 1, + RetryWaitMin: 1 * time.Millisecond, + RetryWaitMax: 2 * time.Millisecond, + } + rClient := NewRegisteryClient(config) + ctx := context.Background() + subscription := &model.Subscription{ + Subscriber: model.Subscriber{ + SubscriberID: "123", + }, + KeyID: "test-key", + SigningPublicKey: "test-signing-key", + EncrPublicKey: "test-encryption-key", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(24 * time.Hour), + Status: "SUBSCRIBED", + } + + result, err := rClient.Lookup(ctx, subscription) + require.NoError(t, err) + require.NotEmpty(t, result) + require.Equal(t, subscription.Subscriber.SubscriberID, result[0].Subscriber.SubscriberID) +} + +// TestLookupFailure tests failure scenarios for the Lookup function. +func TestLookupFailure(t *testing.T) { + tests := []struct { + name string + responseBody interface{} + responseCode int + setupMock func(*httptest.Server) + }{ + { + name: "Lookup failure - non 200 status", + responseBody: "Internal Server Error", + responseCode: http.StatusInternalServerError, + }, + { + name: "Invalid JSON response", + responseBody: "Invalid JSON", + responseCode: http.StatusOK, + }, + { + name: "Server timeout", + setupMock: func(server *httptest.Server) { + server.Config.WriteTimeout = 1 * time.Millisecond // Force timeout + }, + }, + { + name: "Empty response body", + responseBody: "", + responseCode: http.StatusOK, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if tc.responseCode != 0 { // Prevent WriteHeader(0) error + w.WriteHeader(tc.responseCode) + } + if tc.responseBody != nil { + if str, ok := tc.responseBody.(string); ok { + if _, err := w.Write([]byte(str)); err != nil { + t.Errorf("failed to write response: %v", err) + } + } else { + bodyBytes, _ := json.Marshal(tc.responseBody) + if _, err := w.Write(bodyBytes); err != nil { + t.Errorf("failed to write response: %v", err) + } + } + } + })) + defer server.Close() + + if tc.setupMock != nil { + tc.setupMock(server) + } + + config := &Config{ + RegisteryURL: server.URL, + RetryMax: 0, + RetryWaitMin: 1 * time.Millisecond, + RetryWaitMax: 2 * time.Millisecond, + } + rClient := NewRegisteryClient(config) + ctx := context.Background() + subscription := &model.Subscription{ + Subscriber: model.Subscriber{}, + KeyID: "test-key", + SigningPublicKey: "test-signing-key", + EncrPublicKey: "test-encryption-key", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(24 * time.Hour), + Status: "SUBSCRIBED", + } + + result, err := rClient.Lookup(ctx, subscription) + require.Error(t, err) + require.Empty(t, result) + }) + } +} diff --git a/core/module/handler/config.go b/core/module/handler/config.go new file mode 100644 index 0000000..16a2c0c --- /dev/null +++ b/core/module/handler/config.go @@ -0,0 +1,55 @@ +package handler + +import ( + "context" + "net/http" + + "github.com/beckn/beckn-onix/pkg/model" + "github.com/beckn/beckn-onix/pkg/plugin" + "github.com/beckn/beckn-onix/pkg/plugin/definition" +) + +// PluginManager defines an interface for managing plugins dynamically. +type PluginManager interface { + Middleware(ctx context.Context, cfg *plugin.Config) (func(http.Handler) http.Handler, error) + SignValidator(ctx context.Context, cfg *plugin.Config) (definition.SignValidator, error) + Validator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) + Router(ctx context.Context, cfg *plugin.Config) (definition.Router, error) + Publisher(ctx context.Context, cfg *plugin.Config) (definition.Publisher, error) + Signer(ctx context.Context, cfg *plugin.Config) (definition.Signer, error) + Step(ctx context.Context, cfg *plugin.Config) (definition.Step, error) + Cache(ctx context.Context, cfg *plugin.Config) (definition.Cache, error) + KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error) + SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) +} + +// Type defines different handler types for processing requests. +type Type string + +const ( + // HandlerTypeStd represents the standard handler type used for general request processing. + HandlerTypeStd Type = "std" +) + +// PluginCfg holds the configuration for various plugins. +type PluginCfg struct { + SchemaValidator *plugin.Config `yaml:"schemaValidator,omitempty"` + SignValidator *plugin.Config `yaml:"signValidator,omitempty"` + Publisher *plugin.Config `yaml:"publisher,omitempty"` + Signer *plugin.Config `yaml:"signer,omitempty"` + Router *plugin.Config `yaml:"router,omitempty"` + Cache *plugin.Config `yaml:"cache,omitempty"` + KeyManager *plugin.Config `yaml:"keyManager,omitempty"` + Middleware []plugin.Config `yaml:"middleware,omitempty"` + Steps []plugin.Config +} + +// Config holds the configuration for request processing handlers. +type Config struct { + Plugins PluginCfg `yaml:"plugins"` + Steps []string + Type Type + RegistryURL string `yaml:"registryUrl"` + Role model.Role + SubscriberID string `yaml:"subscriberId"` +} diff --git a/core/module/handler/stdHandler.go b/core/module/handler/stdHandler.go new file mode 100644 index 0000000..251102e --- /dev/null +++ b/core/module/handler/stdHandler.go @@ -0,0 +1,264 @@ +package handler + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/http/httputil" + "net/url" + + "github.com/beckn/beckn-onix/core/module/client" + "github.com/beckn/beckn-onix/pkg/log" + "github.com/beckn/beckn-onix/pkg/model" + "github.com/beckn/beckn-onix/pkg/plugin" + "github.com/beckn/beckn-onix/pkg/plugin/definition" + "github.com/beckn/beckn-onix/pkg/response" +) + +// stdHandler orchestrates the execution of defined processing steps. +type stdHandler struct { + signer definition.Signer + steps []definition.Step + signValidator definition.SignValidator + cache definition.Cache + km definition.KeyManager + schemaValidator definition.SchemaValidator + router definition.Router + publisher definition.Publisher + SubscriberID string + role model.Role +} + +// NewStdHandler initializes a new processor with plugins and steps. +func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Handler, error) { + h := &stdHandler{ + steps: []definition.Step{}, + SubscriberID: cfg.SubscriberID, + role: cfg.Role, + } + // Initialize plugins. + if err := h.initPlugins(ctx, mgr, &cfg.Plugins, cfg.RegistryURL); err != nil { + return nil, fmt.Errorf("failed to initialize plugins: %w", err) + } + // Initialize steps. + if err := h.initSteps(ctx, mgr, cfg); err != nil { + return nil, fmt.Errorf("failed to initialize steps: %w", err) + } + return h, nil +} + +// ServeHTTP processes an incoming HTTP request and executes defined processing steps. +func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx, err := h.stepCtx(r, w.Header()) + if err != nil { + log.Errorf(r.Context(), err, "stepCtx(r):%v", err) + response.SendNack(r.Context(), w, err) + return + } + log.Request(r.Context(), r, ctx.Body) + + // Execute processing steps. + for _, step := range h.steps { + if err := step.Run(ctx); err != nil { + log.Errorf(ctx, err, "%T.run(%v):%v", step, ctx, err) + response.SendNack(ctx, w, err) + return + } + } + // Restore request body before forwarding or publishing. + r.Body = io.NopCloser(bytes.NewReader(ctx.Body)) + if ctx.Route == nil { + response.SendAck(w) + return + } + + // Handle routing based on the defined route type. + route(ctx, r, w, h.publisher) +} + +// stepCtx creates a new StepContext for processing an HTTP request. +func (h *stdHandler) stepCtx(r *http.Request, rh http.Header) (*model.StepContext, error) { + var bodyBuffer bytes.Buffer + if _, err := io.Copy(&bodyBuffer, r.Body); err != nil { + return nil, model.NewBadReqErr(err) + } + r.Body.Close() + subID := h.subID(r.Context()) + if len(subID) == 0 { + return nil, model.NewBadReqErr(fmt.Errorf("subscriberID not set")) + } + return &model.StepContext{ + Context: r.Context(), + Request: r, + Body: bodyBuffer.Bytes(), + Role: h.role, + SubID: subID, + RespHeader: rh, + }, nil +} + +// subID retrieves the subscriber ID from the request context. +func (h *stdHandler) subID(ctx context.Context) string { + rSubID, ok := ctx.Value("subscriber_id").(string) + if ok { + return rSubID + } + return h.SubscriberID +} + +var proxyFunc = proxy + +// route handles request forwarding or message publishing based on the routing type. +func route(ctx *model.StepContext, r *http.Request, w http.ResponseWriter, pb definition.Publisher) { + log.Debugf(ctx, "Routing to ctx.Route to %#v", ctx.Route) + switch ctx.Route.TargetType { + case "url": + log.Infof(ctx.Context, "Forwarding request to URL: %s", ctx.Route.URL) + proxyFunc(r, w, ctx.Route.URL) + return + case "publisher": + if pb == nil { + err := fmt.Errorf("publisher plugin not configured") + log.Errorf(ctx.Context, err, "Invalid configuration:%v", err) + response.SendNack(ctx, w, err) + return + } + log.Infof(ctx.Context, "Publishing message to: %s", ctx.Route.PublisherID) + if err := pb.Publish(ctx, ctx.Route.PublisherID, ctx.Body); err != nil { + log.Errorf(ctx.Context, err, "Failed to publish message") + http.Error(w, "Error publishing message", http.StatusInternalServerError) + response.SendNack(ctx, w, err) + return + } + default: + err := fmt.Errorf("unknown route type: %s", ctx.Route.TargetType) + log.Errorf(ctx.Context, err, "Invalid configuration:%v", err) + response.SendNack(ctx, w, err) + return + } + response.SendAck(w) +} + +// proxy forwards the request to a target URL using a reverse proxy. +func proxy(r *http.Request, w http.ResponseWriter, target *url.URL) { + r.URL.Scheme = target.Scheme + r.URL.Host = target.Host + r.URL.Path = target.Path + + r.Header.Set("X-Forwarded-Host", r.Host) + proxy := httputil.NewSingleHostReverseProxy(target) + log.Infof(r.Context(), "Proxying request to: %s", target) + + proxy.ServeHTTP(w, r) +} + +// loadPlugin is a generic function to load and validate plugins. +func loadPlugin[T any](ctx context.Context, name string, cfg *plugin.Config, mgrFunc func(context.Context, *plugin.Config) (T, error)) (T, error) { + var zero T + if cfg == nil { + log.Debugf(ctx, "Skipping %s plugin: not configured", name) + return zero, nil + } + + plugin, err := mgrFunc(ctx, cfg) + if err != nil { + return zero, fmt.Errorf("failed to load %s plugin (%s): %w", name, cfg.ID, err) + } + + log.Debugf(ctx, "Loaded %s plugin: %s", name, cfg.ID) + return plugin, nil +} + +// loadKeyManager loads the KeyManager plugin using the provided PluginManager, cache, and registry URL. +func loadKeyManager(ctx context.Context, mgr PluginManager, cache definition.Cache, cfg *plugin.Config, regURL string) (definition.KeyManager, error) { + if cfg == nil { + log.Debug(ctx, "Skipping KeyManager plugin: not configured") + return nil, nil + } + if cache == nil { + return nil, fmt.Errorf("failed to load KeyManager plugin (%s): Cache plugin not configured", cfg.ID) + } + rClient := client.NewRegisteryClient(&client.Config{RegisteryURL: regURL}) + km, err := mgr.KeyManager(ctx, cache, rClient, cfg) + if err != nil { + return nil, fmt.Errorf("failed to load cache plugin (%s): %w", cfg.ID, err) + } + + log.Debugf(ctx, "Loaded Keymanager plugin: %s", cfg.ID) + return km, nil +} + +// initPlugins initializes required plugins for the processor. +func (h *stdHandler) initPlugins(ctx context.Context, mgr PluginManager, cfg *PluginCfg, regURL string) error { + var err error + if h.cache, err = loadPlugin(ctx, "Cache", cfg.Cache, mgr.Cache); err != nil { + return err + } + if h.km, err = loadKeyManager(ctx, mgr, h.cache, cfg.KeyManager, regURL); err != nil { + return err + } + if h.signValidator, err = loadPlugin(ctx, "SignValidator", cfg.SignValidator, mgr.SignValidator); err != nil { + return err + } + if h.schemaValidator, err = loadPlugin(ctx, "SchemaValidator", cfg.SchemaValidator, mgr.SchemaValidator); err != nil { + return err + } + if h.router, err = loadPlugin(ctx, "Router", cfg.Router, mgr.Router); err != nil { + return err + } + if h.publisher, err = loadPlugin(ctx, "Publisher", cfg.Publisher, mgr.Publisher); err != nil { + return err + } + if h.signer, err = loadPlugin(ctx, "Signer", cfg.Signer, mgr.Signer); err != nil { + return err + } + + log.Debugf(ctx, "All required plugins successfully loaded for stdHandler") + return nil +} + +// initSteps initializes and validates processing steps for the processor. +func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Config) error { + steps := make(map[string]definition.Step) + + // Load plugin-based steps + for _, c := range cfg.Plugins.Steps { + step, err := mgr.Step(ctx, &c) + if err != nil { + return fmt.Errorf("failed to initialize plugin step %s: %w", c.ID, err) + } + steps[c.ID] = step + } + + // Register processing steps + for _, step := range cfg.Steps { + var s definition.Step + var err error + + switch step { + case "sign": + s, err = newSignStep(h.signer, h.km) + case "validateSign": + s, err = newValidateSignStep(h.signValidator, h.km) + case "validateSchema": + s, err = newValidateSchemaStep(h.schemaValidator) + case "addRoute": + s, err = newAddRouteStep(h.router) + default: + if customStep, exists := steps[step]; exists { + s = customStep + } else { + return fmt.Errorf("unrecognized step: %s", step) + } + } + + if err != nil { + return err + } + h.steps = append(h.steps, s) + } + log.Infof(ctx, "Processor steps initialized: %v", cfg.Steps) + return nil +} diff --git a/core/module/handler/step.go b/core/module/handler/step.go new file mode 100644 index 0000000..936ee98 --- /dev/null +++ b/core/module/handler/step.go @@ -0,0 +1,169 @@ +package handler + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/beckn/beckn-onix/pkg/log" + "github.com/beckn/beckn-onix/pkg/model" + "github.com/beckn/beckn-onix/pkg/plugin/definition" +) + +// signStep represents the signing step in the processing pipeline. +type signStep struct { + signer definition.Signer + km definition.KeyManager +} + +// newSignStep initializes and returns a new signing step. +func newSignStep(signer definition.Signer, km definition.KeyManager) (definition.Step, error) { + if signer == nil { + return nil, fmt.Errorf("invalid config: Signer plugin not configured") + } + if km == nil { + return nil, fmt.Errorf("invalid config: KeyManager plugin not configured") + } + + return &signStep{signer: signer, km: km}, nil +} + +// Run executes the signing step. +func (s *signStep) Run(ctx *model.StepContext) error { + keyID, key, err := s.km.SigningPrivateKey(ctx, ctx.SubID) + if err != nil { + return fmt.Errorf("failed to get signing key: %w", err) + } + createdAt := time.Now().Unix() + validTill := time.Now().Add(5 * time.Minute).Unix() + sign, err := s.signer.Sign(ctx, ctx.Body, key, createdAt, validTill) + if err != nil { + return fmt.Errorf("failed to sign request: %w", err) + } + + authHeader := s.generateAuthHeader(ctx.SubID, keyID, createdAt, validTill, sign) + + header := model.AuthHeaderSubscriber + if ctx.Role == model.RoleGateway { + header = model.AuthHeaderGateway + } + ctx.Request.Header.Set(header, authHeader) + return nil +} + +// generateAuthHeader constructs the authorization header for the signed request. +// It includes key ID, algorithm, creation time, expiration time, required headers, and signature. +func (s *signStep) generateAuthHeader(subID, keyID string, createdAt, validTill int64, signature string) string { + return fmt.Sprintf( + "Signature keyId=\"%s|%s|ed25519\",algorithm=\"ed25519\",created=\"%d\",expires=\"%d\",headers=\"(created) (expires) digest\",signature=\"%s\"", + subID, keyID, createdAt, validTill, signature, + ) +} + +// validateSignStep represents the signature validation step. +type validateSignStep struct { + validator definition.SignValidator + km definition.KeyManager +} + +// newValidateSignStep initializes and returns a new validate sign step. +func newValidateSignStep(signValidator definition.SignValidator, km definition.KeyManager) (definition.Step, error) { + if signValidator == nil { + return nil, fmt.Errorf("invalid config: SignValidator plugin not configured") + } + if km == nil { + return nil, fmt.Errorf("invalid config: KeyManager plugin not configured") + } + return &validateSignStep{validator: signValidator, km: km}, nil +} + +// Run executes the validation step. +func (s *validateSignStep) Run(ctx *model.StepContext) error { + unauthHeader := fmt.Sprintf("Signature realm=\"%s\",headers=\"(created) (expires) digest\"", ctx.SubID) + headerValue := ctx.Request.Header.Get(model.AuthHeaderGateway) + if len(headerValue) != 0 { + if err := s.validate(ctx, headerValue); err != nil { + ctx.RespHeader.Set(model.UnaAuthorizedHeaderGateway, unauthHeader) + return model.NewSignValidationErr(fmt.Errorf("failed to validate %s: %w", model.AuthHeaderGateway, err)) + } + } + headerValue = ctx.Request.Header.Get(model.AuthHeaderSubscriber) + if len(headerValue) == 0 { + ctx.RespHeader.Set(model.UnaAuthorizedHeaderSubscriber, unauthHeader) + return model.NewSignValidationErr(fmt.Errorf("%s missing", model.UnaAuthorizedHeaderSubscriber)) + } + if err := s.validate(ctx, headerValue); err != nil { + ctx.RespHeader.Set(model.UnaAuthorizedHeaderSubscriber, unauthHeader) + return model.NewSignValidationErr(fmt.Errorf("failed to validate %s: %w", model.AuthHeaderSubscriber, err)) + } + return nil +} + +// validate checks the validity of the provided signature header. +func (s *validateSignStep) validate(ctx *model.StepContext, value string) error { + headerParts := strings.Split(value, "|") + ids := strings.Split(headerParts[0], "\"") + if len(ids) < 2 || len(headerParts) < 3 { + return fmt.Errorf("malformed sign header") + } + subID := ids[1] + keyID := headerParts[1] + key, err := s.km.SigningPublicKey(ctx, subID, keyID) + if err != nil { + return fmt.Errorf("failed to get validation key: %w", err) + } + if err := s.validator.Validate(ctx, ctx.Body, value, key); err != nil { + return fmt.Errorf("sign validation failed: %w", err) + } + return nil +} + +// validateSchemaStep represents the schema validation step. +type validateSchemaStep struct { + validator definition.SchemaValidator +} + +// newValidateSchemaStep creates and returns the validateSchema step after validation. +func newValidateSchemaStep(schemaValidator definition.SchemaValidator) (definition.Step, error) { + if schemaValidator == nil { + return nil, fmt.Errorf("invalid config: SchemaValidator plugin not configured") + } + log.Debug(context.Background(), "adding schema validator") + return &validateSchemaStep{validator: schemaValidator}, nil +} + +// Run executes the schema validation step. +func (s *validateSchemaStep) Run(ctx *model.StepContext) error { + if err := s.validator.Validate(ctx, ctx.Request.URL, ctx.Body); err != nil { + return fmt.Errorf("schema validation failed: %w", err) + } + return nil +} + +// addRouteStep represents the route determination step. +type addRouteStep struct { + router definition.Router +} + +// newAddRouteStep creates and returns the addRoute step after validation. +func newAddRouteStep(router definition.Router) (definition.Step, error) { + if router == nil { + return nil, fmt.Errorf("invalid config: Router plugin not configured") + } + return &addRouteStep{router: router}, nil +} + +// Run executes the routing step. +func (s *addRouteStep) Run(ctx *model.StepContext) error { + route, err := s.router.Route(ctx, ctx.Request.URL, ctx.Body) + if err != nil { + return fmt.Errorf("failed to determine route: %w", err) + } + ctx.Route = &model.Route{ + TargetType: route.TargetType, + PublisherID: route.PublisherID, + URL: route.URL, + } + return nil +} diff --git a/core/module/module.go b/core/module/module.go new file mode 100644 index 0000000..3b0fcef --- /dev/null +++ b/core/module/module.go @@ -0,0 +1,73 @@ +package module + +import ( + "context" + "fmt" + "net/http" + + "github.com/beckn/beckn-onix/core/module/handler" + "github.com/beckn/beckn-onix/pkg/log" +) + +// Config represents the configuration for a module. +type Config struct { + Name string `yaml:"name"` + Path string `yaml:"path"` + Handler handler.Config +} + +// Provider represents a function that initializes an HTTP handler using a PluginManager. +type Provider func(ctx context.Context, mgr handler.PluginManager, cfg *handler.Config) (http.Handler, error) + +// handlerProviders maintains a mapping of handler types to their respective providers. +var handlerProviders = map[handler.Type]Provider{ + handler.HandlerTypeStd: handler.NewStdHandler, +} + +// Register initializes and registers handlers based on the provided configuration. +// It iterates over the module configurations, retrieves appropriate handler providers, +// and registers the handlers with the HTTP multiplexer. +func Register(ctx context.Context, mCfgs []Config, mux *http.ServeMux, mgr handler.PluginManager) error { + log.Debugf(ctx, "Registering modules with config: %#v", mCfgs) + // Iterate over the handlers in the configuration. + for _, c := range mCfgs { + rmp, ok := handlerProviders[c.Handler.Type] + if !ok { + return fmt.Errorf("invalid module : %s", c.Name) + } + h, err := rmp(ctx, mgr, &c.Handler) + if err != nil { + return fmt.Errorf("%s : %w", c.Name, err) + } + h, err = addMiddleware(ctx, mgr, h, &c.Handler) + if err != nil { + return fmt.Errorf("failed to add middleware: %w", err) + + } + log.Debugf(ctx, "Registering handler %s, of type %s @ %s", c.Name, c.Handler.Type, c.Path) + mux.Handle(c.Path, h) + } + return nil +} + +// addMiddleware applies middleware plugins to the provided handler in reverse order. +// It retrieves middleware instances from the plugin manager and chains them to the handler. +func addMiddleware(ctx context.Context, mgr handler.PluginManager, handler http.Handler, hCfg *handler.Config) (http.Handler, error) { + mws := hCfg.Plugins.Middleware + log.Debugf(ctx, "Applying %d middleware(s) to the handler", len(mws)) + // Apply the middleware in reverse order. + for i := len(mws) - 1; i >= 0; i-- { + log.Debugf(ctx, "Loading middleware: %s", mws[i].ID) + mw, err := mgr.Middleware(ctx, &mws[i]) + if err != nil { + log.Errorf(ctx, err, "Failed to load middleware %s: %v", mws[i].ID, err) + return nil, fmt.Errorf("failed to load middleware %s: %w", mws[i].ID, err) + } + // Apply the middleware to the handler. + handler = mw(handler) + log.Debugf(ctx, "Applied middleware: %s", mws[i].ID) + } + + log.Debugf(ctx, "Middleware chain setup completed") + return handler, nil +} diff --git a/core/module/module_test.go b/core/module/module_test.go new file mode 100644 index 0000000..8f9016c --- /dev/null +++ b/core/module/module_test.go @@ -0,0 +1,153 @@ +package module + +import ( + "context" + "errors" + "net/http" + "testing" + + "github.com/beckn/beckn-onix/core/module/handler" + "github.com/beckn/beckn-onix/pkg/plugin" + "github.com/beckn/beckn-onix/pkg/plugin/definition" +) + +// mockPluginManager is a mock implementation of the PluginManager interface +// with support for dynamically setting behavior. +type mockPluginManager struct { + middlewareFunc func(ctx context.Context, cfg *plugin.Config) (func(http.Handler) http.Handler, error) +} + +// Middleware returns a mock middleware function based on the provided configuration. +func (m *mockPluginManager) Middleware(ctx context.Context, cfg *plugin.Config) (func(http.Handler) http.Handler, error) { + return m.middlewareFunc(ctx, cfg) +} + +// SignValidator returns a mock verifier implementation. +func (m *mockPluginManager) SignValidator(ctx context.Context, cfg *plugin.Config) (definition.SignValidator, error) { + return nil, nil +} + +// Validator returns a mock schema validator implementation. +func (m *mockPluginManager) Validator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) { + return nil, nil +} + +// Router returns a mock router implementation. +func (m *mockPluginManager) Router(ctx context.Context, cfg *plugin.Config) (definition.Router, error) { + return nil, nil +} + +// Publisher returns a mock publisher implementation. +func (m *mockPluginManager) Publisher(ctx context.Context, cfg *plugin.Config) (definition.Publisher, error) { + return nil, nil +} + +// Signer returns a mock signer implementation. +func (m *mockPluginManager) Signer(ctx context.Context, cfg *plugin.Config) (definition.Signer, error) { + return nil, nil +} + +// Step returns a mock step implementation. +func (m *mockPluginManager) Step(ctx context.Context, cfg *plugin.Config) (definition.Step, error) { + return nil, nil +} + +// Cache returns a mock cache implementation. +func (m *mockPluginManager) Cache(ctx context.Context, cfg *plugin.Config) (definition.Cache, error) { + return nil, nil +} + +// KeyManager returns a mock key manager implementation. +func (m *mockPluginManager) KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error) { + return nil, nil +} + +// SchemaValidator returns a mock schema validator implementation. +func (m *mockPluginManager) SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) { + return nil, nil +} + +// TestRegisterSuccess tests scenarios where the handler registration should succeed. +func TestRegisterSuccess(t *testing.T) { + mCfgs := []Config{ + { + Name: "test-module", + Path: "/test", + Handler: handler.Config{ + Type: handler.HandlerTypeStd, + Plugins: handler.PluginCfg{ + Middleware: []plugin.Config{{ID: "mock-middleware"}}, + }, + }, + }, + } + + mockManager := &mockPluginManager{ + middlewareFunc: func(ctx context.Context, cfg *plugin.Config) (func(http.Handler) http.Handler, error) { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + next.ServeHTTP(w, r) + }) + }, nil + }, + } + + mux := http.NewServeMux() + err := Register(context.Background(), mCfgs, mux, mockManager) + if err != nil { + t.Errorf("unexpected error: %v", err) + } +} + +// TestRegisterFailure tests scenarios where the handler registration should fail. +func TestRegisterFailure(t *testing.T) { + tests := []struct { + name string + mCfgs []Config + mockManager *mockPluginManager + }{ + { + name: "invalid handler type", + mCfgs: []Config{ + { + Name: "invalid-module", + Path: "/invalid", + Handler: handler.Config{ + Type: "invalid-type", + }, + }, + }, + mockManager: &mockPluginManager{}, + }, + { + name: "middleware error", + mCfgs: []Config{ + { + Name: "test-module", + Path: "/test", + Handler: handler.Config{ + Type: handler.HandlerTypeStd, + Plugins: handler.PluginCfg{ + Middleware: []plugin.Config{{ID: "mock-middleware"}}, + }, + }, + }, + }, + mockManager: &mockPluginManager{ + middlewareFunc: func(ctx context.Context, cfg *plugin.Config) (func(http.Handler) http.Handler, error) { + return nil, errors.New("middleware error") + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mux := http.NewServeMux() + err := Register(context.Background(), tt.mCfgs, mux, tt.mockManager) + if err == nil { + t.Errorf("expected an error but got nil") + } + }) + } +} diff --git a/go.mod b/go.mod index dd6f0e8..3ec4911 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/beckn/beckn-onix -go 1.24.1 +go 1.24 require ( github.com/kr/pretty v0.3.1 // indirect @@ -10,45 +10,32 @@ require ( gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect ) -toolchain go1.23.7 +require github.com/stretchr/testify v1.10.0 require ( - github.com/rs/zerolog v1.33.0 github.com/davecgh/go-spew v1.1.1 // indirect - github.com/google/uuid v1.6.0 + github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/stretchr/testify v1.10.0 - github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03 - golang.org/x/text v0.23.0 // indirect - golang.org/x/crypto v0.36.0 - gopkg.in/natefinch/lumberjack.v2 v2.2.1 + github.com/stretchr/objx v0.5.2 // indirect gopkg.in/yaml.v3 v3.0.1 ) +require github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03 + require ( - cloud.google.com/go v0.119.0 // indirect - cloud.google.com/go/auth v0.15.0 // indirect - cloud.google.com/go/auth/oauth2adapt v0.2.7 // indirect - cloud.google.com/go/compute/metadata v0.6.0 // indirect - cloud.google.com/go/iam v1.4.1 // indirect - github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/go-logr/logr v1.4.2 // indirect - github.com/go-logr/stdr v1.2.2 // indirect - github.com/google/s2a-go v0.1.9 // indirect - github.com/googleapis/enterprise-certificate-proxy v0.3.5 // indirect - github.com/googleapis/gax-go/v2 v2.14.1 // indirect - github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03 + github.com/google/uuid v1.6.0 + golang.org/x/text v0.23.0 // indirect ) -require golang.org/x/text v0.23.0 // indirect - -require golang.org/x/sys v0.31.0 // indirect require ( github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect golang.org/x/sys v0.31.0 // indirect ) + require ( - cloud.google.com/go/pubsub v1.48.0 + github.com/hashicorp/go-retryablehttp v0.7.7 + github.com/rs/zerolog v1.34.0 + gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index 7df543a..b00b8d5 100644 --- a/go.sum +++ b/go.sum @@ -1,23 +1,20 @@ -cloud.google.com/go v0.119.0/go.mod h1:fwB8QLzTcNevxqi8dcpR+hoMIs3jBherGS9VUBDAW08= -cloud.google.com/go/auth v0.15.0/go.mod h1:WJDGqZ1o9E9wKIL+IwStfyn/+s59zl4Bi+1KQNVXLZ8= -cloud.google.com/go/auth/oauth2adapt v0.2.7/go.mod h1:NTbTTzfvPl1Y3V1nPpOgl2w6d/FjO7NNUQaWSox6ZMc= -cloud.google.com/go/compute/metadata v0.6.0/go.mod h1:FjyFAW1MW0C203CEOMDTu3Dk1FlqW3Rga40jzHL4hfg= -cloud.google.com/go/iam v1.4.1/go.mod h1:2vUEJpUG3Q9p2UdsyksaKpDzlwOrnMzS30isdReIcLM= -cloud.google.com/go/pubsub v1.48.0/go.mod h1:AAtyjyIT/+zaY1ERKFJbefOvkUxRDNp3nD6TdfdqUZk= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI= github.com/dlclark/regexp2 v1.11.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= -github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= -github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM= +github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= +github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/googleapis/enterprise-certificate-proxy v0.3.5/go.mod h1:MkHOF77EYAE7qfSuSS9PU6g4Nt4e11cnsDUowfwewLA= -github.com/googleapis/gax-go/v2 v2.14.1/go.mod h1:Hb/NubMaVM88SrNkvl8X/o8XWwDJEPqouaLeN2IUxoA= +github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= +github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= +github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= +github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= +github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= @@ -25,27 +22,28 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= +github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= +github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 h1:PKK9DyHxif4LZo+uQSgXNqs0jj5+xZwwfKHgph2lxBw= github.com/santhosh-tekuri/jsonschema/v6 v6.0.1/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= -github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= -github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= -github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= -github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= -github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= -github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= -github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03 h1:m1h+vudopHsI67FPT9MOncyndWhTcdUoBtI1R1uajGY= github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03/go.mod h1:8sheVFH84v3PCyFY/O02mIgSQY9I6wMYPWsq7mDnEZY= golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= @@ -60,9 +58,9 @@ golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= -gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= diff --git a/pkg/log/log.go b/pkg/log/log.go index 8531eee..eabd9f0 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -25,11 +25,13 @@ type destination struct { Config map[string]string `yaml:"config"` } +// Destination types for logging output. const ( Stdout destinationType = "stdout" File destinationType = "file" ) +// Log levels define the severity of log messages. const ( DebugLevel level = "debug" InfoLevel level = "info" @@ -48,6 +50,7 @@ var logLevels = map[level]zerolog.Level{ PanicLevel: zerolog.PanicLevel, } +// Config represents the configuration for logging. type Config struct { Level level `yaml:"level"` Destinations []destination `yaml:"destinations"` @@ -60,6 +63,7 @@ var ( once sync.Once ) +// Logger instance and configuration. var ( ErrInvalidLogLevel = errors.New("invalid log level") ErrLogDestinationNil = errors.New("log Destinations cant be empty") @@ -163,6 +167,8 @@ func getLogger(config Config) (zerolog.Logger, error) { return newLogger, nil } +// InitLogger initializes the logger with the given configuration. +// It ensures that the logger is initialized only once using sync.Once. func InitLogger(c Config) error { var initErr error once.Do(func() { @@ -175,60 +181,74 @@ func InitLogger(c Config) error { return initErr } +// Debug logs a debug-level message with the provided context. func Debug(ctx context.Context, msg string) { logEvent(ctx, zerolog.DebugLevel, msg, nil) } +// Debugf logs a formatted debug-level message with the provided context. func Debugf(ctx context.Context, format string, v ...any) { msg := fmt.Sprintf(format, v...) logEvent(ctx, zerolog.DebugLevel, msg, nil) } +// Info logs an info-level message with the provided context. func Info(ctx context.Context, msg string) { logEvent(ctx, zerolog.InfoLevel, msg, nil) } +// Infof logs a formatted info-level message with the provided context. func Infof(ctx context.Context, format string, v ...any) { msg := fmt.Sprintf(format, v...) logEvent(ctx, zerolog.InfoLevel, msg, nil) } +// Warn logs a warning-level message with the provided context. func Warn(ctx context.Context, msg string) { logEvent(ctx, zerolog.WarnLevel, msg, nil) } +// Warnf logs a formatted warning-level message with the provided context. func Warnf(ctx context.Context, format string, v ...any) { msg := fmt.Sprintf(format, v...) logEvent(ctx, zerolog.WarnLevel, msg, nil) } +// Error logs an error-level message along with an error object. func Error(ctx context.Context, err error, msg string) { logEvent(ctx, zerolog.ErrorLevel, msg, err) } +// Errorf logs a formatted error-level message along with an error object. func Errorf(ctx context.Context, err error, format string, v ...any) { msg := fmt.Sprintf(format, v...) logEvent(ctx, zerolog.ErrorLevel, msg, err) } +// Fatal logs a fatal-level message along with an error object and exits the application. func Fatal(ctx context.Context, err error, msg string) { logEvent(ctx, zerolog.FatalLevel, msg, err) } +// Fatalf logs a formatted fatal-level message along with an error object and exits the application. func Fatalf(ctx context.Context, err error, format string, v ...any) { msg := fmt.Sprintf(format, v...) logEvent(ctx, zerolog.FatalLevel, msg, err) } +// Panic logs a panic-level message along with an error object and panics. func Panic(ctx context.Context, err error, msg string) { logEvent(ctx, zerolog.PanicLevel, msg, err) } +// Panicf logs a formatted panic-level message along with an error object and panics. func Panicf(ctx context.Context, err error, format string, v ...any) { msg := fmt.Sprintf(format, v...) logEvent(ctx, zerolog.PanicLevel, msg, err) } +// logEvent logs an event at the specified log level with an optional error message. +// It adds contextual information before logging the message. func logEvent(ctx context.Context, level zerolog.Level, msg string, err error) { event := logger.WithLevel(level) @@ -239,6 +259,7 @@ func logEvent(ctx context.Context, level zerolog.Level, msg string, err error) { event.Msg(msg) } +// Request logs details of an incoming HTTP request, including method, URL, body, and remote address. func Request(ctx context.Context, r *http.Request, body []byte) { event := logger.Info() addCtx(ctx, event) @@ -249,6 +270,7 @@ func Request(ctx context.Context, r *http.Request, body []byte) { Msg("HTTP Request") } +// addCtx adds context values to the log event based on configured context keys. func addCtx(ctx context.Context, event *zerolog.Event) { for _, key := range cfg.ContextKeys { val, ok := ctx.Value(key).(string) @@ -260,6 +282,7 @@ func addCtx(ctx context.Context, event *zerolog.Event) { } } +// Response logs details of an outgoing HTTP response, including method, URL, status code, and response time. func Response(ctx context.Context, r *http.Request, statusCode int, responseTime time.Duration) { event := logger.Info() addCtx(ctx, event) diff --git a/pkg/model/model.go b/pkg/model/model.go index fcd8c65..621bdeb 100644 --- a/pkg/model/model.go +++ b/pkg/model/model.go @@ -8,12 +8,15 @@ import ( "time" ) +// Subscriber represents a unique operational configuration of a trusted platform on a network. type Subscriber struct { SubscriberID string `json:"subscriber_id"` URL string `json:"url" format:"uri"` Type string `json:"type" enum:"BAP,BPP,BG"` Domain string `json:"domain"` } + +// Subscription represents subscription details of a network participant. type Subscription struct { Subscriber `json:",inline"` KeyID string `json:"key_id" format:"uuid"` @@ -27,6 +30,7 @@ type Subscription struct { Nonce string } +// Authorization-related constants for headers. const ( AuthHeaderSubscriber string = "Authorization" AuthHeaderGateway string = "X-Gateway-Authorization" @@ -36,14 +40,20 @@ const ( type contextKey string +// MsgIDKey is the context key used to store and retrieve the message ID in a request context. const MsgIDKey = contextKey("message_id") +// Role defines the type of participant in the network. type Role string const ( - RoleBAP Role = "bap" - RoleBPP Role = "bpp" - RoleGateway Role = "gateway" + // RoleBAP represents a Buyer App Participant (BAP) in the network. + RoleBAP Role = "bap" + // RoleBPP represents a Buyer Platform Participant (BPP) in the network. + RoleBPP Role = "bpp" + // RoleGateway represents a Gateway that facilitates communication in the network. + RoleGateway Role = "gateway" + // RoleRegistery represents the Registry that maintains network participant details. RoleRegistery Role = "registery" ) @@ -54,6 +64,7 @@ var validRoles = map[Role]bool{ RoleRegistery: true, } +// UnmarshalYAML implements custom YAML unmarshalling for Role to ensure only valid values are accepted. func (r *Role) UnmarshalYAML(unmarshal func(interface{}) error) error { var roleName string if err := unmarshal(&roleName); err != nil { @@ -68,12 +79,23 @@ func (r *Role) UnmarshalYAML(unmarshal func(interface{}) error) error { return nil } +// Route represents a network route for message processing. type Route struct { - Type string - URL *url.URL - Publisher string + TargetType string // "url" or "publisher" + PublisherID string // For message queues + URL *url.URL // For API calls } +// Keyset represents a collection of cryptographic keys used for signing and encryption. +type Keyset struct { + UniqueKeyID string // UniqueKeyID is the identifier for the key pair. + SigningPrivate string // SigningPrivate is the private key used for signing operations. + SigningPublic string // SigningPublic is the public key corresponding to the signing private key. + EncrPrivate string // EncrPrivate is the private key used for encryption operations. + EncrPublic string // EncrPublic is the public key corresponding to the encryption private key. +} + +// StepContext holds context information for a request processing step. type StepContext struct { context.Context Request *http.Request @@ -84,24 +106,36 @@ type StepContext struct { RespHeader http.Header } +// WithContext updates the existing StepContext with a new context. func (ctx *StepContext) WithContext(newCtx context.Context) { ctx.Context = newCtx } +// Status represents the acknowledgment status in a response. type Status string const ( - StatusACK Status = "ACK" + // StatusACK indicates a successful acknowledgment. + StatusACK Status = "ACK" + // StatusNACK indicates a negative acknowledgment or failure. StatusNACK Status = "NACK" ) +// Ack represents an acknowledgment response. type Ack struct { + // Status holds the acknowledgment status (ACK/NACK). Status Status `json:"status"` } + +// Message represents the structure of a response message. type Message struct { - Ack Ack `json:"ack"` + // Ack contains the acknowledgment status. + Ack Ack `json:"ack"` + // Error holds error details, if any, in the response. Error *Error `json:"error,omitempty"` } + +// Response represents the main response structure. type Response struct { Message Message `json:"message"` } diff --git a/pkg/plugin/config.go b/pkg/plugin/config.go new file mode 100644 index 0000000..c20170d --- /dev/null +++ b/pkg/plugin/config.go @@ -0,0 +1,21 @@ +package plugin + +type PublisherCfg struct { + ID string `yaml:"id"` + Config map[string]string `yaml:"config"` +} + +type ValidatorCfg struct { + ID string `yaml:"id"` + Config map[string]string `yaml:"config"` +} + +type Config struct { + ID string `yaml:"id"` + Config map[string]string `yaml:"config"` +} + +type ManagerConfig struct { + Root string `yaml:"root"` + RemoteRoot string `yaml:"remoteRoot"` +} diff --git a/pkg/plugin/definition/cache.go b/pkg/plugin/definition/cache.go new file mode 100644 index 0000000..488249f --- /dev/null +++ b/pkg/plugin/definition/cache.go @@ -0,0 +1,27 @@ +package definition + +import ( + "context" + "time" +) + +// Cache defines the general cache interface for caching plugins. +type Cache interface { + // Get retrieves a value from the cache based on the given key. + Get(ctx context.Context, key string) (string, error) + + // Set stores a value in the cache with the given key and TTL (time-to-live) in seconds. + Set(ctx context.Context, key, value string, ttl time.Duration) error + + // Delete removes a value from the cache based on the given key. + Delete(ctx context.Context, key string) error + + // Clear removes all values from the cache. + Clear(ctx context.Context) error +} + +// CacheProvider interface defines the contract for managing cache instances. +type CacheProvider interface { + // New initializes a new cache instance with the given configuration. + New(ctx context.Context, config map[string]string) (Cache, func() error, error) +} diff --git a/pkg/plugin/definition/keymanager.go b/pkg/plugin/definition/keymanager.go new file mode 100644 index 0000000..f2c0e2f --- /dev/null +++ b/pkg/plugin/definition/keymanager.go @@ -0,0 +1,23 @@ +package definition + +import ( + "context" + + "github.com/beckn/beckn-onix/pkg/model" +) + +// KeyManager defines the interface for key management operations/methods. +type KeyManager interface { + GenerateKeyPairs() (*model.Keyset, error) + StorePrivateKeys(ctx context.Context, keyID string, keys *model.Keyset) error + SigningPrivateKey(ctx context.Context, keyID string) (string, string, error) + EncrPrivateKey(ctx context.Context, keyID string) (string, string, error) + SigningPublicKey(ctx context.Context, subscriberID, uniqueKeyID string) (string, error) + EncrPublicKey(ctx context.Context, subscriberID, uniqueKeyID string) (string, error) + DeletePrivateKeys(ctx context.Context, keyID string) error +} + +// KeyManagerProvider initializes a new signer instance. +type KeyManagerProvider interface { + New(context.Context, Cache, RegistryLookup, map[string]string) (KeyManager, func() error, error) +} diff --git a/pkg/plugin/definition/middleware.go b/pkg/plugin/definition/middleware.go new file mode 100644 index 0000000..7701ed5 --- /dev/null +++ b/pkg/plugin/definition/middleware.go @@ -0,0 +1,10 @@ +package definition + +import ( + "context" + "net/http" +) + +type MiddlewareProvider interface { + New(ctx context.Context, cfg map[string]string) (func(http.Handler) http.Handler, error) +} diff --git a/pkg/plugin/definition/publisher.go b/pkg/plugin/definition/publisher.go index 93f9e21..4eba687 100644 --- a/pkg/plugin/definition/publisher.go +++ b/pkg/plugin/definition/publisher.go @@ -5,12 +5,10 @@ import "context" // Publisher defines the general publisher interface for messaging plugins. type Publisher interface { // Publish sends a message (as a byte slice) using the underlying messaging system. - Publish(ctx context.Context, msg []byte) error - - Close() error // Important for releasing resources. + Publish(context.Context, string, []byte) error } type PublisherProvider interface { // New initializes a new publisher instance with the given configuration. - New(ctx context.Context, config map[string]string) (Publisher, error) + New(ctx context.Context, config map[string]string) (Publisher, func(), error) } diff --git a/pkg/plugin/definition/registry.go b/pkg/plugin/definition/registry.go new file mode 100644 index 0000000..22881f3 --- /dev/null +++ b/pkg/plugin/definition/registry.go @@ -0,0 +1,11 @@ +package definition + +import ( + "context" + + "github.com/beckn/beckn-onix/pkg/model" +) + +type RegistryLookup interface { + Lookup(ctx context.Context, req *model.Subscription) ([]model.Subscription, error) +} diff --git a/pkg/plugin/definition/router.go b/pkg/plugin/definition/router.go index 05e2e30..f30a1ca 100644 --- a/pkg/plugin/definition/router.go +++ b/pkg/plugin/definition/router.go @@ -3,14 +3,9 @@ package definition import ( "context" "net/url" -) -// Route defines the structure for the Route returned. -type Route struct { - TargetType string // "url" or "msgq" or "bap" or "bpp" - PublisherID string // For message queues - URL *url.URL // For API calls -} + "github.com/beckn/beckn-onix/pkg/model" +) // RouterProvider initializes the a new Router instance with the given config. type RouterProvider interface { @@ -20,5 +15,5 @@ type RouterProvider interface { // Router defines the interface for routing requests. type Router interface { // Route determines the routing destination based on the request context. - Route(ctx context.Context, url *url.URL, body []byte) (*Route, error) + Route(ctx context.Context, url *url.URL, body []byte) (*model.Route, error) } diff --git a/pkg/plugin/definition/signVerifier.go b/pkg/plugin/definition/signVerifier.go deleted file mode 100644 index fe36358..0000000 --- a/pkg/plugin/definition/signVerifier.go +++ /dev/null @@ -1,22 +0,0 @@ -package definition - -import "context" - -// Verifier defines the method for verifying signatures. -type Verifier interface { - // Verify checks the validity of the signature for the given body. - Verify(ctx context.Context, body []byte, header []byte, publicKeyBase64 string) (bool, error) - Close() error // Close for releasing resources -} - -// VerifierProvider initializes a new Verifier instance with the given config. -type VerifierProvider interface { - // New creates a new Verifier instance based on the provided config. - New(ctx context.Context, config map[string]string) (Verifier, func() error, error) -} - -// PublicKeyManager is the interface for key management plugin. -type PublicKeyManager interface { - // PublicKey retrieves the public key for the given subscriberID and keyID. - PublicKey(ctx context.Context, subscriberID string, keyID string) (string, error) -} diff --git a/pkg/plugin/definition/signer.go b/pkg/plugin/definition/signer.go index 84db5f5..eff7bae 100644 --- a/pkg/plugin/definition/signer.go +++ b/pkg/plugin/definition/signer.go @@ -8,7 +8,6 @@ type Signer interface { // The signature is created with the given timestamps: createdAt (signature creation time) // and expiresAt (signature expiration time). Sign(ctx context.Context, body []byte, privateKeyBase64 string, createdAt, expiresAt int64) (string, error) - Close() error // Close for releasing resources } // SignerProvider initializes a new signer instance with the given config. @@ -16,9 +15,3 @@ type SignerProvider interface { // New creates a new signer instance based on the provided config. New(ctx context.Context, config map[string]string) (Signer, func() error, error) } - -// PrivateKeyManager is the interface for key management plugin. -type PrivateKeyManager interface { - // PrivateKey retrieves the private key for the given subscriberID and keyID. - PrivateKey(ctx context.Context, subscriberID string, keyID string) (string, error) -} diff --git a/pkg/plugin/definition/signvalidator.go b/pkg/plugin/definition/signvalidator.go new file mode 100644 index 0000000..e900a37 --- /dev/null +++ b/pkg/plugin/definition/signvalidator.go @@ -0,0 +1,15 @@ +package definition + +import "context" + +// SignValidator defines the method for verifying signatures. +type SignValidator interface { + // Validate checks the validity of the signature for the given body. + Validate(ctx context.Context, body []byte, header string, publicKeyBase64 string) error +} + +// SignValidatorProvider initializes a new Verifier instance with the given config. +type SignValidatorProvider interface { + // New creates a new Verifier instance based on the provided config. + New(ctx context.Context, config map[string]string) (SignValidator, func() error, error) +} diff --git a/pkg/plugin/definition/step.go b/pkg/plugin/definition/step.go new file mode 100644 index 0000000..627675a --- /dev/null +++ b/pkg/plugin/definition/step.go @@ -0,0 +1,15 @@ +package definition + +import ( + "context" + + "github.com/beckn/beckn-onix/pkg/model" +) + +type Step interface { + Run(ctx *model.StepContext) error +} + +type StepProvider interface { + New(context.Context, map[string]string) (Step, func(), error) +} diff --git a/pkg/plugin/implementation/decrypter/cmd/plugin.go b/pkg/plugin/implementation/decrypter/cmd/plugin.go index cb988a9..628e2cb 100644 --- a/pkg/plugin/implementation/decrypter/cmd/plugin.go +++ b/pkg/plugin/implementation/decrypter/cmd/plugin.go @@ -7,13 +7,13 @@ import ( decrypter "github.com/beckn/beckn-onix/pkg/plugin/implementation/decrypter" ) -// DecrypterProvider implements the definition.DecrypterProvider interface. -type DecrypterProvider struct{} +// decrypterProvider implements the definition.decrypterProvider interface. +type decrypterProvider struct{} // New creates a new Decrypter instance using the provided configuration. -func (dp DecrypterProvider) New(ctx context.Context, config map[string]string) (definition.Decrypter, func() error, error) { +func (dp decrypterProvider) New(ctx context.Context, config map[string]string) (definition.Decrypter, func() error, error) { return decrypter.New(ctx) } // Provider is the exported symbol that the plugin manager will look for. -var Provider definition.DecrypterProvider = DecrypterProvider{} +var Provider = decrypterProvider{} diff --git a/pkg/plugin/implementation/decrypter/cmd/plugin_test.go b/pkg/plugin/implementation/decrypter/cmd/plugin_test.go index 6a4f168..0e8a079 100644 --- a/pkg/plugin/implementation/decrypter/cmd/plugin_test.go +++ b/pkg/plugin/implementation/decrypter/cmd/plugin_test.go @@ -25,7 +25,7 @@ func TestDecrypterProviderSuccess(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - provider := DecrypterProvider{} + provider := decrypterProvider{} decrypter, cleanup, err := provider.New(tt.ctx, tt.config) // Check error. diff --git a/pkg/plugin/implementation/encrypter/cmd/plugin.go b/pkg/plugin/implementation/encrypter/cmd/plugin.go index aad52ef..31e0044 100644 --- a/pkg/plugin/implementation/encrypter/cmd/plugin.go +++ b/pkg/plugin/implementation/encrypter/cmd/plugin.go @@ -7,12 +7,12 @@ import ( "github.com/beckn/beckn-onix/pkg/plugin/implementation/encrypter" ) -// EncrypterProvider implements the definition.EncrypterProvider interface. -type EncrypterProvider struct{} +// encrypterProvider implements the definition.encrypterProvider interface. +type encrypterProvider struct{} -func (ep EncrypterProvider) New(ctx context.Context, config map[string]string) (definition.Encrypter, func() error, error) { +func (ep encrypterProvider) New(ctx context.Context, config map[string]string) (definition.Encrypter, func() error, error) { return encrypter.New(ctx) } // Provider is the exported symbol that the plugin manager will look for. -var Provider definition.EncrypterProvider = EncrypterProvider{} +var Provider = encrypterProvider{} diff --git a/pkg/plugin/implementation/encrypter/cmd/plugin_test.go b/pkg/plugin/implementation/encrypter/cmd/plugin_test.go index cbb469e..1f65450 100644 --- a/pkg/plugin/implementation/encrypter/cmd/plugin_test.go +++ b/pkg/plugin/implementation/encrypter/cmd/plugin_test.go @@ -28,7 +28,7 @@ func TestEncrypterProviderSuccess(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // Create provider and encrypter. - provider := EncrypterProvider{} + provider := encrypterProvider{} encrypter, cleanup, err := provider.New(tt.ctx, tt.config) if err != nil { t.Fatalf("EncrypterProvider.New() error = %v", err) diff --git a/pkg/plugin/implementation/requestPreProcessor/cmd/plugin.go b/pkg/plugin/implementation/reqpreprocessor/cmd/plugin.go similarity index 51% rename from pkg/plugin/implementation/requestPreProcessor/cmd/plugin.go rename to pkg/plugin/implementation/reqpreprocessor/cmd/plugin.go index 4a05ecc..b89b650 100644 --- a/pkg/plugin/implementation/requestPreProcessor/cmd/plugin.go +++ b/pkg/plugin/implementation/reqpreprocessor/cmd/plugin.go @@ -5,17 +5,20 @@ import ( "net/http" "strings" - requestpreprocessor "github.com/beckn/beckn-onix/pkg/plugin/implementation/requestPreProcessor" + "github.com/beckn/beckn-onix/pkg/plugin/implementation/reqpreprocessor" ) type provider struct{} func (p provider) New(ctx context.Context, c map[string]string) (func(http.Handler) http.Handler, error) { - config := &requestpreprocessor.Config{} - if contextKeysStr, ok := c["ContextKeys"]; ok { + config := &reqpreprocessor.Config{} + if contextKeysStr, ok := c["contextKeys"]; ok { config.ContextKeys = strings.Split(contextKeysStr, ",") } - return requestpreprocessor.NewUUIDSetter(config) + if role, ok := c["role"]; ok { + config.Role = role + } + return reqpreprocessor.NewPreProcessor(config) } var Provider = provider{} diff --git a/pkg/plugin/implementation/requestPreProcessor/cmd/plugin_test.go b/pkg/plugin/implementation/reqpreprocessor/cmd/plugin_test.go similarity index 97% rename from pkg/plugin/implementation/requestPreProcessor/cmd/plugin_test.go rename to pkg/plugin/implementation/reqpreprocessor/cmd/plugin_test.go index 0890dbc..6044c44 100644 --- a/pkg/plugin/implementation/requestPreProcessor/cmd/plugin_test.go +++ b/pkg/plugin/implementation/reqpreprocessor/cmd/plugin_test.go @@ -34,7 +34,7 @@ func TestProviderNew(t *testing.T) { { name: "With Check Keys", config: map[string]string{ - "ContextKeys": "message_id,transaction_id", + "contextKeys": "message_id,transaction_id", }, expectedError: false, expectedStatus: http.StatusOK, diff --git a/pkg/plugin/implementation/requestPreProcessor/reqpreprocessor.go b/pkg/plugin/implementation/reqpreprocessor/reqpreprocessor.go similarity index 68% rename from pkg/plugin/implementation/requestPreProcessor/reqpreprocessor.go rename to pkg/plugin/implementation/reqpreprocessor/reqpreprocessor.go index 13d4da0..24ffa67 100644 --- a/pkg/plugin/implementation/requestPreProcessor/reqpreprocessor.go +++ b/pkg/plugin/implementation/reqpreprocessor/reqpreprocessor.go @@ -1,4 +1,4 @@ -package requestpreprocessor +package reqpreprocessor import ( "bytes" @@ -9,24 +9,26 @@ import ( "io" "net/http" + "github.com/beckn/beckn-onix/pkg/log" "github.com/google/uuid" ) +// Config holds the configuration settings for the application. type Config struct { - ContextKeys []string - Role string + ContextKeys []string // ContextKeys is a list of context keys used for request processing. + Role string // Role specifies the role of the entity (e.g., subscriber, gateway). } type becknRequest struct { Context map[string]any `json:"context"` } -type contextKeyType string - const contextKey = "context" -const subscriberIDKey contextKeyType = "subscriber_id" +const subscriberIDKey = "subscriber_id" -func NewUUIDSetter(cfg *Config) (func(http.Handler) http.Handler, error) { +// NewPreProcessor creates a middleware that processes incoming HTTP requests by extracting +// and modifying the request context based on the provided configuration. +func NewPreProcessor(cfg *Config) (func(http.Handler) http.Handler, error) { if err := validateConfig(cfg); err != nil { return nil, err } @@ -34,6 +36,7 @@ func NewUUIDSetter(cfg *Config) (func(http.Handler) http.Handler, error) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body, _ := io.ReadAll(r.Body) var req becknRequest + ctx := r.Context() if err := json.Unmarshal(body, &req); err != nil { http.Error(w, "Failed to decode request body", http.StatusBadRequest) return @@ -49,11 +52,15 @@ func NewUUIDSetter(cfg *Config) (func(http.Handler) http.Handler, error) { case "bpp": subID = req.Context["bpp_id"] } - ctx := context.WithValue(r.Context(), subscriberIDKey, subID) + if subID != nil { + log.Debugf(ctx, "adding subscriberId to request:%s, %v", subscriberIDKey, subID) + // TODO: Add a ContextKey type in model and use it here instead of raw context key. + ctx = context.WithValue(ctx, subscriberIDKey, subID) + } for _, key := range cfg.ContextKeys { value := uuid.NewString() updatedValue := update(req.Context, key, value) - ctx = context.WithValue(ctx, contextKeyType(key), updatedValue) + ctx = context.WithValue(ctx, key, updatedValue) } reqData := map[string]any{"context": req.Context} updatedBody, _ := json.Marshal(reqData) diff --git a/pkg/plugin/implementation/requestPreProcessor/reqpreprocessor_test.go b/pkg/plugin/implementation/reqpreprocessor/reqpreprocessor_test.go similarity index 97% rename from pkg/plugin/implementation/requestPreProcessor/reqpreprocessor_test.go rename to pkg/plugin/implementation/reqpreprocessor/reqpreprocessor_test.go index 307a7e7..d70af8e 100644 --- a/pkg/plugin/implementation/requestPreProcessor/reqpreprocessor_test.go +++ b/pkg/plugin/implementation/reqpreprocessor/reqpreprocessor_test.go @@ -1,4 +1,4 @@ -package requestpreprocessor +package reqpreprocessor import ( "bytes" @@ -52,7 +52,7 @@ func TestNewUUIDSetterSuccessCases(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - middleware, err := NewUUIDSetter(tt.config) + middleware, err := NewPreProcessor(tt.config) if err != nil { t.Fatalf("Unexpected error while creating middleware: %v", err) } @@ -148,7 +148,7 @@ func TestNewUUIDSetterErrorCases(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - middleware, err := NewUUIDSetter(tt.config) + middleware, err := NewPreProcessor(tt.config) if tt.config == nil { if err == nil { t.Error("Expected an error for nil config, but got none") diff --git a/pkg/plugin/implementation/router/cmd/plugin.go b/pkg/plugin/implementation/router/cmd/plugin.go index 556f129..d5d71e3 100644 --- a/pkg/plugin/implementation/router/cmd/plugin.go +++ b/pkg/plugin/implementation/router/cmd/plugin.go @@ -4,8 +4,8 @@ import ( "context" "errors" - definition "github.com/beckn/beckn-onix/pkg/plugin/definition" - router "github.com/beckn/beckn-onix/pkg/plugin/implementation/router" + "github.com/beckn/beckn-onix/pkg/plugin/definition" + "github.com/beckn/beckn-onix/pkg/plugin/implementation/router" ) // RouterProvider provides instances of Router. diff --git a/pkg/plugin/implementation/router/router.go b/pkg/plugin/implementation/router/router.go index 9d16767..52e628b 100644 --- a/pkg/plugin/implementation/router/router.go +++ b/pkg/plugin/implementation/router/router.go @@ -9,7 +9,7 @@ import ( "path" "strings" - definition "github.com/beckn/beckn-onix/pkg/plugin/definition" + "github.com/beckn/beckn-onix/pkg/model" "gopkg.in/yaml.v3" ) @@ -26,7 +26,7 @@ type routingConfig struct { // Router implements Router interface. type Router struct { - rules map[string]map[string]map[string]*definition.Route // domain -> version -> endpoint -> route + rules map[string]map[string]map[string]*model.Route // domain -> version -> endpoint -> route } // RoutingRule represents a single routing rule. @@ -61,7 +61,7 @@ func New(ctx context.Context, config *Config) (*Router, func() error, error) { return nil, nil, fmt.Errorf("config cannot be nil") } router := &Router{ - rules: make(map[string]map[string]map[string]*definition.Route), + rules: make(map[string]map[string]map[string]*model.Route), } // Load rules at bootup @@ -71,30 +71,6 @@ func New(ctx context.Context, config *Config) (*Router, func() error, error) { return router, nil, nil } -// parseTargetURL parses a URL string into a url.URL object with strict validation -func parseTargetURL(urlStr string) (*url.URL, error) { - if urlStr == "" { - return nil, nil - } - - parsed, err := url.Parse(urlStr) - if err != nil { - return nil, fmt.Errorf("invalid URL '%s': %w", urlStr, err) - } - - // Enforce scheme requirement - if parsed.Scheme == "" { - return nil, fmt.Errorf("URL '%s' must include a scheme (http/https)", urlStr) - } - - // Optionally validate scheme is http or https - if parsed.Scheme != "https" { - return nil, fmt.Errorf("URL '%s' must use https scheme", urlStr) - } - - return parsed, nil -} - // LoadRules reads and parses routing rules from the YAML configuration file. func (r *Router) loadRules(configPath string) error { if configPath == "" { @@ -117,41 +93,41 @@ func (r *Router) loadRules(configPath string) error { for _, rule := range config.RoutingRules { // Initialize domain map if not exists if _, ok := r.rules[rule.Domain]; !ok { - r.rules[rule.Domain] = make(map[string]map[string]*definition.Route) + r.rules[rule.Domain] = make(map[string]map[string]*model.Route) } // Initialize version map if not exists if _, ok := r.rules[rule.Domain][rule.Version]; !ok { - r.rules[rule.Domain][rule.Version] = make(map[string]*definition.Route) + r.rules[rule.Domain][rule.Version] = make(map[string]*model.Route) } // Add all endpoints for this rule for _, endpoint := range rule.Endpoints { - var route *definition.Route + var route *model.Route switch rule.TargetType { case targetTypePublisher: - route = &definition.Route{ + route = &model.Route{ TargetType: rule.TargetType, PublisherID: rule.Target.PublisherID, } case targetTypeURL: - parsedURL, err := parseTargetURL(rule.Target.URL) + parsedURL, err := url.Parse(rule.Target.URL) if err != nil { return fmt.Errorf("invalid URL in rule: %w", err) } - route = &definition.Route{ + route = &model.Route{ TargetType: rule.TargetType, URL: parsedURL, } case targetTypeBPP, targetTypeBAP: var parsedURL *url.URL if rule.Target.URL != "" { - parsedURL, err = parseTargetURL(rule.Target.URL) + parsedURL, err = url.Parse(rule.Target.URL) if err != nil { return fmt.Errorf("invalid URL in rule: %w", err) } } - route = &definition.Route{ + route = &model.Route{ TargetType: rule.TargetType, URL: parsedURL, } @@ -177,7 +153,7 @@ func validateRules(rules []routingRule) error { if rule.Target.URL == "" { return fmt.Errorf("invalid rule: url is required for targetType 'url'") } - if _, err := parseTargetURL(rule.Target.URL); err != nil { + if _, err := url.Parse(rule.Target.URL); err != nil { return fmt.Errorf("invalid URL - %s: %w", rule.Target.URL, err) } case targetTypePublisher: @@ -186,7 +162,7 @@ func validateRules(rules []routingRule) error { } case targetTypeBPP, targetTypeBAP: if rule.Target.URL != "" { - if _, err := parseTargetURL(rule.Target.URL); err != nil { + if _, err := url.Parse(rule.Target.URL); err != nil { return fmt.Errorf("invalid URL - %s defined in routing config for target type %s: %w", rule.Target.URL, rule.TargetType, err) } } @@ -199,8 +175,7 @@ func validateRules(rules []routingRule) error { } // Route determines the routing destination based on the request context. -func (r *Router) Route(ctx context.Context, url *url.URL, body []byte) (*definition.Route, error) { - +func (r *Router) Route(ctx context.Context, url *url.URL, body []byte) (*model.Route, error) { // Parse the body to extract domain and version var requestBody struct { Context struct { @@ -244,32 +219,32 @@ func (r *Router) Route(ctx context.Context, url *url.URL, body []byte) (*definit } // handleProtocolMapping handles both BPP and BAP routing with proper URL construction -func handleProtocolMapping(route *definition.Route, requestURI, endpoint string) (*definition.Route, error) { - uri := strings.TrimSpace(requestURI) - var targetURL *url.URL - if len(uri) != 0 { - parsedURL, err := parseTargetURL(uri) - if err != nil { - return nil, fmt.Errorf("invalid %s URI - %s in request body for %s: %w", strings.ToUpper(route.TargetType), uri, endpoint, err) - } - targetURL = parsedURL - } - - // If no request URI, fall back to configured URL with endpoint appended - if targetURL == nil { +func handleProtocolMapping(route *model.Route, npURI, endpoint string) (*model.Route, error) { + target := strings.TrimSpace(npURI) + if len(target) == 0 { if route.URL == nil { return nil, fmt.Errorf("could not determine destination for endpoint '%s': neither request contained a %s URI nor was a default URL configured in routing rules", endpoint, strings.ToUpper(route.TargetType)) } - - targetURL = &url.URL{ - Scheme: route.URL.Scheme, - Host: route.URL.Host, - Path: path.Join(route.URL.Path, endpoint), - } + return &model.Route{ + TargetType: targetTypeURL, + URL: &url.URL{ + Scheme: route.URL.Scheme, + Host: route.URL.Host, + Path: path.Join(route.URL.Path, endpoint), + }, + }, nil + } + targetURL, err := url.Parse(target) + if err != nil { + return nil, fmt.Errorf("invalid %s URI - %s in request body for %s: %w", strings.ToUpper(route.TargetType), target, endpoint, err) } - return &definition.Route{ + return &model.Route{ TargetType: targetTypeURL, - URL: targetURL, + URL: &url.URL{ + Scheme: targetURL.Scheme, + Host: targetURL.Host, + Path: path.Join(targetURL.Path, endpoint), + }, }, nil } diff --git a/pkg/plugin/implementation/schemaValidator/cmd/plugin.go b/pkg/plugin/implementation/schemavalidator/cmd/plugin.go similarity index 72% rename from pkg/plugin/implementation/schemaValidator/cmd/plugin.go rename to pkg/plugin/implementation/schemavalidator/cmd/plugin.go index 2a8f44a..f71aaaf 100644 --- a/pkg/plugin/implementation/schemaValidator/cmd/plugin.go +++ b/pkg/plugin/implementation/schemavalidator/cmd/plugin.go @@ -4,8 +4,8 @@ import ( "context" "errors" - definition "github.com/beckn/beckn-onix/pkg/plugin/definition" - schemaValidator "github.com/beckn/beckn-onix/pkg/plugin/implementation/schemaValidator" + "github.com/beckn/beckn-onix/pkg/plugin/definition" + "github.com/beckn/beckn-onix/pkg/plugin/implementation/schemavalidator" ) // schemaValidatorProvider provides instances of schemaValidator. @@ -24,10 +24,10 @@ func (vp schemaValidatorProvider) New(ctx context.Context, config map[string]str } // Create a new schemaValidator instance with the provided configuration - return schemaValidator.New(ctx, &schemaValidator.Config{ + return schemavalidator.New(ctx, &schemavalidator.Config{ SchemaDir: schemaDir, }) } // Provider is the exported symbol that the plugin manager will look for. -var Provider definition.SchemaValidatorProvider = schemaValidatorProvider{} +var Provider = schemaValidatorProvider{} diff --git a/pkg/plugin/implementation/schemaValidator/cmd/plugin_test.go b/pkg/plugin/implementation/schemavalidator/cmd/plugin_test.go similarity index 100% rename from pkg/plugin/implementation/schemaValidator/cmd/plugin_test.go rename to pkg/plugin/implementation/schemavalidator/cmd/plugin_test.go diff --git a/pkg/plugin/implementation/schemaValidator/schemaValidator.go b/pkg/plugin/implementation/schemavalidator/schemavalidator.go similarity index 90% rename from pkg/plugin/implementation/schemaValidator/schemaValidator.go rename to pkg/plugin/implementation/schemavalidator/schemavalidator.go index 2d6b189..715def7 100644 --- a/pkg/plugin/implementation/schemaValidator/schemaValidator.go +++ b/pkg/plugin/implementation/schemavalidator/schemavalidator.go @@ -1,4 +1,4 @@ -package schemaValidator +package schemavalidator import ( "context" @@ -10,7 +10,7 @@ import ( "path/filepath" "strings" - response "github.com/beckn/beckn-onix/pkg/response" + "github.com/beckn/beckn-onix/pkg/model" "github.com/santhosh-tekuri/jsonschema/v6" ) @@ -23,8 +23,8 @@ type payload struct { } `json:"context"` } -// SchemaValidator implements the Validator interface. -type SchemaValidator struct { +// schemaValidator implements the Validator interface. +type schemaValidator struct { config *Config schemaCache map[string]*jsonschema.Schema } @@ -35,12 +35,12 @@ type Config struct { } // New creates a new ValidatorProvider instance. -func New(ctx context.Context, config *Config) (*SchemaValidator, func() error, error) { +func New(ctx context.Context, config *Config) (*schemaValidator, func() error, error) { // Check if config is nil if config == nil { return nil, nil, fmt.Errorf("config cannot be nil") } - v := &SchemaValidator{ + v := &schemaValidator{ config: config, schemaCache: make(map[string]*jsonschema.Schema), } @@ -53,7 +53,7 @@ func New(ctx context.Context, config *Config) (*SchemaValidator, func() error, e } // Validate validates the given data against the schema. -func (v *SchemaValidator) Validate(ctx context.Context, url *url.URL, data []byte) error { +func (v *schemaValidator) Validate(ctx context.Context, url *url.URL, data []byte) error { var payloadData payload err := json.Unmarshal(data, &payloadData) if err != nil { @@ -61,14 +61,14 @@ func (v *SchemaValidator) Validate(ctx context.Context, url *url.URL, data []byt } // Extract domain, version, and endpoint from the payload and uri. - cxt_domain := payloadData.Context.Domain + cxtDomain := payloadData.Context.Domain version := payloadData.Context.Version version = fmt.Sprintf("v%s", version) endpoint := path.Base(url.String()) // ToDo Add debug log here fmt.Println("Handling request for endpoint:", endpoint) - domain := strings.ToLower(cxt_domain) + domain := strings.ToLower(cxtDomain) domain = strings.ReplaceAll(domain, ":", "_") // Construct the schema file name. @@ -89,20 +89,20 @@ func (v *SchemaValidator) Validate(ctx context.Context, url *url.URL, data []byt // Handle schema validation errors if validationErr, ok := err.(*jsonschema.ValidationError); ok { // Convert validation errors into an array of SchemaValError - var schemaErrors []response.Error + var schemaErrors []model.Error for _, cause := range validationErr.Causes { // Extract the path and message from the validation error path := strings.Join(cause.InstanceLocation, ".") // JSON path to the invalid field message := cause.Error() // Validation error message // Append the error to the schemaErrors array - schemaErrors = append(schemaErrors, response.Error{ + schemaErrors = append(schemaErrors, model.Error{ Paths: path, Message: message, }) } // Return the array of schema validation errors - return &response.SchemaValidationErr{Errors: schemaErrors} + return &model.SchemaValidationErr{Errors: schemaErrors} } // Return a generic error for non-validation errors return fmt.Errorf("validation failed: %v", err) @@ -117,7 +117,7 @@ type ValidatorProvider struct{} // Initialise initialises the validator provider by compiling all the JSON schema files // from the specified directory and storing them in a cache indexed by their schema filenames. -func (v *SchemaValidator) initialise() error { +func (v *schemaValidator) initialise() error { schemaDir := v.config.SchemaDir // Check if the directory exists and is accessible. info, err := os.Stat(schemaDir) diff --git a/pkg/plugin/implementation/schemaValidator/schemaValidator_test.go b/pkg/plugin/implementation/schemavalidator/schemavalidator_test.go similarity index 99% rename from pkg/plugin/implementation/schemaValidator/schemaValidator_test.go rename to pkg/plugin/implementation/schemavalidator/schemavalidator_test.go index 277b539..bdb4201 100644 --- a/pkg/plugin/implementation/schemaValidator/schemaValidator_test.go +++ b/pkg/plugin/implementation/schemavalidator/schemavalidator_test.go @@ -1,4 +1,4 @@ -package schemaValidator +package schemavalidator import ( "context" @@ -272,7 +272,7 @@ func TestValidator_Initialise(t *testing.T) { } config := &Config{SchemaDir: schemaDir} - v := &SchemaValidator{ + v := &schemaValidator{ config: config, schemaCache: make(map[string]*jsonschema.Schema), } diff --git a/pkg/plugin/implementation/signVerifier/cmd/plugin.go b/pkg/plugin/implementation/signVerifier/cmd/plugin.go deleted file mode 100644 index 35c1287..0000000 --- a/pkg/plugin/implementation/signVerifier/cmd/plugin.go +++ /dev/null @@ -1,25 +0,0 @@ -package main - -import ( - "context" - "errors" - - "github.com/beckn/beckn-onix/pkg/plugin/definition" - - verifier "github.com/beckn/beckn-onix/pkg/plugin/implementation/signVerifier" -) - -// VerifierProvider provides instances of Verifier. -type VerifierProvider struct{} - -// New initializes a new Verifier instance. -func (vp VerifierProvider) New(ctx context.Context, config map[string]string) (definition.Verifier, func() error, error) { - if ctx == nil { - return nil, nil, errors.New("context cannot be nil") - } - - return verifier.New(ctx, &verifier.Config{}) -} - -// Provider is the exported symbol that the plugin manager will look for. -var Provider definition.VerifierProvider = VerifierProvider{} diff --git a/pkg/plugin/implementation/signer/cmd/plugin.go b/pkg/plugin/implementation/signer/cmd/plugin.go index 2d78d98..1df515f 100644 --- a/pkg/plugin/implementation/signer/cmd/plugin.go +++ b/pkg/plugin/implementation/signer/cmd/plugin.go @@ -21,4 +21,4 @@ func (p SignerProvider) New(ctx context.Context, config map[string]string) (defi } // Provider is the exported symbol that the plugin manager will look for. -var Provider definition.SignerProvider = SignerProvider{} +var Provider = SignerProvider{} diff --git a/pkg/plugin/implementation/signer/signer.go b/pkg/plugin/implementation/signer/signer.go index 90bc6a4..1f5be86 100644 --- a/pkg/plugin/implementation/signer/signer.go +++ b/pkg/plugin/implementation/signer/signer.go @@ -23,7 +23,7 @@ type Signer struct { func New(ctx context.Context, config *Config) (*Signer, func() error, error) { s := &Signer{config: config} - return s, s.Close, nil + return s, nil, nil } // hash generates a signing string using BLAKE-512 hashing. @@ -71,8 +71,3 @@ func (s *Signer) Sign(ctx context.Context, body []byte, privateKeyBase64 string, return base64.StdEncoding.EncodeToString(signature), nil } - -// Close releases resources (mock implementation returning nil). -func (s *Signer) Close() error { - return nil -} diff --git a/pkg/plugin/implementation/signvalidator/cmd/plugin.go b/pkg/plugin/implementation/signvalidator/cmd/plugin.go new file mode 100644 index 0000000..947f956 --- /dev/null +++ b/pkg/plugin/implementation/signvalidator/cmd/plugin.go @@ -0,0 +1,24 @@ +package main + +import ( + "context" + "errors" + + "github.com/beckn/beckn-onix/pkg/plugin/definition" + "github.com/beckn/beckn-onix/pkg/plugin/implementation/signvalidator" +) + +// provider provides instances of Verifier. +type provider struct{} + +// New initializes a new Verifier instance. +func (vp provider) New(ctx context.Context, config map[string]string) (definition.SignValidator, func() error, error) { + if ctx == nil { + return nil, nil, errors.New("context cannot be nil") + } + + return signvalidator.New(ctx, &signvalidator.Config{}) +} + +// Provider is the exported symbol that the plugin manager will look for. +var Provider = provider{} diff --git a/pkg/plugin/implementation/signVerifier/cmd/plugin_test.go b/pkg/plugin/implementation/signvalidator/cmd/plugin_test.go similarity index 96% rename from pkg/plugin/implementation/signVerifier/cmd/plugin_test.go rename to pkg/plugin/implementation/signvalidator/cmd/plugin_test.go index 85caee5..a001ebf 100644 --- a/pkg/plugin/implementation/signVerifier/cmd/plugin_test.go +++ b/pkg/plugin/implementation/signvalidator/cmd/plugin_test.go @@ -7,7 +7,7 @@ import ( // TestVerifierProviderSuccess tests successful creation of a verifier. func TestVerifierProviderSuccess(t *testing.T) { - provider := VerifierProvider{} + provider := provider{} tests := []struct { name string @@ -52,7 +52,7 @@ func TestVerifierProviderSuccess(t *testing.T) { // TestVerifierProviderFailure tests cases where verifier creation should fail. func TestVerifierProviderFailure(t *testing.T) { - provider := VerifierProvider{} + provider := provider{} tests := []struct { name string diff --git a/pkg/plugin/implementation/signVerifier/signVerifier.go b/pkg/plugin/implementation/signvalidator/signvalidator.go similarity index 79% rename from pkg/plugin/implementation/signVerifier/signVerifier.go rename to pkg/plugin/implementation/signvalidator/signvalidator.go index 963d137..c381d40 100644 --- a/pkg/plugin/implementation/signVerifier/signVerifier.go +++ b/pkg/plugin/implementation/signvalidator/signvalidator.go @@ -1,4 +1,4 @@ -package verifier +package signvalidator import ( "context" @@ -16,36 +16,36 @@ import ( type Config struct { } -// Verifier implements the Verifier interface. -type Verifier struct { +// validator implements the validator interface. +type validator struct { config *Config } // New creates a new Verifier instance. -func New(ctx context.Context, config *Config) (*Verifier, func() error, error) { - v := &Verifier{config: config} +func New(ctx context.Context, config *Config) (*validator, func() error, error) { + v := &validator{config: config} - return v, v.Close, nil + return v, nil, nil } // Verify checks the signature for the given payload and public key. -func (v *Verifier) Verify(ctx context.Context, body []byte, header []byte, publicKeyBase64 string) (bool, error) { - createdTimestamp, expiredTimestamp, signature, err := parseAuthHeader(string(header)) +func (v *validator) Validate(ctx context.Context, body []byte, header string, publicKeyBase64 string) error { + createdTimestamp, expiredTimestamp, signature, err := parseAuthHeader(header) if err != nil { // TODO: Return appropriate error code when Error Code Handling Module is ready - return false, fmt.Errorf("error parsing header: %w", err) + return fmt.Errorf("error parsing header: %w", err) } signatureBytes, err := base64.StdEncoding.DecodeString(signature) if err != nil { // TODO: Return appropriate error code when Error Code Handling Module is ready - return false, fmt.Errorf("error decoding signature: %w", err) + return fmt.Errorf("error decoding signature: %w", err) } currentTime := time.Now().Unix() if createdTimestamp > currentTime || currentTime > expiredTimestamp { // TODO: Return appropriate error code when Error Code Handling Module is ready - return false, fmt.Errorf("signature is expired or not yet valid") + return fmt.Errorf("signature is expired or not yet valid") } createdTime := time.Unix(createdTimestamp, 0) @@ -56,15 +56,15 @@ func (v *Verifier) Verify(ctx context.Context, body []byte, header []byte, publi decodedPublicKey, err := base64.StdEncoding.DecodeString(publicKeyBase64) if err != nil { // TODO: Return appropriate error code when Error Code Handling Module is ready - return false, fmt.Errorf("error decoding public key: %w", err) + return fmt.Errorf("error decoding public key: %w", err) } if !ed25519.Verify(ed25519.PublicKey(decodedPublicKey), []byte(signingString), signatureBytes) { // TODO: Return appropriate error code when Error Code Handling Module is ready - return false, fmt.Errorf("signature verification failed") + return fmt.Errorf("signature verification failed") } - return true, nil + return nil } // parseAuthHeader extracts signature values from the Authorization header. @@ -113,8 +113,3 @@ func hash(payload []byte, createdTimestamp, expiredTimestamp int64) string { return fmt.Sprintf("(created): %d\n(expires): %d\ndigest: BLAKE-512=%s", createdTimestamp, expiredTimestamp, digestB64) } - -// Close releases resources (mock implementation returning nil). -func (v *Verifier) Close() error { - return nil -} diff --git a/pkg/plugin/implementation/signVerifier/signVerifier_test.go b/pkg/plugin/implementation/signvalidator/signvalidator_test.go similarity index 92% rename from pkg/plugin/implementation/signVerifier/signVerifier_test.go rename to pkg/plugin/implementation/signvalidator/signvalidator_test.go index 36da03a..160d28b 100644 --- a/pkg/plugin/implementation/signVerifier/signVerifier_test.go +++ b/pkg/plugin/implementation/signvalidator/signvalidator_test.go @@ -1,4 +1,4 @@ -package verifier +package signvalidator import ( "context" @@ -52,14 +52,11 @@ func TestVerifySuccess(t *testing.T) { `", signature="` + signature + `"` verifier, close, _ := New(context.Background(), &Config{}) - valid, err := verifier.Verify(context.Background(), tt.body, []byte(header), publicKeyBase64) + err := verifier.Validate(context.Background(), tt.body, header, publicKeyBase64) if err != nil { t.Fatalf("Expected no error, but got: %v", err) } - if !valid { - t.Fatal("Expected signature verification to succeed") - } if close != nil { if err := close(); err != nil { t.Fatalf("Test %q failed: cleanup function returned an error: %v", tt.name, err) @@ -135,14 +132,11 @@ func TestVerifyFailure(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { verifier, close, _ := New(context.Background(), &Config{}) - valid, err := verifier.Verify(context.Background(), tt.body, []byte(tt.header), tt.pubKey) + err := verifier.Validate(context.Background(), tt.body, tt.header, tt.pubKey) if err == nil { t.Fatal("Expected an error but got none") } - if valid { - t.Fatal("Expected verification to fail") - } if close != nil { if err := close(); err != nil { t.Fatalf("Test %q failed: cleanup function returned an error: %v", tt.name, err) diff --git a/pkg/plugin/manager.go b/pkg/plugin/manager.go index 86a0b02..bd969c4 100644 --- a/pkg/plugin/manager.go +++ b/pkg/plugin/manager.go @@ -1,171 +1,381 @@ package plugin import ( + "archive/zip" "context" "fmt" + "io" + "io/fs" + "net/http" + "os" "path/filepath" "plugin" "strings" + "time" + "github.com/beckn/beckn-onix/pkg/log" "github.com/beckn/beckn-onix/pkg/plugin/definition" ) -// Config represents the plugin manager configuration. -type Config struct { - Root string `yaml:"root"` - Signer PluginConfig `yaml:"signer"` - Verifier PluginConfig `yaml:"verifier"` - Decrypter PluginConfig `yaml:"decrypter"` - Encrypter PluginConfig `yaml:"encrypter"` - Publisher PluginConfig `yaml:"publisher"` -} +// TODO: Add unit tests for the plugin manager functions to ensure proper functionality and error handling. -// PluginConfig represents configuration details for a plugin. -type PluginConfig struct { - ID string `yaml:"id"` - Config map[string]string `yaml:"config"` -} - -// Manager handles dynamic plugin loading and management. +// Manager is responsible for managing dynamically loaded plugins. type Manager struct { - sp definition.SignerProvider - vp definition.VerifierProvider - dp definition.DecrypterProvider - ep definition.EncrypterProvider - pb definition.PublisherProvider - cfg *Config + plugins map[string]*plugin.Plugin // plugins holds the dynamically loaded plugins. + closers []func() // closers contains functions to release resources when the manager is closed. } -// NewManager initializes a new Manager with the given configuration file. -func NewManager(ctx context.Context, cfg *Config) (*Manager, error) { - if cfg == nil { - return nil, fmt.Errorf("configuration cannot be nil") - } - - // Load signer plugin. - sp, err := provider[definition.SignerProvider](cfg.Root, cfg.Signer.ID) - if err != nil { - return nil, fmt.Errorf("failed to load signer plugin: %w", err) - } - - // Load publisher plugin. - pb, err := provider[definition.PublisherProvider](cfg.Root, cfg.Publisher.ID) - if err != nil { - return nil, fmt.Errorf("failed to load publisher plugin: %w", err) - } - - // Load verifier plugin. - vp, err := provider[definition.VerifierProvider](cfg.Root, cfg.Verifier.ID) - if err != nil { - return nil, fmt.Errorf("failed to load Verifier plugin: %w", err) - } - - // Load decrypter plugin. - dp, err := provider[definition.DecrypterProvider](cfg.Root, cfg.Decrypter.ID) - if err != nil { - return nil, fmt.Errorf("failed to load Decrypter plugin: %w", err) - } - - // Load encryption plugin. - ep, err := provider[definition.EncrypterProvider](cfg.Root, cfg.Encrypter.ID) - if err != nil { - return nil, fmt.Errorf("failed to load encryption plugin: %w", err) - } - - return &Manager{sp: sp, vp: vp, pb: pb, ep: ep, dp: dp, cfg: cfg}, nil +func validateMgrCfg(cfg *ManagerConfig) error { + return nil } -// provider loads a plugin dynamically and retrieves its provider instance. -func provider[T any](root, id string) (T, error) { +// NewManager initializes a new Manager instance by loading plugins from the specified configuration. +func NewManager(ctx context.Context, cfg *ManagerConfig) (*Manager, func(), error) { + if err := validateMgrCfg(cfg); err != nil { + return nil, nil, fmt.Errorf("Invalid config: %w", err) + } + log.Debugf(ctx, "RemoteRoot : %s", cfg.RemoteRoot) + if len(cfg.RemoteRoot) != 0 { + log.Debugf(ctx, "Unzipping files from : %s to : %s", cfg.RemoteRoot, cfg.Root) + if err := unzip(cfg.RemoteRoot, cfg.Root); err != nil { + return nil, nil, err + } + } + plugins, err := plugins(ctx, cfg) + if err != nil { + return nil, nil, err + } + + closers := []func(){} + return &Manager{plugins: plugins, closers: closers}, func() { + for _, closer := range closers { + closer() + } + }, nil +} + +func plugins(ctx context.Context, cfg *ManagerConfig) (map[string]*plugin.Plugin, error) { + plugins := make(map[string]*plugin.Plugin) + + err := filepath.WalkDir(cfg.Root, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + + if d.IsDir() { + return nil // Skip directories + } + + if strings.HasSuffix(d.Name(), ".so") { + id := strings.TrimSuffix(d.Name(), ".so") // Extract plugin ID + p, elapsed, err := loadPlugin(ctx, path, id) + if err != nil { + return err + } + plugins[id] = p + log.Debugf(ctx, "Loaded plugin: %s in %s", id, elapsed) + } + return nil + }) + + if err != nil { + return nil, err + } + + return plugins, nil +} + +// loadPlugin attempts to load a plugin from the given path and logs the execution time. +func loadPlugin(ctx context.Context, path, id string) (*plugin.Plugin, time.Duration, error) { + log.Debugf(ctx, "Loading plugin: %s", id) + start := time.Now() + + p, err := plugin.Open(path) + if err != nil { + return nil, 0, fmt.Errorf("failed to open plugin %s: %w", id, err) + } + + elapsed := time.Since(start) + return p, elapsed, nil +} + +func provider[T any](plugins map[string]*plugin.Plugin, id string) (T, error) { var zero T - if len(strings.TrimSpace(id)) == 0 { - return zero, nil + pgn, ok := plugins[id] + if !ok { + return zero, fmt.Errorf("plugin %s not found", id) } - - p, err := plugin.Open(pluginPath(root, id)) + provider, err := pgn.Lookup("Provider") if err != nil { - return zero, fmt.Errorf("failed to open plugin %s: %w", id, err) + return zero, fmt.Errorf("failed to lookup Provider for %s: %w", id, err) } + log.Debugf(context.Background(), "Provider type: %T\n", provider) - symbol, err := p.Lookup("Provider") - if err != nil { - return zero, fmt.Errorf("failed to find Provider symbol in plugin %s: %w", id, err) - } - - prov, ok := symbol.(*T) + pp, ok := provider.(T) if !ok { return zero, fmt.Errorf("failed to cast Provider for %s", id) } - - return *prov, nil + log.Debugf(context.Background(), "Casting successful for: %s", provider) + return pp, nil } -// pluginPath constructs the path to the plugin shared object file. -func pluginPath(root, id string) string { - return filepath.Join(root, id+".so") -} - -// Signer retrieves the signing plugin instance. -func (m *Manager) Signer(ctx context.Context) (definition.Signer, func() error, error) { - if m.sp == nil { - return nil, nil, fmt.Errorf("signing plugin provider not loaded") - } - - signer, close, err := m.sp.New(ctx, m.cfg.Signer.Config) +// Publisher returns a Publisher instance based on the provided configuration. +// It reuses the loaded provider and registers a cleanup function. +func (m *Manager) Publisher(ctx context.Context, cfg *Config) (definition.Publisher, error) { + pp, err := provider[definition.PublisherProvider](m.plugins, cfg.ID) if err != nil { - return nil, nil, fmt.Errorf("failed to initialize signer: %w", err) + return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) } - return signer, close, nil -} - -// Verifier retrieves the verification plugin instance. -func (m *Manager) Verifier(ctx context.Context) (definition.Verifier, func() error, error) { - if m.vp == nil { - return nil, nil, fmt.Errorf("Verifier plugin provider not loaded") - } - - Verifier, close, err := m.vp.New(ctx, m.cfg.Verifier.Config) + p, closer, err := pp.New(ctx, cfg.Config) if err != nil { - return nil, nil, fmt.Errorf("failed to initialize Verifier: %w", err) + return nil, err } - return Verifier, close, nil + m.addCloser(closer) + return p, nil } -// Decrypter retrieves the decryption plugin instance. -func (m *Manager) Decrypter(ctx context.Context) (definition.Decrypter, func() error, error) { - if m.dp == nil { - return nil, nil, fmt.Errorf("decrypter plugin provider not loaded") +// addCloser appends a cleanup function to the Manager's closers list. +func (m *Manager) addCloser(closer func()) { + if closer != nil { + m.closers = append(m.closers, closer) } +} - decrypter, close, err := m.dp.New(ctx, m.cfg.Decrypter.Config) +// SchemaValidator returns a SchemaValidator instance based on the provided configuration. +// It registers a cleanup function for resource management. +func (m *Manager) SchemaValidator(ctx context.Context, cfg *Config) (definition.SchemaValidator, error) { + vp, err := provider[definition.SchemaValidatorProvider](m.plugins, cfg.ID) if err != nil { - return nil, nil, fmt.Errorf("failed to initialize Decrypter: %w", err) + return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) } - return decrypter, close, nil -} - -// Encrypter retrieves the encryption plugin instance. -func (m *Manager) Encrypter(ctx context.Context) (definition.Encrypter, func() error, error) { - if m.ep == nil { - return nil, nil, fmt.Errorf("encryption plugin provider not loaded") - } - - encrypter, close, err := m.ep.New(ctx, m.cfg.Encrypter.Config) + v, closer, err := vp.New(ctx, cfg.Config) if err != nil { - return nil, nil, fmt.Errorf("failed to initialize encrypter: %w", err) + return nil, err } - return encrypter, close, nil + if closer != nil { + m.addCloser(func() { + if err := closer(); err != nil { + panic(err) + } + }) + } + return v, nil } -// Publisher retrieves the publisher plugin instance. -func (m *Manager) Publisher(ctx context.Context) (definition.Publisher, error) { - if m.pb == nil { - return nil, fmt.Errorf("publisher plugin provider not loaded") - } - - publisher, err := m.pb.New(ctx, m.cfg.Publisher.Config) +// Router returns a Router instance based on the provided configuration. +// It registers a cleanup function for resource management. +func (m *Manager) Router(ctx context.Context, cfg *Config) (definition.Router, error) { + rp, err := provider[definition.RouterProvider](m.plugins, cfg.ID) if err != nil { - return nil, fmt.Errorf("failed to initialize publisher: %w", err) + return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) } - return publisher, nil + router, closer, err := rp.New(ctx, cfg.Config) + if err != nil { + return nil, err + } + if closer != nil { + m.addCloser(func() { + if err := closer(); err != nil { + panic(err) + } + }) + } + return router, nil +} + +// Middleware returns an HTTP middleware function based on the provided configuration. +func (m *Manager) Middleware(ctx context.Context, cfg *Config) (func(http.Handler) http.Handler, error) { + mwp, err := provider[definition.MiddlewareProvider](m.plugins, cfg.ID) + if err != nil { + return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) + } + return mwp.New(ctx, cfg.Config) +} + +// Step returns a Step instance based on the provided configuration. +func (m *Manager) Step(ctx context.Context, cfg *Config) (definition.Step, error) { + sp, err := provider[definition.StepProvider](m.plugins, cfg.ID) + if err != nil { + return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) + } + step, closer, error := sp.New(ctx, cfg.Config) + if closer != nil { + m.closers = append(m.closers, closer) + } + return step, error +} + +// Cache returns a Cache instance based on the provided configuration. +// It registers a cleanup function for resource management. +func (m *Manager) Cache(ctx context.Context, cfg *Config) (definition.Cache, error) { + cp, err := provider[definition.CacheProvider](m.plugins, cfg.ID) + if err != nil { + return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) + } + c, close, err := cp.New(ctx, cfg.Config) + if err != nil { + return nil, err + } + m.addCloser(func() { + if err := close(); err != nil { + panic(err) + } + }) + return c, nil +} + +// Signer returns a Signer instance based on the provided configuration. +// It registers a cleanup function for resource management. +func (m *Manager) Signer(ctx context.Context, cfg *Config) (definition.Signer, error) { + sp, err := provider[definition.SignerProvider](m.plugins, cfg.ID) + if err != nil { + return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) + } + s, closer, err := sp.New(ctx, cfg.Config) + if err != nil { + return nil, err + } + if closer != nil { + m.addCloser(func() { + if err := closer(); err != nil { + panic(err) + } + }) + } + return s, nil +} + +// Encryptor returns an Encrypter instance based on the provided configuration. +// It registers a cleanup function for resource management. +func (m *Manager) Encryptor(ctx context.Context, cfg *Config) (definition.Encrypter, error) { + ep, err := provider[definition.EncrypterProvider](m.plugins, cfg.ID) + if err != nil { + return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) + } + encrypter, closer, err := ep.New(ctx, cfg.Config) + if err != nil { + return nil, err + } + if closer != nil { + m.addCloser(func() { + if err := closer(); err != nil { + panic(err) + } + }) + } + return encrypter, nil +} + +// Decryptor returns a Decrypter instance based on the provided configuration. +// It registers a cleanup function for resource management. +func (m *Manager) Decryptor(ctx context.Context, cfg *Config) (definition.Decrypter, error) { + dp, err := provider[definition.DecrypterProvider](m.plugins, cfg.ID) + if err != nil { + return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) + } + + decrypter, closer, err := dp.New(ctx, cfg.Config) + if err != nil { + return nil, err + } + + if closer != nil { + m.addCloser(func() { + if err := closer(); err != nil { + panic(err) + } + }) + } + + return decrypter, nil +} + +// SignValidator returns a SignValidator instance based on the provided configuration. +// It registers a cleanup function for resource management. +func (m *Manager) SignValidator(ctx context.Context, cfg *Config) (definition.SignValidator, error) { + svp, err := provider[definition.SignValidatorProvider](m.plugins, cfg.ID) + if err != nil { + return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) + } + v, closer, err := svp.New(ctx, cfg.Config) + if err != nil { + return nil, err + } + if closer != nil { + m.addCloser(func() { + if err := closer(); err != nil { + panic(err) + } + }) + } + return v, nil +} + +// KeyManager returns a KeyManager instance based on the provided configuration. +// It reuses the loaded provider. +func (m *Manager) KeyManager(ctx context.Context, cache definition.Cache, rClient definition.RegistryLookup, cfg *Config) (definition.KeyManager, error) { + + kmp, err := provider[definition.KeyManagerProvider](m.plugins, cfg.ID) + if err != nil { + return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) + } + km, close, err := kmp.New(ctx, cache, rClient, cfg.Config) + if err != nil { + return nil, err + } + m.addCloser(func() { + if err := close(); err != nil { + panic(err) + } + }) + return km, nil +} + +// Validator implements handler.PluginManager. +func (m *Manager) Validator(ctx context.Context, cfg *Config) (definition.SchemaValidator, error) { + panic("unimplemented") +} + +// Unzip extracts a ZIP file to the specified destination +func unzip(src, dest string) error { + r, err := zip.OpenReader(src) + if err != nil { + return err + } + defer r.Close() + + // Ensure the destination directory exists + if err := os.MkdirAll(dest, 0755); err != nil { + return err + } + + for _, f := range r.File { + + fpath := filepath.Join(dest, f.Name) + // Ensure directory exists + log.Debugf(context.Background(), "Pain : fpath: %s,filepath.Dir(fpath): %s", fpath, filepath.Dir(fpath)) + if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil { + return err + } + // Open the file inside the zip + srcFile, err := f.Open() + if err != nil { + return err + } + defer srcFile.Close() + + // Create the destination file + dstFile, err := os.Create(fpath) + if err != nil { + return err + } + defer dstFile.Close() + + // Copy file contents + if _, err := io.Copy(dstFile, srcFile); err != nil { + return err + } + } + + return nil } diff --git a/pkg/response/response.go b/pkg/response/response.go index c6d1094..a5ab0c4 100644 --- a/pkg/response/response.go +++ b/pkg/response/response.go @@ -7,40 +7,10 @@ import ( "fmt" "net/http" - "strings" - "github.com/beckn/beckn-onix/pkg/model" ) - -type Error struct { - Code string `json:"code,omitempty"` - Message string `json:"message,omitempty"` - Paths string `json:"paths,omitempty"` -} - - -// SchemaValidationErr represents a collection of schema validation failures. -type SchemaValidationErr struct { - Errors []Error -} - -// Error implements the error interface for SchemaValidationErr. -func (e *SchemaValidationErr) Error() string { - var errorMessages []string - for _, err := range e.Errors { - errorMessages = append(errorMessages, fmt.Sprintf("%s: %s", err.Paths, err.Message)) - } - return strings.Join(errorMessages, "; ") -} - -type Message struct { - Ack struct { - Status string `json:"status,omitempty"` - } `json:"ack,omitempty"` - Error *Error `json:"error,omitempty"` -} - +// SendAck sends an acknowledgment response (ACK) to the client. func SendAck(w http.ResponseWriter) { resp := &model.Response{ Message: model.Message{ @@ -61,7 +31,8 @@ func SendAck(w http.ResponseWriter) { } } -func nack(w http.ResponseWriter, err *model.Error, status int, ctx context.Context) { +// nack sends a negative acknowledgment (NACK) response with an error message. +func nack(ctx context.Context, w http.ResponseWriter, err *model.Error, status int) { resp := &model.Response{ Message: model.Message{ Ack: model.Ack{ @@ -82,6 +53,7 @@ func nack(w http.ResponseWriter, err *model.Error, status int, ctx context.Conte } } +// internalServerError generates an internal server error response. func internalServerError(ctx context.Context) *model.Error { return &model.Error{ Code: http.StatusText(http.StatusInternalServerError), @@ -89,6 +61,7 @@ func internalServerError(ctx context.Context) *model.Error { } } +// SendNack processes different types of errors and sends an appropriate NACK response. func SendNack(ctx context.Context, w http.ResponseWriter, err error) { var schemaErr *model.SchemaValidationErr var signErr *model.SignValidationErr @@ -97,19 +70,19 @@ func SendNack(ctx context.Context, w http.ResponseWriter, err error) { switch { case errors.As(err, &schemaErr): - nack(w, schemaErr.BecknError(), http.StatusBadRequest, ctx) + nack(ctx, w, schemaErr.BecknError(), http.StatusBadRequest) return case errors.As(err, &signErr): - nack(w, signErr.BecknError(), http.StatusUnauthorized, ctx) + nack(ctx, w, signErr.BecknError(), http.StatusUnauthorized) return case errors.As(err, &badReqErr): - nack(w, badReqErr.BecknError(), http.StatusBadRequest, ctx) + nack(ctx, w, badReqErr.BecknError(), http.StatusBadRequest) return case errors.As(err, ¬FoundErr): - nack(w, notFoundErr.BecknError(), http.StatusNotFound, ctx) + nack(ctx, w, notFoundErr.BecknError(), http.StatusNotFound) return default: - nack(w, internalServerError(ctx), http.StatusInternalServerError, ctx) + nack(ctx, w, internalServerError(ctx), http.StatusInternalServerError) return } } diff --git a/pkg/response/response_test.go b/pkg/response/response_test.go index 7e62aca..96f1caa 100644 --- a/pkg/response/response_test.go +++ b/pkg/response/response_test.go @@ -126,21 +126,6 @@ func TestSendNack(t *testing.T) { } } -func TestSchemaValidationErr_Error(t *testing.T) { - // Create sample validation errors - validationErrors := []Error{ - {Paths: "name", Message: "Name is required"}, - {Paths: "email", Message: "Invalid email format"}, - } - err := SchemaValidationErr{Errors: validationErrors} - expected := "name: Name is required; email: Invalid email format" - if err.Error() != expected { - t.Errorf("err.Error() = %s, want %s", - err.Error(), expected) - - } -} - func compareJSON(expected, actual map[string]interface{}) bool { expectedBytes, _ := json.Marshal(expected) actualBytes, _ := json.Marshal(actual) @@ -234,7 +219,7 @@ func TestNack_1(t *testing.T) { return } - nack(w, tt.err, tt.status, ctx) + nack(ctx, w, tt.err, tt.status) if !tt.useBadWrite { recorder, ok := w.(*httptest.ResponseRecorder) if !ok {