Publisher plugin implementation
This commit is contained in:
96
pkg/plugin/implementation/publisher/cmd/plugin.go
Normal file
96
pkg/plugin/implementation/publisher/cmd/plugin.go
Normal file
@@ -0,0 +1,96 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/beckn/beckn-onix/pkg/log"
|
||||
"github.com/beckn/beckn-onix/pkg/plugin/definition"
|
||||
"github.com/beckn/beckn-onix/pkg/plugin/implementation/publisher"
|
||||
"github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
// publisherProvider implements the PublisherProvider interface.
|
||||
// It is responsible for creating a new Publisher instance.
|
||||
type publisherProvider struct{}
|
||||
|
||||
// New creates a new Publisher instance based on the provided configuration map.
|
||||
// It also returns a cleanup function to close resources and any potential errors encountered.
|
||||
func (p *publisherProvider) New(ctx context.Context, config map[string]string) (definition.Publisher, func() error, error) {
|
||||
// Step 1: Map config
|
||||
cfg := &publisher.Config{
|
||||
Addr: config["addr"],
|
||||
Exchange: config["exchange"],
|
||||
RoutingKey: config["routing_key"],
|
||||
Durable: config["durable"] == "true",
|
||||
UseTLS: config["use_tls"] == "true",
|
||||
}
|
||||
log.Debugf(ctx, "Publisher config mapped: %+v", cfg)
|
||||
|
||||
// Step 2: Validate
|
||||
if err := publisher.Validate(cfg); err != nil {
|
||||
log.Errorf(ctx, err, "Publisher config validation failed")
|
||||
return nil, nil, err
|
||||
}
|
||||
log.Infof(ctx, "Publisher config validated successfully")
|
||||
|
||||
// Step 3:URL
|
||||
connURL, err := publisher.GetConnURL(cfg)
|
||||
if err != nil {
|
||||
log.Errorf(ctx, err, "Failed to build RabbitMQ connection URL")
|
||||
return nil, nil, fmt.Errorf("failed to build connection URL: %w", err)
|
||||
}
|
||||
log.Debugf(ctx, "RabbitMQ connection URL built: %s", connURL)
|
||||
// Step 4: Connect
|
||||
conn, err := amqp091.Dial(connURL)
|
||||
if err != nil {
|
||||
log.Errorf(ctx, err, "Failed to connect to RabbitMQ")
|
||||
return nil, nil, fmt.Errorf("%w: %v", publisher.ErrConnectionFailed, err)
|
||||
}
|
||||
log.Infof(ctx, "Connected to RabbitMQ")
|
||||
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
log.Errorf(ctx, err, "Failed to open RabbitMQ channel")
|
||||
return nil, nil, fmt.Errorf("%w: %v", publisher.ErrChannelFailed, err)
|
||||
}
|
||||
log.Infof(ctx, "RabbitMQ channel opened successfully")
|
||||
|
||||
// Step 5: Declare Exchange
|
||||
err = ch.ExchangeDeclare(
|
||||
cfg.Exchange,
|
||||
"topic",
|
||||
cfg.Durable,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
ch.Close()
|
||||
conn.Close()
|
||||
log.Errorf(ctx, err, "Failed to declare exchange: %s", cfg.Exchange)
|
||||
return nil, nil, fmt.Errorf("%w: %v", publisher.ErrExchangeDeclare, err)
|
||||
}
|
||||
log.Infof(ctx, "RabbitMQ exchange declared successfully: %s", cfg.Exchange)
|
||||
|
||||
// Step 6: Create publisher instance
|
||||
publisher := &publisher.Publisher{
|
||||
Conn: conn,
|
||||
Channel: ch,
|
||||
Config: cfg,
|
||||
}
|
||||
|
||||
cleanup := func() error {
|
||||
log.Infof(ctx, "Cleaning up RabbitMQ resources")
|
||||
_ = ch.Close()
|
||||
return conn.Close()
|
||||
}
|
||||
|
||||
log.Infof(ctx, "Publisher instance created successfully")
|
||||
return publisher, cleanup, nil
|
||||
}
|
||||
|
||||
// Provider is the instance of publisherProvider that implements the PublisherProvider interface.
|
||||
var Provider = publisherProvider{}
|
||||
119
pkg/plugin/implementation/publisher/publisher.go
Normal file
119
pkg/plugin/implementation/publisher/publisher.go
Normal file
@@ -0,0 +1,119 @@
|
||||
package publisher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/beckn/beckn-onix/pkg/log"
|
||||
"github.com/beckn/beckn-onix/pkg/model"
|
||||
"github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
// Config holds the configuration required to establish a connection with RabbitMQ.
|
||||
type Config struct {
|
||||
Addr string
|
||||
Exchange string
|
||||
RoutingKey string
|
||||
Durable bool
|
||||
UseTLS bool
|
||||
}
|
||||
|
||||
// Publisher manages the RabbitMQ connection and channel to publish messages.
|
||||
type Publisher struct {
|
||||
Conn *amqp091.Connection
|
||||
Channel *amqp091.Channel
|
||||
Config *Config
|
||||
}
|
||||
|
||||
// Error variables representing different failure scenarios.
|
||||
var (
|
||||
ErrEmptyConfig = errors.New("empty config")
|
||||
ErrAddrMissing = errors.New("missing required field 'Addr'")
|
||||
ErrExchangeMissing = errors.New("missing required field 'Exchange'")
|
||||
ErrCredentialMissing = errors.New("missing RabbitMQ credentials in environment")
|
||||
ErrConnectionFailed = errors.New("failed to connect to RabbitMQ")
|
||||
ErrChannelFailed = errors.New("failed to open channel")
|
||||
ErrExchangeDeclare = errors.New("failed to declare exchange")
|
||||
)
|
||||
|
||||
// Validate checks whether the provided Config is valid for connecting to RabbitMQ.
|
||||
func Validate(cfg *Config) error {
|
||||
if cfg == nil {
|
||||
return model.NewBadReqErr(fmt.Errorf("config is nil"))
|
||||
}
|
||||
if strings.TrimSpace(cfg.Addr) == "" {
|
||||
return model.NewBadReqErr(fmt.Errorf("missing config.Addr"))
|
||||
}
|
||||
if strings.TrimSpace(cfg.Exchange) == "" {
|
||||
return model.NewBadReqErr(fmt.Errorf("missing config.Exchange"))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetConnURL constructs the RabbitMQ connection URL using the config and environment credentials.
|
||||
func GetConnURL(cfg *Config) (string, error) {
|
||||
user := os.Getenv("RABBITMQ_USERNAME")
|
||||
pass := os.Getenv("RABBITMQ_PASSWORD")
|
||||
if user == "" || pass == "" {
|
||||
return "", model.NewBadReqErr(fmt.Errorf("missing RabbitMQ credentials in environment"))
|
||||
}
|
||||
|
||||
parts := strings.SplitN(cfg.Addr, "/", 2)
|
||||
hostPort := parts[0]
|
||||
vhost := "/"
|
||||
if len(parts) > 1 {
|
||||
vhost = parts[1]
|
||||
}
|
||||
|
||||
if !strings.Contains(hostPort, ":") {
|
||||
if cfg.UseTLS {
|
||||
hostPort += ":5671"
|
||||
} else {
|
||||
hostPort += ":5672"
|
||||
}
|
||||
}
|
||||
|
||||
encodedUser := url.QueryEscape(user)
|
||||
encodedPass := url.QueryEscape(pass)
|
||||
encodedVHost := url.QueryEscape(vhost)
|
||||
protocol := "amqp"
|
||||
if cfg.UseTLS {
|
||||
protocol = "amqps"
|
||||
}
|
||||
|
||||
connURL := fmt.Sprintf("%s://%s:%s@%s/%s", protocol, encodedUser, encodedPass, hostPort, encodedVHost)
|
||||
log.Debugf(context.Background(), "Generated RabbitMQ connection URL: %s", connURL)
|
||||
return connURL, nil
|
||||
}
|
||||
|
||||
// Publish sends a message to the configured RabbitMQ exchange with the specified routing key.
|
||||
// If routingKey is empty, the default routing key from Config is used.
|
||||
func (p *Publisher) Publish(ctx context.Context, routingKey string, msg []byte) error {
|
||||
if routingKey == "" {
|
||||
routingKey = p.Config.RoutingKey
|
||||
}
|
||||
log.Debugf(ctx, "Attempting to publish message. Exchange: %s, RoutingKey: %s", p.Config.Exchange, routingKey)
|
||||
err := p.Channel.PublishWithContext(
|
||||
ctx,
|
||||
p.Config.Exchange,
|
||||
routingKey,
|
||||
false,
|
||||
false,
|
||||
amqp091.Publishing{
|
||||
ContentType: "application/json",
|
||||
Body: msg,
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf(ctx, err, "Publish failed for Exchange: %s, RoutingKey: %s", p.Config.Exchange, routingKey)
|
||||
return model.NewBadReqErr(fmt.Errorf("publish message failed: %w", err))
|
||||
}
|
||||
|
||||
log.Infof(ctx, "Message published successfully to Exchange: %s, RoutingKey: %s", p.Config.Exchange, routingKey)
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user