diff --git a/go.mod b/go.mod index 12fad60..29412d4 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( require ( github.com/hashicorp/go-retryablehttp v0.7.7 + github.com/rabbitmq/amqp091-go v1.10.0 github.com/rs/zerolog v1.34.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index 821e117..d6ddf7e 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,8 @@ github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsK github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= diff --git a/pkg/plugin/implementation/publisher/cmd/plugin.go b/pkg/plugin/implementation/publisher/cmd/plugin.go new file mode 100644 index 0000000..647edaf --- /dev/null +++ b/pkg/plugin/implementation/publisher/cmd/plugin.go @@ -0,0 +1,96 @@ +package main + +import ( + "context" + "fmt" + + "github.com/beckn/beckn-onix/pkg/log" + "github.com/beckn/beckn-onix/pkg/plugin/definition" + "github.com/beckn/beckn-onix/pkg/plugin/implementation/publisher" + "github.com/rabbitmq/amqp091-go" +) + +// publisherProvider implements the PublisherProvider interface. +// It is responsible for creating a new Publisher instance. +type publisherProvider struct{} + +// New creates a new Publisher instance based on the provided configuration map. +// It also returns a cleanup function to close resources and any potential errors encountered. +func (p *publisherProvider) New(ctx context.Context, config map[string]string) (definition.Publisher, func() error, error) { + // Step 1: Map config + cfg := &publisher.Config{ + Addr: config["addr"], + Exchange: config["exchange"], + RoutingKey: config["routing_key"], + Durable: config["durable"] == "true", + UseTLS: config["use_tls"] == "true", + } + log.Debugf(ctx, "Publisher config mapped: %+v", cfg) + + // Step 2: Validate + if err := publisher.Validate(cfg); err != nil { + log.Errorf(ctx, err, "Publisher config validation failed") + return nil, nil, err + } + log.Infof(ctx, "Publisher config validated successfully") + + // Step 3:URL + connURL, err := publisher.GetConnURL(cfg) + if err != nil { + log.Errorf(ctx, err, "Failed to build RabbitMQ connection URL") + return nil, nil, fmt.Errorf("failed to build connection URL: %w", err) + } + log.Debugf(ctx, "RabbitMQ connection URL built: %s", connURL) + // Step 4: Connect + conn, err := amqp091.Dial(connURL) + if err != nil { + log.Errorf(ctx, err, "Failed to connect to RabbitMQ") + return nil, nil, fmt.Errorf("%w: %v", publisher.ErrConnectionFailed, err) + } + log.Infof(ctx, "Connected to RabbitMQ") + + ch, err := conn.Channel() + if err != nil { + conn.Close() + log.Errorf(ctx, err, "Failed to open RabbitMQ channel") + return nil, nil, fmt.Errorf("%w: %v", publisher.ErrChannelFailed, err) + } + log.Infof(ctx, "RabbitMQ channel opened successfully") + + // Step 5: Declare Exchange + err = ch.ExchangeDeclare( + cfg.Exchange, + "topic", + cfg.Durable, + false, + false, + false, + nil, + ) + if err != nil { + ch.Close() + conn.Close() + log.Errorf(ctx, err, "Failed to declare exchange: %s", cfg.Exchange) + return nil, nil, fmt.Errorf("%w: %v", publisher.ErrExchangeDeclare, err) + } + log.Infof(ctx, "RabbitMQ exchange declared successfully: %s", cfg.Exchange) + + // Step 6: Create publisher instance + publisher := &publisher.Publisher{ + Conn: conn, + Channel: ch, + Config: cfg, + } + + cleanup := func() error { + log.Infof(ctx, "Cleaning up RabbitMQ resources") + _ = ch.Close() + return conn.Close() + } + + log.Infof(ctx, "Publisher instance created successfully") + return publisher, cleanup, nil +} + +// Provider is the instance of publisherProvider that implements the PublisherProvider interface. +var Provider = publisherProvider{} diff --git a/pkg/plugin/implementation/publisher/publisher.go b/pkg/plugin/implementation/publisher/publisher.go new file mode 100644 index 0000000..a861caf --- /dev/null +++ b/pkg/plugin/implementation/publisher/publisher.go @@ -0,0 +1,119 @@ +package publisher + +import ( + "context" + "errors" + "fmt" + "net/url" + "os" + "strings" + + "github.com/beckn/beckn-onix/pkg/log" + "github.com/beckn/beckn-onix/pkg/model" + "github.com/rabbitmq/amqp091-go" +) + +// Config holds the configuration required to establish a connection with RabbitMQ. +type Config struct { + Addr string + Exchange string + RoutingKey string + Durable bool + UseTLS bool +} + +// Publisher manages the RabbitMQ connection and channel to publish messages. +type Publisher struct { + Conn *amqp091.Connection + Channel *amqp091.Channel + Config *Config +} + +// Error variables representing different failure scenarios. +var ( + ErrEmptyConfig = errors.New("empty config") + ErrAddrMissing = errors.New("missing required field 'Addr'") + ErrExchangeMissing = errors.New("missing required field 'Exchange'") + ErrCredentialMissing = errors.New("missing RabbitMQ credentials in environment") + ErrConnectionFailed = errors.New("failed to connect to RabbitMQ") + ErrChannelFailed = errors.New("failed to open channel") + ErrExchangeDeclare = errors.New("failed to declare exchange") +) + +// Validate checks whether the provided Config is valid for connecting to RabbitMQ. +func Validate(cfg *Config) error { + if cfg == nil { + return model.NewBadReqErr(fmt.Errorf("config is nil")) + } + if strings.TrimSpace(cfg.Addr) == "" { + return model.NewBadReqErr(fmt.Errorf("missing config.Addr")) + } + if strings.TrimSpace(cfg.Exchange) == "" { + return model.NewBadReqErr(fmt.Errorf("missing config.Exchange")) + } + return nil +} + +// GetConnURL constructs the RabbitMQ connection URL using the config and environment credentials. +func GetConnURL(cfg *Config) (string, error) { + user := os.Getenv("RABBITMQ_USERNAME") + pass := os.Getenv("RABBITMQ_PASSWORD") + if user == "" || pass == "" { + return "", model.NewBadReqErr(fmt.Errorf("missing RabbitMQ credentials in environment")) + } + + parts := strings.SplitN(cfg.Addr, "/", 2) + hostPort := parts[0] + vhost := "/" + if len(parts) > 1 { + vhost = parts[1] + } + + if !strings.Contains(hostPort, ":") { + if cfg.UseTLS { + hostPort += ":5671" + } else { + hostPort += ":5672" + } + } + + encodedUser := url.QueryEscape(user) + encodedPass := url.QueryEscape(pass) + encodedVHost := url.QueryEscape(vhost) + protocol := "amqp" + if cfg.UseTLS { + protocol = "amqps" + } + + connURL := fmt.Sprintf("%s://%s:%s@%s/%s", protocol, encodedUser, encodedPass, hostPort, encodedVHost) + log.Debugf(context.Background(), "Generated RabbitMQ connection URL: %s", connURL) + return connURL, nil +} + +// Publish sends a message to the configured RabbitMQ exchange with the specified routing key. +// If routingKey is empty, the default routing key from Config is used. +func (p *Publisher) Publish(ctx context.Context, routingKey string, msg []byte) error { + if routingKey == "" { + routingKey = p.Config.RoutingKey + } + log.Debugf(ctx, "Attempting to publish message. Exchange: %s, RoutingKey: %s", p.Config.Exchange, routingKey) + err := p.Channel.PublishWithContext( + ctx, + p.Config.Exchange, + routingKey, + false, + false, + amqp091.Publishing{ + ContentType: "application/json", + Body: msg, + }, + ) + + if err != nil { + log.Errorf(ctx, err, "Publish failed for Exchange: %s, RoutingKey: %s", p.Config.Exchange, routingKey) + return model.NewBadReqErr(fmt.Errorf("publish message failed: %w", err)) + } + + log.Infof(ctx, "Message published successfully to Exchange: %s, RoutingKey: %s", p.Config.Exchange, routingKey) + return nil +}