diff --git a/pkg/plugin/implementation/publisher/cmd/plugin.go b/pkg/plugin/implementation/publisher/cmd/plugin.go index 647edaf..ccf87fa 100644 --- a/pkg/plugin/implementation/publisher/cmd/plugin.go +++ b/pkg/plugin/implementation/publisher/cmd/plugin.go @@ -2,22 +2,18 @@ package main import ( "context" - "fmt" "github.com/beckn/beckn-onix/pkg/log" "github.com/beckn/beckn-onix/pkg/plugin/definition" "github.com/beckn/beckn-onix/pkg/plugin/implementation/publisher" - "github.com/rabbitmq/amqp091-go" ) // publisherProvider implements the PublisherProvider interface. // It is responsible for creating a new Publisher instance. type publisherProvider struct{} -// New creates a new Publisher instance based on the provided configuration map. -// It also returns a cleanup function to close resources and any potential errors encountered. +// New creates a new Publisher instance based on the provided configuration. func (p *publisherProvider) New(ctx context.Context, config map[string]string) (definition.Publisher, func() error, error) { - // Step 1: Map config cfg := &publisher.Config{ Addr: config["addr"], Exchange: config["exchange"], @@ -27,69 +23,14 @@ func (p *publisherProvider) New(ctx context.Context, config map[string]string) ( } log.Debugf(ctx, "Publisher config mapped: %+v", cfg) - // Step 2: Validate - if err := publisher.Validate(cfg); err != nil { - log.Errorf(ctx, err, "Publisher config validation failed") + pub, cleanup, err := publisher.New(cfg) + if err != nil { + log.Errorf(ctx, err, "Failed to create publisher instance") return nil, nil, err } - log.Infof(ctx, "Publisher config validated successfully") - - // Step 3:URL - connURL, err := publisher.GetConnURL(cfg) - if err != nil { - log.Errorf(ctx, err, "Failed to build RabbitMQ connection URL") - return nil, nil, fmt.Errorf("failed to build connection URL: %w", err) - } - log.Debugf(ctx, "RabbitMQ connection URL built: %s", connURL) - // Step 4: Connect - conn, err := amqp091.Dial(connURL) - if err != nil { - log.Errorf(ctx, err, "Failed to connect to RabbitMQ") - return nil, nil, fmt.Errorf("%w: %v", publisher.ErrConnectionFailed, err) - } - log.Infof(ctx, "Connected to RabbitMQ") - - ch, err := conn.Channel() - if err != nil { - conn.Close() - log.Errorf(ctx, err, "Failed to open RabbitMQ channel") - return nil, nil, fmt.Errorf("%w: %v", publisher.ErrChannelFailed, err) - } - log.Infof(ctx, "RabbitMQ channel opened successfully") - - // Step 5: Declare Exchange - err = ch.ExchangeDeclare( - cfg.Exchange, - "topic", - cfg.Durable, - false, - false, - false, - nil, - ) - if err != nil { - ch.Close() - conn.Close() - log.Errorf(ctx, err, "Failed to declare exchange: %s", cfg.Exchange) - return nil, nil, fmt.Errorf("%w: %v", publisher.ErrExchangeDeclare, err) - } - log.Infof(ctx, "RabbitMQ exchange declared successfully: %s", cfg.Exchange) - - // Step 6: Create publisher instance - publisher := &publisher.Publisher{ - Conn: conn, - Channel: ch, - Config: cfg, - } - - cleanup := func() error { - log.Infof(ctx, "Cleaning up RabbitMQ resources") - _ = ch.Close() - return conn.Close() - } log.Infof(ctx, "Publisher instance created successfully") - return publisher, cleanup, nil + return pub, cleanup, nil } // Provider is the instance of publisherProvider that implements the PublisherProvider interface. diff --git a/pkg/plugin/implementation/publisher/cmd/plugin_test.go b/pkg/plugin/implementation/publisher/cmd/plugin_test.go new file mode 100644 index 0000000..e3f9837 --- /dev/null +++ b/pkg/plugin/implementation/publisher/cmd/plugin_test.go @@ -0,0 +1,106 @@ +package main + +import ( + "context" + "errors" + "strings" + "testing" + + "github.com/beckn/beckn-onix/pkg/plugin/implementation/publisher" + "github.com/rabbitmq/amqp091-go" +) + +type mockChannel struct{} + +func (m *mockChannel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error { + return nil +} +func (m *mockChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error { + return nil +} +func (m *mockChannel) Close() error { + return nil +} + +func TestPublisherProvider_New_Success(t *testing.T) { + // Save original dialFunc and channelFunc + originalDialFunc := publisher.DialFunc + originalChannelFunc := publisher.ChannelFunc + defer func() { + publisher.DialFunc = originalDialFunc + publisher.ChannelFunc = originalChannelFunc + }() + + // Override mocks + publisher.DialFunc = func(url string) (*amqp091.Connection, error) { + return nil, nil + } + publisher.ChannelFunc = func(conn *amqp091.Connection) (publisher.Channel, error) { + return &mockChannel{}, nil + } + + t.Setenv("RABBITMQ_USERNAME", "guest") + t.Setenv("RABBITMQ_PASSWORD", "guest") + + config := map[string]string{ + "addr": "localhost", + "exchange": "test-exchange", + "routing_key": "test.key", + "durable": "true", + "use_tls": "false", + } + + ctx := context.Background() + pub, cleanup, err := Provider.New(ctx, config) + + if err != nil { + t.Fatalf("Provider.New returned error: %v", err) + } + if pub == nil { + t.Fatal("Expected non-nil publisher") + } + if cleanup == nil { + t.Fatal("Expected non-nil cleanup function") + } + + if err := cleanup(); err != nil { + t.Errorf("Cleanup returned error: %v", err) + } +} + +func TestPublisherProvider_New_Failure(t *testing.T) { + // Save and restore dialFunc + originalDialFunc := publisher.DialFunc + defer func() { publisher.DialFunc = originalDialFunc }() + + // Simulate dial failure + publisher.DialFunc = func(url string) (*amqp091.Connection, error) { + return nil, errors.New("dial failed") + } + + t.Setenv("RABBITMQ_USERNAME", "guest") + t.Setenv("RABBITMQ_PASSWORD", "guest") + + config := map[string]string{ + "addr": "localhost", + "exchange": "test-exchange", + "routing_key": "test.key", + "durable": "true", + } + + ctx := context.Background() + pub, cleanup, err := Provider.New(ctx, config) + + if err == nil { + t.Fatal("Expected error from Provider.New but got nil") + } + if !strings.Contains(err.Error(), "dial failed") { + t.Errorf("Expected 'dial failed' error, got: %v", err) + } + if pub != nil { + t.Errorf("Expected nil publisher, got: %v", pub) + } + if cleanup != nil { + t.Error("Expected nil cleanup, got non-nil") + } +} diff --git a/pkg/plugin/implementation/publisher/publisher.go b/pkg/plugin/implementation/publisher/publisher.go index 5623775..60f531d 100644 --- a/pkg/plugin/implementation/publisher/publisher.go +++ b/pkg/plugin/implementation/publisher/publisher.go @@ -25,6 +25,8 @@ type Config struct { // Channel defines the interface for publishing messages to RabbitMQ. type Channel interface { PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error + ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error + Close() error } // Publisher manages the RabbitMQ connection and channel to publish messages. @@ -122,3 +124,73 @@ func (p *Publisher) Publish(ctx context.Context, routingKey string, msg []byte) log.Infof(ctx, "Message published successfully to Exchange: %s, RoutingKey: %s", p.Config.Exchange, routingKey) return nil } + +// DialFunc is a function variable used to establish a connection to RabbitMQ. +var DialFunc = amqp091.Dial + +// ChannelFunc is a function variable used to open a channel on the given RabbitMQ connection. +var ChannelFunc = func(conn *amqp091.Connection) (Channel, error) { + return conn.Channel() +} + +// New initializes a new Publisher with the given config, opens a connection, +// channel, and declares the exchange. Returns the publisher and a cleanup function. +func New(cfg *Config) (*Publisher, func() error, error) { + // Step 1: Validate config + if err := Validate(cfg); err != nil { + return nil, nil, err + } + + // Step 2: Build connection URL + connURL, err := GetConnURL(cfg) + if err != nil { + return nil, nil, fmt.Errorf("%w: %v", ErrConnectionFailed, err) + } + + // Step 3: Dial connection + conn, err := DialFunc(connURL) + if err != nil { + return nil, nil, fmt.Errorf("%w: %v", ErrConnectionFailed, err) + } + + // Step 4: Open channel + ch, err := ChannelFunc(conn) + if err != nil { + conn.Close() + return nil, nil, fmt.Errorf("%w: %v", ErrChannelFailed, err) + } + + // Step 5: Declare exchange + if err := ch.ExchangeDeclare( + cfg.Exchange, + "topic", + cfg.Durable, + false, + false, + false, + nil, + ); err != nil { + ch.Close() + conn.Close() + return nil, nil, fmt.Errorf("%w: %v", ErrExchangeDeclare, err) + } + + // Step 6: Construct publisher + pub := &Publisher{ + Conn: conn, + Channel: ch, + Config: cfg, + } + + cleanup := func() error { + if ch != nil { + _ = ch.Close() + } + if conn != nil { + return conn.Close() + } + return nil + } + + return pub, cleanup, nil +} diff --git a/pkg/plugin/implementation/publisher/publisher_test.go b/pkg/plugin/implementation/publisher/publisher_test.go index 5002f29..b54fa9b 100644 --- a/pkg/plugin/implementation/publisher/publisher_test.go +++ b/pkg/plugin/implementation/publisher/publisher_test.go @@ -2,8 +2,9 @@ package publisher import ( "context" - "errors" + "fmt" "os" + "strings" "testing" "github.com/rabbitmq/amqp091-go" @@ -153,32 +154,35 @@ func TestValidateFailure(t *testing.T) { } } -type mockChannel struct { +type mockChannelForPublish struct { published bool - args amqp091.Publishing exchange string key string + body []byte fail bool } -func (m *mockChannel) PublishWithContext( - _ context.Context, - exchange, key string, - mandatory, immediate bool, - msg amqp091.Publishing, -) error { +func (m *mockChannelForPublish) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error { if m.fail { - return errors.New("mock publish failure") + return fmt.Errorf("simulated publish failure") } m.published = true - m.args = msg m.exchange = exchange m.key = key + m.body = msg.Body + return nil +} + +func (m *mockChannelForPublish) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error { + return nil +} + +func (m *mockChannelForPublish) Close() error { return nil } func TestPublishSuccess(t *testing.T) { - mockCh := &mockChannel{} + mockCh := &mockChannelForPublish{} p := &Publisher{ Channel: mockCh, @@ -203,7 +207,7 @@ func TestPublishSuccess(t *testing.T) { } func TestPublishFailure(t *testing.T) { - mockCh := &mockChannel{fail: true} + mockCh := &mockChannelForPublish{fail: true} p := &Publisher{ Channel: mockCh, @@ -218,3 +222,147 @@ func TestPublishFailure(t *testing.T) { t.Error("expected error from failed publish, got nil") } } + +type mockChannel struct{} + +func (m *mockChannel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error { + return nil +} +func (m *mockChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error { + return nil +} +func (m *mockChannel) Close() error { + return nil +} + +type mockConnection struct{} + +func (m *mockConnection) Close() error { + return nil +} + +func TestNewPublisherSucess(t *testing.T) { + originalDialFunc := DialFunc + originalChannelFunc := ChannelFunc + defer func() { + DialFunc = originalDialFunc + ChannelFunc = originalChannelFunc + }() + + // mockedConn := &mockConnection{} + + DialFunc = func(url string) (*amqp091.Connection, error) { + return nil, nil + } + + ChannelFunc = func(conn *amqp091.Connection) (Channel, error) { + return &mockChannel{}, nil + } + + cfg := &Config{ + Addr: "localhost", + Exchange: "test-ex", + Durable: true, + RoutingKey: "test.key", + } + + t.Setenv("RABBITMQ_USERNAME", "user") + t.Setenv("RABBITMQ_PASSWORD", "pass") + + pub, cleanup, err := New(cfg) + if err != nil { + t.Fatalf("New() failed: %v", err) + } + if pub == nil { + t.Fatal("Publisher should not be nil") + } + if cleanup == nil { + t.Fatal("Cleanup should not be nil") + } + if err := cleanup(); err != nil { + t.Errorf("Cleanup failed: %v", err) + } +} + +type mockChannelFailDeclare struct{} + +func (m *mockChannelFailDeclare) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error { + return nil +} +func (m *mockChannelFailDeclare) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error { + return fmt.Errorf("simulated exchange declare error") +} +func (m *mockChannelFailDeclare) Close() error { + return nil +} + +func TestNewPublisherFailures(t *testing.T) { + tests := []struct { + name string + cfg *Config + dialFunc func(url string) (*amqp091.Connection, error) // Mocked dial function + envVars map[string]string + expectedError string + }{ + { + name: "ValidateFailure", + cfg: &Config{}, // invalid config + expectedError: "missing config.Addr", + }, + { + name: "GetConnURLFailure", + cfg: &Config{ + Addr: "localhost", + Exchange: "test-ex", + Durable: true, + RoutingKey: "test.key", + }, + envVars: map[string]string{ + "RABBITMQ_USERNAME": "", + "RABBITMQ_PASSWORD": "", + }, + expectedError: "missing RabbitMQ credentials in environment", + }, + { + name: "ConnectionFailure", + cfg: &Config{ + Addr: "localhost", + Exchange: "test-ex", + Durable: true, + RoutingKey: "test.key", + }, + dialFunc: func(url string) (*amqp091.Connection, error) { + return nil, fmt.Errorf("simulated connection failure") + }, + envVars: map[string]string{ + "RABBITMQ_USERNAME": "user", + "RABBITMQ_PASSWORD": "pass", + }, + expectedError: "failed to connect to RabbitMQ", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Set environment variables + for key, value := range tt.envVars { + t.Setenv(key, value) + } + + // Mock dialFunc if needed + originalDialFunc := DialFunc + if tt.dialFunc != nil { + DialFunc = tt.dialFunc + defer func() { + DialFunc = originalDialFunc + }() + } + + _, _, err := New(tt.cfg) + + if err == nil || (tt.expectedError != "" && !strings.Contains(err.Error(), tt.expectedError)) { + t.Errorf("Test %s failed: expected error containing %v, got: %v", tt.name, tt.expectedError, err) + } + }) + } +}