Resolved PR Review comments

This commit is contained in:
MohitKatare-protean
2025-05-13 12:30:34 +05:30
parent 2e0b5834d7
commit e5cccc30f8
4 changed files with 344 additions and 77 deletions

View File

@@ -2,22 +2,18 @@ package main
import ( import (
"context" "context"
"fmt"
"github.com/beckn/beckn-onix/pkg/log" "github.com/beckn/beckn-onix/pkg/log"
"github.com/beckn/beckn-onix/pkg/plugin/definition" "github.com/beckn/beckn-onix/pkg/plugin/definition"
"github.com/beckn/beckn-onix/pkg/plugin/implementation/publisher" "github.com/beckn/beckn-onix/pkg/plugin/implementation/publisher"
"github.com/rabbitmq/amqp091-go"
) )
// publisherProvider implements the PublisherProvider interface. // publisherProvider implements the PublisherProvider interface.
// It is responsible for creating a new Publisher instance. // It is responsible for creating a new Publisher instance.
type publisherProvider struct{} type publisherProvider struct{}
// New creates a new Publisher instance based on the provided configuration map. // New creates a new Publisher instance based on the provided configuration.
// It also returns a cleanup function to close resources and any potential errors encountered.
func (p *publisherProvider) New(ctx context.Context, config map[string]string) (definition.Publisher, func() error, error) { func (p *publisherProvider) New(ctx context.Context, config map[string]string) (definition.Publisher, func() error, error) {
// Step 1: Map config
cfg := &publisher.Config{ cfg := &publisher.Config{
Addr: config["addr"], Addr: config["addr"],
Exchange: config["exchange"], Exchange: config["exchange"],
@@ -27,69 +23,14 @@ func (p *publisherProvider) New(ctx context.Context, config map[string]string) (
} }
log.Debugf(ctx, "Publisher config mapped: %+v", cfg) log.Debugf(ctx, "Publisher config mapped: %+v", cfg)
// Step 2: Validate pub, cleanup, err := publisher.New(cfg)
if err := publisher.Validate(cfg); err != nil { if err != nil {
log.Errorf(ctx, err, "Publisher config validation failed") log.Errorf(ctx, err, "Failed to create publisher instance")
return nil, nil, err return nil, nil, err
} }
log.Infof(ctx, "Publisher config validated successfully")
// Step 3:URL
connURL, err := publisher.GetConnURL(cfg)
if err != nil {
log.Errorf(ctx, err, "Failed to build RabbitMQ connection URL")
return nil, nil, fmt.Errorf("failed to build connection URL: %w", err)
}
log.Debugf(ctx, "RabbitMQ connection URL built: %s", connURL)
// Step 4: Connect
conn, err := amqp091.Dial(connURL)
if err != nil {
log.Errorf(ctx, err, "Failed to connect to RabbitMQ")
return nil, nil, fmt.Errorf("%w: %v", publisher.ErrConnectionFailed, err)
}
log.Infof(ctx, "Connected to RabbitMQ")
ch, err := conn.Channel()
if err != nil {
conn.Close()
log.Errorf(ctx, err, "Failed to open RabbitMQ channel")
return nil, nil, fmt.Errorf("%w: %v", publisher.ErrChannelFailed, err)
}
log.Infof(ctx, "RabbitMQ channel opened successfully")
// Step 5: Declare Exchange
err = ch.ExchangeDeclare(
cfg.Exchange,
"topic",
cfg.Durable,
false,
false,
false,
nil,
)
if err != nil {
ch.Close()
conn.Close()
log.Errorf(ctx, err, "Failed to declare exchange: %s", cfg.Exchange)
return nil, nil, fmt.Errorf("%w: %v", publisher.ErrExchangeDeclare, err)
}
log.Infof(ctx, "RabbitMQ exchange declared successfully: %s", cfg.Exchange)
// Step 6: Create publisher instance
publisher := &publisher.Publisher{
Conn: conn,
Channel: ch,
Config: cfg,
}
cleanup := func() error {
log.Infof(ctx, "Cleaning up RabbitMQ resources")
_ = ch.Close()
return conn.Close()
}
log.Infof(ctx, "Publisher instance created successfully") log.Infof(ctx, "Publisher instance created successfully")
return publisher, cleanup, nil return pub, cleanup, nil
} }
// Provider is the instance of publisherProvider that implements the PublisherProvider interface. // Provider is the instance of publisherProvider that implements the PublisherProvider interface.

View File

@@ -0,0 +1,106 @@
package main
import (
"context"
"errors"
"strings"
"testing"
"github.com/beckn/beckn-onix/pkg/plugin/implementation/publisher"
"github.com/rabbitmq/amqp091-go"
)
type mockChannel struct{}
func (m *mockChannel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error {
return nil
}
func (m *mockChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error {
return nil
}
func (m *mockChannel) Close() error {
return nil
}
func TestPublisherProvider_New_Success(t *testing.T) {
// Save original dialFunc and channelFunc
originalDialFunc := publisher.DialFunc
originalChannelFunc := publisher.ChannelFunc
defer func() {
publisher.DialFunc = originalDialFunc
publisher.ChannelFunc = originalChannelFunc
}()
// Override mocks
publisher.DialFunc = func(url string) (*amqp091.Connection, error) {
return nil, nil
}
publisher.ChannelFunc = func(conn *amqp091.Connection) (publisher.Channel, error) {
return &mockChannel{}, nil
}
t.Setenv("RABBITMQ_USERNAME", "guest")
t.Setenv("RABBITMQ_PASSWORD", "guest")
config := map[string]string{
"addr": "localhost",
"exchange": "test-exchange",
"routing_key": "test.key",
"durable": "true",
"use_tls": "false",
}
ctx := context.Background()
pub, cleanup, err := Provider.New(ctx, config)
if err != nil {
t.Fatalf("Provider.New returned error: %v", err)
}
if pub == nil {
t.Fatal("Expected non-nil publisher")
}
if cleanup == nil {
t.Fatal("Expected non-nil cleanup function")
}
if err := cleanup(); err != nil {
t.Errorf("Cleanup returned error: %v", err)
}
}
func TestPublisherProvider_New_Failure(t *testing.T) {
// Save and restore dialFunc
originalDialFunc := publisher.DialFunc
defer func() { publisher.DialFunc = originalDialFunc }()
// Simulate dial failure
publisher.DialFunc = func(url string) (*amqp091.Connection, error) {
return nil, errors.New("dial failed")
}
t.Setenv("RABBITMQ_USERNAME", "guest")
t.Setenv("RABBITMQ_PASSWORD", "guest")
config := map[string]string{
"addr": "localhost",
"exchange": "test-exchange",
"routing_key": "test.key",
"durable": "true",
}
ctx := context.Background()
pub, cleanup, err := Provider.New(ctx, config)
if err == nil {
t.Fatal("Expected error from Provider.New but got nil")
}
if !strings.Contains(err.Error(), "dial failed") {
t.Errorf("Expected 'dial failed' error, got: %v", err)
}
if pub != nil {
t.Errorf("Expected nil publisher, got: %v", pub)
}
if cleanup != nil {
t.Error("Expected nil cleanup, got non-nil")
}
}

View File

@@ -25,6 +25,8 @@ type Config struct {
// Channel defines the interface for publishing messages to RabbitMQ. // Channel defines the interface for publishing messages to RabbitMQ.
type Channel interface { type Channel interface {
PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error
ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error
Close() error
} }
// Publisher manages the RabbitMQ connection and channel to publish messages. // Publisher manages the RabbitMQ connection and channel to publish messages.
@@ -122,3 +124,73 @@ func (p *Publisher) Publish(ctx context.Context, routingKey string, msg []byte)
log.Infof(ctx, "Message published successfully to Exchange: %s, RoutingKey: %s", p.Config.Exchange, routingKey) log.Infof(ctx, "Message published successfully to Exchange: %s, RoutingKey: %s", p.Config.Exchange, routingKey)
return nil return nil
} }
// DialFunc is a function variable used to establish a connection to RabbitMQ.
var DialFunc = amqp091.Dial
// ChannelFunc is a function variable used to open a channel on the given RabbitMQ connection.
var ChannelFunc = func(conn *amqp091.Connection) (Channel, error) {
return conn.Channel()
}
// New initializes a new Publisher with the given config, opens a connection,
// channel, and declares the exchange. Returns the publisher and a cleanup function.
func New(cfg *Config) (*Publisher, func() error, error) {
// Step 1: Validate config
if err := Validate(cfg); err != nil {
return nil, nil, err
}
// Step 2: Build connection URL
connURL, err := GetConnURL(cfg)
if err != nil {
return nil, nil, fmt.Errorf("%w: %v", ErrConnectionFailed, err)
}
// Step 3: Dial connection
conn, err := DialFunc(connURL)
if err != nil {
return nil, nil, fmt.Errorf("%w: %v", ErrConnectionFailed, err)
}
// Step 4: Open channel
ch, err := ChannelFunc(conn)
if err != nil {
conn.Close()
return nil, nil, fmt.Errorf("%w: %v", ErrChannelFailed, err)
}
// Step 5: Declare exchange
if err := ch.ExchangeDeclare(
cfg.Exchange,
"topic",
cfg.Durable,
false,
false,
false,
nil,
); err != nil {
ch.Close()
conn.Close()
return nil, nil, fmt.Errorf("%w: %v", ErrExchangeDeclare, err)
}
// Step 6: Construct publisher
pub := &Publisher{
Conn: conn,
Channel: ch,
Config: cfg,
}
cleanup := func() error {
if ch != nil {
_ = ch.Close()
}
if conn != nil {
return conn.Close()
}
return nil
}
return pub, cleanup, nil
}

View File

@@ -2,8 +2,9 @@ package publisher
import ( import (
"context" "context"
"errors" "fmt"
"os" "os"
"strings"
"testing" "testing"
"github.com/rabbitmq/amqp091-go" "github.com/rabbitmq/amqp091-go"
@@ -153,32 +154,35 @@ func TestValidateFailure(t *testing.T) {
} }
} }
type mockChannel struct { type mockChannelForPublish struct {
published bool published bool
args amqp091.Publishing
exchange string exchange string
key string key string
body []byte
fail bool fail bool
} }
func (m *mockChannel) PublishWithContext( func (m *mockChannelForPublish) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error {
_ context.Context,
exchange, key string,
mandatory, immediate bool,
msg amqp091.Publishing,
) error {
if m.fail { if m.fail {
return errors.New("mock publish failure") return fmt.Errorf("simulated publish failure")
} }
m.published = true m.published = true
m.args = msg
m.exchange = exchange m.exchange = exchange
m.key = key m.key = key
m.body = msg.Body
return nil
}
func (m *mockChannelForPublish) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error {
return nil
}
func (m *mockChannelForPublish) Close() error {
return nil return nil
} }
func TestPublishSuccess(t *testing.T) { func TestPublishSuccess(t *testing.T) {
mockCh := &mockChannel{} mockCh := &mockChannelForPublish{}
p := &Publisher{ p := &Publisher{
Channel: mockCh, Channel: mockCh,
@@ -203,7 +207,7 @@ func TestPublishSuccess(t *testing.T) {
} }
func TestPublishFailure(t *testing.T) { func TestPublishFailure(t *testing.T) {
mockCh := &mockChannel{fail: true} mockCh := &mockChannelForPublish{fail: true}
p := &Publisher{ p := &Publisher{
Channel: mockCh, Channel: mockCh,
@@ -218,3 +222,147 @@ func TestPublishFailure(t *testing.T) {
t.Error("expected error from failed publish, got nil") t.Error("expected error from failed publish, got nil")
} }
} }
type mockChannel struct{}
func (m *mockChannel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error {
return nil
}
func (m *mockChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error {
return nil
}
func (m *mockChannel) Close() error {
return nil
}
type mockConnection struct{}
func (m *mockConnection) Close() error {
return nil
}
func TestNewPublisherSucess(t *testing.T) {
originalDialFunc := DialFunc
originalChannelFunc := ChannelFunc
defer func() {
DialFunc = originalDialFunc
ChannelFunc = originalChannelFunc
}()
// mockedConn := &mockConnection{}
DialFunc = func(url string) (*amqp091.Connection, error) {
return nil, nil
}
ChannelFunc = func(conn *amqp091.Connection) (Channel, error) {
return &mockChannel{}, nil
}
cfg := &Config{
Addr: "localhost",
Exchange: "test-ex",
Durable: true,
RoutingKey: "test.key",
}
t.Setenv("RABBITMQ_USERNAME", "user")
t.Setenv("RABBITMQ_PASSWORD", "pass")
pub, cleanup, err := New(cfg)
if err != nil {
t.Fatalf("New() failed: %v", err)
}
if pub == nil {
t.Fatal("Publisher should not be nil")
}
if cleanup == nil {
t.Fatal("Cleanup should not be nil")
}
if err := cleanup(); err != nil {
t.Errorf("Cleanup failed: %v", err)
}
}
type mockChannelFailDeclare struct{}
func (m *mockChannelFailDeclare) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error {
return nil
}
func (m *mockChannelFailDeclare) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error {
return fmt.Errorf("simulated exchange declare error")
}
func (m *mockChannelFailDeclare) Close() error {
return nil
}
func TestNewPublisherFailures(t *testing.T) {
tests := []struct {
name string
cfg *Config
dialFunc func(url string) (*amqp091.Connection, error) // Mocked dial function
envVars map[string]string
expectedError string
}{
{
name: "ValidateFailure",
cfg: &Config{}, // invalid config
expectedError: "missing config.Addr",
},
{
name: "GetConnURLFailure",
cfg: &Config{
Addr: "localhost",
Exchange: "test-ex",
Durable: true,
RoutingKey: "test.key",
},
envVars: map[string]string{
"RABBITMQ_USERNAME": "",
"RABBITMQ_PASSWORD": "",
},
expectedError: "missing RabbitMQ credentials in environment",
},
{
name: "ConnectionFailure",
cfg: &Config{
Addr: "localhost",
Exchange: "test-ex",
Durable: true,
RoutingKey: "test.key",
},
dialFunc: func(url string) (*amqp091.Connection, error) {
return nil, fmt.Errorf("simulated connection failure")
},
envVars: map[string]string{
"RABBITMQ_USERNAME": "user",
"RABBITMQ_PASSWORD": "pass",
},
expectedError: "failed to connect to RabbitMQ",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Set environment variables
for key, value := range tt.envVars {
t.Setenv(key, value)
}
// Mock dialFunc if needed
originalDialFunc := DialFunc
if tt.dialFunc != nil {
DialFunc = tt.dialFunc
defer func() {
DialFunc = originalDialFunc
}()
}
_, _, err := New(tt.cfg)
if err == nil || (tt.expectedError != "" && !strings.Contains(err.Error(), tt.expectedError)) {
t.Errorf("Test %s failed: expected error containing %v, got: %v", tt.name, tt.expectedError, err)
}
})
}
}