diff --git a/.github/workflows/build-and-deploy-plugins.yml b/.github/workflows/build-and-deploy-plugins.yml new file mode 100644 index 0000000..79c33e0 --- /dev/null +++ b/.github/workflows/build-and-deploy-plugins.yml @@ -0,0 +1,116 @@ +name: Build and Upload Plugins + +on: + workflow_dispatch: + inputs: + target_branch: + description: 'Branch to deploy' + required: true + default: 'beckn-onix-v1.0-develop' + + +jobs: + build-and-upload: + runs-on: ubuntu-latest + env: + GCS_BUCKET: ${{ secrets.GCS_BUCKET }} + PLUGIN_OUTPUT_DIR: ./generated + ZIP_FILE: plugins_bundle.zip + + steps: + - name: Checkout this repo + uses: actions/checkout@v4 + with: + ref: ${{ github.event.inputs.target_branch }} + + - name: Show selected branch + run: echo "Deploying branch:${{ github.event.inputs.target_branch }}" + + + - name: Clone GitHub and Gerrit plugin repos + run: | + # Example GitHub clone + git clone -b beckn-onix-v1.0-develop https://${{ secrets.PAT_GITHUB }}:@github.com/beckn/beckn-onix.git github-repo + + # Example Gerrit clone + git clone https://${{ secrets.GERRIT_USERNAME }}:${{ secrets.GERRIT_PAT }}@open-networks.googlesource.com/onix-dev gerrit-repo + + - name: List directory structure + run: | + echo "📂 Contents of root:" + ls -alh + + echo "📂 Contents of GitHub repo:" + ls -alh github-repo + + echo "📂 Deep list of GitHub repo:" + find github-repo + + echo "📂 Contents of Gerrit repo:" + ls -alh gerrit-repo + + echo "📂 Deep list of Gerrit repo:" + find gerrit-repo + + + - name: Build Go plugins in Docker + run: | + set -e + mkdir -p $PLUGIN_OUTPUT_DIR + + BUILD_CMDS="" + + # GitHub plugins + for dir in github-repo/pkg/plugin/implementation/*; do + if [ -d "$dir/cmd" ]; then + plugin=$(basename "$dir") + BUILD_CMDS+="cd github-repo && go build -buildmode=plugin -buildvcs=false -o ../${PLUGIN_OUTPUT_DIR}/${plugin}.so ./pkg/plugin/implementation/${plugin}/cmd && cd - && " + fi + done + + # Gerrit plugins — build in their own repo/module context + for dir in gerrit-repo/plugins/*; do + if [ -d "$dir/cmd" ]; then + plugin=$(basename "$dir") + BUILD_CMDS+="cd gerrit-repo && go build -buildmode=plugin -buildvcs=false -o ../${PLUGIN_OUTPUT_DIR}/${plugin}.so ./plugins/${plugin}/cmd && cd - && " + fi + done + + BUILD_CMDS=${BUILD_CMDS%" && "} + echo "🛠️ Running build commands inside Docker:" + echo "$BUILD_CMDS" + + docker run --rm -v "$(pwd)":/app -w /app golang:1.24-bullseye sh -c "$BUILD_CMDS" + + - name: List built plugin files + run: | + echo "Looking in $PLUGIN_OUTPUT_DIR" + ls -lh $PLUGIN_OUTPUT_DIR || echo "⚠️ Directory does not exist" + find $PLUGIN_OUTPUT_DIR -name '*.so' || echo "⚠️ No .so files found" + + echo "Creating zip archive..." + cd "$PLUGIN_OUTPUT_DIR" + zip -r "../$ZIP_FILE" *.so + echo "Created $ZIP_FILE" + cd .. + + - name: List zip output + run: | + ls -lh plugins_bundle.zip + + + - name: Authenticate to GCP + run: | + echo '${{ secrets.GOOGLE_APPLICATION_CREDENTIALS_JSON }}' > gcloud-key.json + gcloud auth activate-service-account --key-file=gcloud-key.json + gcloud config set project trusty-relic-370809 + env: + GOOGLE_APPLICATION_CREDENTIALS: gcloud-key.json + + - name: Upload to GCS + run: | + gsutil -m cp -r $ZIP_FILE gs://${GCS_BUCKET}/plugins/ + + - name: Cleanup + run: | + rm -rf $PLUGIN_OUTPUT_DIR $ZIP_FILE gcloud-key.json diff --git a/.github/workflows/deploy-to-gke-BS.yml b/.github/workflows/deploy-to-gke-BS.yml new file mode 100644 index 0000000..d068829 --- /dev/null +++ b/.github/workflows/deploy-to-gke-BS.yml @@ -0,0 +1,57 @@ +name: CI/CD to GKE updated + +on: + #push: + workflow_dispatch: + +jobs: + deploy: + name: Build and Deploy to GKE + runs-on: ubuntu-latest + + steps: + - name: Checkout Code + uses: actions/checkout@v3 + + - name: Authenticate to Google Cloud + uses: google-github-actions/auth@v2 + with: + credentials_json: '${{ secrets.GOOGLE_APPLICATION_CREDENTIALS_JSON }}' + + - name: Set up gcloud CLI + uses: google-github-actions/setup-gcloud@v1 + with: + project_id: ${{ secrets.GCP_PROJECT }} + export_default_credentials: true + + - name: Install GKE Auth Plugin + run: gcloud components install gke-gcloud-auth-plugin --quiet + + - name: Configure Docker to use Artifact Registry + run: gcloud auth configure-docker ${{ secrets.GCP_REGION }}-docker.pkg.dev + + - name: Build Docker Image + run: | + IMAGE_NAME=${{ secrets.GCP_REGION }}-docker.pkg.dev/${{ secrets.GCP_PROJECT }}/${{ secrets.GCP_REPO }}/beckn-onix:${{ github.sha }} + docker build -f Dockerfile.adapter -t $IMAGE_NAME . + docker push $IMAGE_NAME + + - name: Get GKE Credentials + run: | + gcloud container clusters get-credentials ${{ secrets.GKE_CLUSTER }} \ + --zone ${{ secrets.GCP_REGION }} \ + --project ${{ secrets.GCP_PROJECT }} + + - name: Deploy to GKE using Kubernetes Manifests + run: | + IMAGE_NAME=${{ secrets.GCP_REGION }}-docker.pkg.dev/${{ secrets.GCP_PROJECT }}/${{ secrets.GCP_REPO }}/beckn-onix:${{ github.sha }} + + # Replace image in deployment YAML + sed -i "s|image: .*|image: $IMAGE_NAME|g" Deployment/deployment.yaml + + # Apply Kubernetes manifests + kubectl apply -f Deployment/deployment.yaml --namespace=onix-adapter + kubectl apply -f Deployment/service.yaml --namespace=onix-adapter + + # Wait for rollout to complete + kubectl rollout status Deployment/onix-demo-adapter --namespace=onix-adapter diff --git a/.github/workflows/deploy-to-gke.yml b/.github/workflows/deploy-to-gke.yml new file mode 100644 index 0000000..bcb43ef --- /dev/null +++ b/.github/workflows/deploy-to-gke.yml @@ -0,0 +1,44 @@ +name: Deploy to GKE + +on: + workflow_dispatch: + inputs: + service_name: + description: 'Name of the Kubernetes service to deploy' + required: true + type: string + cluster_name: + description: 'Name of the GKE cluster' + required: true + type: string + +jobs: + deploy: + runs-on: ubuntu-latest + + env: + PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }} + REGION: ${{ secrets.GCP_REGION }} + GKE_CLUSTER: ${{ github.event.inputs.cluster_name }} + SERVICE_NAME: ${{ github.event.inputs.service_name }} + + steps: + - name: Checkout source + uses: actions/checkout@v3 + + - name: Authenticate to Google Cloud + uses: google-github-actions/auth@v2 + with: + credentials_json: ${{ secrets.GCP_SA_KEY }} + + - name: Set up GKE credentials + uses: google-github-actions/get-gke-credentials@v1 + with: + cluster_name: ${{ env.GKE_CLUSTER }} + location: ${{ env.REGION }} + project_id: ${{ env.PROJECT_ID }} + + - name: Deploy to GKE + run: | + echo "Deploying service $SERVICE_NAME to cluster $GKE_CLUSTER" + kubectl set image deployment/$SERVICE_NAME $SERVICE_NAME=gcr.io/$PROJECT_ID/$SERVICE_NAME:latest --record diff --git a/.github/workflows/onix-gcp-terraform-deploy.yml b/.github/workflows/onix-gcp-terraform-deploy.yml new file mode 100644 index 0000000..478979b --- /dev/null +++ b/.github/workflows/onix-gcp-terraform-deploy.yml @@ -0,0 +1,66 @@ +name: Terraform Deploy to GCP + +on: + push: + workflow_dispatch: # Manual triggerr + +jobs: + plan: + name: Terraform Plan Only + runs-on: ubuntu-latest + + steps: + - name: Checkout this repository + uses: actions/checkout@v3 + + - name: Clone Terraform repo from Gerrit + run: | + git clone https://${{ secrets.GERRIT_USERNAME }}:${{ secrets.GERRIT_PAT }}@open-networks.googlesource.com/onix-dev gerrit-repo + echo "==== Contents of Terraform-dir ====" + pwd + cd gerrit-repo/Terraform-CICD + pwd + ls -la + + - name: Authenticate to Google Cloud + run: echo '${{ secrets.GOOGLE_APPLICATION_CREDENTIALS_JSON }}' > gcp-key.json + + - name: Set up Terraform + uses: hashicorp/setup-terraform@v3 + with: + terraform_version: 1.5.0 + + - name: Write GCP credentials to file + run: echo '${{ secrets.GOOGLE_APPLICATION_CREDENTIALS_JSON }}' > gcp-key.json + + - name: Export GCP credentials environment variable + run: echo "GOOGLE_APPLICATION_CREDENTIALS=$GITHUB_WORKSPACE/gcp-key.json" >> $GITHUB_ENV + + - name: Create backend.tf and Terraform Init + working-directory: ./gerrit-repo/Terraform-CICD + env: + GCS_BUCKET: beckn-cicd-tf-state-bucket + run: | + cat < backend.tf + terraform { + backend "gcs" { + bucket = "${GCS_BUCKET}" + prefix = "terraform/state" + credentials = "${{ github.workspace }}/gcp-key.json" + } + } + EOF + + terraform init + + - name: Terraform Plan + working-directory: ./gerrit-repo/Terraform-CICD + run: terraform plan + + - name: Terraform Apply + working-directory: ./gerrit-repo/Terraform-CICD + run: terraform apply -var="subnet_name=onix-gke-subnet" -auto-approve + + - name: Clean up credentials + run: rm -f gcp-key.json + diff --git a/Deployment/deployment.yaml b/Deployment/deployment.yaml new file mode 100644 index 0000000..561fdb5 --- /dev/null +++ b/Deployment/deployment.yaml @@ -0,0 +1,40 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: onix-demo-adapter + namespace: onix-adapter #------ +spec: + replicas: 1 + selector: + matchLabels: + app: onix-adapter + template: + metadata: + labels: + app: onix-adapter + annotations: + gke-gcsfuse/volumes: "true" + spec: + serviceAccountName: "onix-adapter-ksa" #----------- + containers: + - name: onix-adapter + image: "asia-south1-docker.pkg.dev/trusty-relic-370809/onix-adapter-cicd/beckn-onix:latest" #------ + ports: + - containerPort: 8080 + env: + - name: CONFIG_FILE + value: "/mnt/gcs/configs/onix-adapter.yaml" # Updated to GCS path + + volumeMounts: + - name: gcs-bucket + mountPath: /mnt/gcs + readOnly: false + + volumes: + - name: gcs-bucket + csi: + driver: gcsfuse.csi.storage.gke.io + readOnly: false + volumeAttributes: + bucketName: "beckn-cicd-bucket" #---------- + mountOptions: "implicit-dirs" diff --git a/Deployment/service.yaml b/Deployment/service.yaml new file mode 100644 index 0000000..c4be0d8 --- /dev/null +++ b/Deployment/service.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: Service +metadata: + name: onix-adapter-service + namespace: onix-adapter # Namespace +spec: + selector: + app: onix-adapter # This should match the app name in deployment.yaml + ports: + - protocol: TCP + port: 80 + targetPort: 8080 + type: LoadBalancer #NodePort or LoadBalancer diff --git a/go.mod b/go.mod index 2e23b33..b7e02e5 100644 --- a/go.mod +++ b/go.mod @@ -49,6 +49,7 @@ require ( github.com/google/uuid v1.6.0 github.com/hashicorp/go-retryablehttp v0.7.7 github.com/hashicorp/vault/api v1.16.0 + github.com/rabbitmq/amqp091-go v1.10.0 github.com/rs/zerolog v1.34.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index c909290..e8fb854 100644 --- a/go.sum +++ b/go.sum @@ -75,6 +75,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= @@ -94,6 +96,8 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03 h1:m1h+vudopHsI67FPT9MOncyndWhTcdUoBtI1R1uajGY= github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03/go.mod h1:8sheVFH84v3PCyFY/O02mIgSQY9I6wMYPWsq7mDnEZY= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= diff --git a/pkg/plugin/definition/keymanager.go b/pkg/plugin/definition/keymanager.go index 749ad0a..8bcb5bb 100644 --- a/pkg/plugin/definition/keymanager.go +++ b/pkg/plugin/definition/keymanager.go @@ -11,7 +11,7 @@ type KeyManager interface { GenerateKeyset() (*model.Keyset, error) InsertKeyset(ctx context.Context, keyID string, keyset *model.Keyset) error Keyset(ctx context.Context, keyID string) (*model.Keyset, error) - LookupNPKeys(ctx context.Context, subscriberID, uniqueKeyID string) (string, string, error) + LookupNPKeys(ctx context.Context, subscriberID, uniqueKeyID string) (signingPublicKey string, encrPublicKey string, err error) DeleteKeyset(ctx context.Context, keyID string) error } diff --git a/pkg/plugin/definition/publisher.go b/pkg/plugin/definition/publisher.go index 1e744da..55ed217 100644 --- a/pkg/plugin/definition/publisher.go +++ b/pkg/plugin/definition/publisher.go @@ -8,6 +8,7 @@ type Publisher interface { Publish(context.Context, string, []byte) error } +// PublisherProvider is the interface for creating new Publisher instances. type PublisherProvider interface { // New initializes a new publisher instance with the given configuration. New(ctx context.Context, config map[string]string) (Publisher, func() error, error) diff --git a/pkg/plugin/implementation/publisher/cmd/plugin.go b/pkg/plugin/implementation/publisher/cmd/plugin.go new file mode 100644 index 0000000..ccf87fa --- /dev/null +++ b/pkg/plugin/implementation/publisher/cmd/plugin.go @@ -0,0 +1,37 @@ +package main + +import ( + "context" + + "github.com/beckn/beckn-onix/pkg/log" + "github.com/beckn/beckn-onix/pkg/plugin/definition" + "github.com/beckn/beckn-onix/pkg/plugin/implementation/publisher" +) + +// publisherProvider implements the PublisherProvider interface. +// It is responsible for creating a new Publisher instance. +type publisherProvider struct{} + +// New creates a new Publisher instance based on the provided configuration. +func (p *publisherProvider) New(ctx context.Context, config map[string]string) (definition.Publisher, func() error, error) { + cfg := &publisher.Config{ + Addr: config["addr"], + Exchange: config["exchange"], + RoutingKey: config["routing_key"], + Durable: config["durable"] == "true", + UseTLS: config["use_tls"] == "true", + } + log.Debugf(ctx, "Publisher config mapped: %+v", cfg) + + pub, cleanup, err := publisher.New(cfg) + if err != nil { + log.Errorf(ctx, err, "Failed to create publisher instance") + return nil, nil, err + } + + log.Infof(ctx, "Publisher instance created successfully") + return pub, cleanup, nil +} + +// Provider is the instance of publisherProvider that implements the PublisherProvider interface. +var Provider = publisherProvider{} diff --git a/pkg/plugin/implementation/publisher/cmd/plugin_test.go b/pkg/plugin/implementation/publisher/cmd/plugin_test.go new file mode 100644 index 0000000..e3f9837 --- /dev/null +++ b/pkg/plugin/implementation/publisher/cmd/plugin_test.go @@ -0,0 +1,106 @@ +package main + +import ( + "context" + "errors" + "strings" + "testing" + + "github.com/beckn/beckn-onix/pkg/plugin/implementation/publisher" + "github.com/rabbitmq/amqp091-go" +) + +type mockChannel struct{} + +func (m *mockChannel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error { + return nil +} +func (m *mockChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error { + return nil +} +func (m *mockChannel) Close() error { + return nil +} + +func TestPublisherProvider_New_Success(t *testing.T) { + // Save original dialFunc and channelFunc + originalDialFunc := publisher.DialFunc + originalChannelFunc := publisher.ChannelFunc + defer func() { + publisher.DialFunc = originalDialFunc + publisher.ChannelFunc = originalChannelFunc + }() + + // Override mocks + publisher.DialFunc = func(url string) (*amqp091.Connection, error) { + return nil, nil + } + publisher.ChannelFunc = func(conn *amqp091.Connection) (publisher.Channel, error) { + return &mockChannel{}, nil + } + + t.Setenv("RABBITMQ_USERNAME", "guest") + t.Setenv("RABBITMQ_PASSWORD", "guest") + + config := map[string]string{ + "addr": "localhost", + "exchange": "test-exchange", + "routing_key": "test.key", + "durable": "true", + "use_tls": "false", + } + + ctx := context.Background() + pub, cleanup, err := Provider.New(ctx, config) + + if err != nil { + t.Fatalf("Provider.New returned error: %v", err) + } + if pub == nil { + t.Fatal("Expected non-nil publisher") + } + if cleanup == nil { + t.Fatal("Expected non-nil cleanup function") + } + + if err := cleanup(); err != nil { + t.Errorf("Cleanup returned error: %v", err) + } +} + +func TestPublisherProvider_New_Failure(t *testing.T) { + // Save and restore dialFunc + originalDialFunc := publisher.DialFunc + defer func() { publisher.DialFunc = originalDialFunc }() + + // Simulate dial failure + publisher.DialFunc = func(url string) (*amqp091.Connection, error) { + return nil, errors.New("dial failed") + } + + t.Setenv("RABBITMQ_USERNAME", "guest") + t.Setenv("RABBITMQ_PASSWORD", "guest") + + config := map[string]string{ + "addr": "localhost", + "exchange": "test-exchange", + "routing_key": "test.key", + "durable": "true", + } + + ctx := context.Background() + pub, cleanup, err := Provider.New(ctx, config) + + if err == nil { + t.Fatal("Expected error from Provider.New but got nil") + } + if !strings.Contains(err.Error(), "dial failed") { + t.Errorf("Expected 'dial failed' error, got: %v", err) + } + if pub != nil { + t.Errorf("Expected nil publisher, got: %v", pub) + } + if cleanup != nil { + t.Error("Expected nil cleanup, got non-nil") + } +} diff --git a/pkg/plugin/implementation/publisher/publisher.go b/pkg/plugin/implementation/publisher/publisher.go new file mode 100644 index 0000000..db3e577 --- /dev/null +++ b/pkg/plugin/implementation/publisher/publisher.go @@ -0,0 +1,196 @@ +package publisher + +import ( + "context" + "errors" + "fmt" + "net/url" + "os" + "strings" + + "github.com/beckn/beckn-onix/pkg/log" + "github.com/beckn/beckn-onix/pkg/model" + "github.com/rabbitmq/amqp091-go" +) + +// Config holds the configuration required to establish a connection with RabbitMQ. +type Config struct { + Addr string + Exchange string + RoutingKey string + Durable bool + UseTLS bool +} + +// Channel defines the interface for publishing messages to RabbitMQ. +type Channel interface { + PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error + ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error + Close() error +} + +// Publisher manages the RabbitMQ connection and channel to publish messages. +type Publisher struct { + Conn *amqp091.Connection + Channel Channel + Config *Config +} + +// Error variables representing different failure scenarios. +var ( + ErrEmptyConfig = errors.New("empty config") + ErrAddrMissing = errors.New("missing required field 'Addr'") + ErrExchangeMissing = errors.New("missing required field 'Exchange'") + ErrCredentialMissing = errors.New("missing RabbitMQ credentials in environment") + ErrConnectionFailed = errors.New("failed to connect to RabbitMQ") + ErrChannelFailed = errors.New("failed to open channel") + ErrExchangeDeclare = errors.New("failed to declare exchange") +) + +// Validate checks whether the provided Config is valid for connecting to RabbitMQ. +func Validate(cfg *Config) error { + if cfg == nil { + return model.NewBadReqErr(fmt.Errorf("config is nil")) + } + if strings.TrimSpace(cfg.Addr) == "" { + return model.NewBadReqErr(fmt.Errorf("missing config.Addr")) + } + if strings.TrimSpace(cfg.Exchange) == "" { + return model.NewBadReqErr(fmt.Errorf("missing config.Exchange")) + } + return nil +} + +// GetConnURL constructs the RabbitMQ connection URL using the config and environment credentials. +func GetConnURL(cfg *Config) (string, error) { + user := os.Getenv("RABBITMQ_USERNAME") + pass := os.Getenv("RABBITMQ_PASSWORD") + if user == "" || pass == "" { + return "", model.NewBadReqErr(fmt.Errorf("missing RabbitMQ credentials in environment")) + } + parts := strings.SplitN(strings.TrimSpace(cfg.Addr), "/", 2) + hostPort := parts[0] + vhost := "/" + if len(parts) > 1 { + vhost = parts[1] + } + + if !strings.Contains(hostPort, ":") { + if cfg.UseTLS { + hostPort += ":5671" + } else { + hostPort += ":5672" + } + } + + encodedUser := url.QueryEscape(user) + encodedPass := url.QueryEscape(pass) + encodedVHost := url.QueryEscape(vhost) + protocol := "amqp" + if cfg.UseTLS { + protocol = "amqps" + } + + connURL := fmt.Sprintf("%s://%s:%s@%s/%s", protocol, encodedUser, encodedPass, hostPort, encodedVHost) + log.Debugf(context.Background(), "Generated RabbitMQ connection details: protocol=%s, hostPort=%s, vhost=%s", protocol, hostPort, vhost) + + return connURL, nil +} + +// Publish sends a message to the configured RabbitMQ exchange with the specified routing key. +// If routingKey is empty, the default routing key from Config is used. +func (p *Publisher) Publish(ctx context.Context, routingKey string, msg []byte) error { + if routingKey == "" { + routingKey = p.Config.RoutingKey + } + log.Debugf(ctx, "Attempting to publish message. Exchange: %s, RoutingKey: %s", p.Config.Exchange, routingKey) + err := p.Channel.PublishWithContext( + ctx, + p.Config.Exchange, + routingKey, + false, + false, + amqp091.Publishing{ + ContentType: "application/json", + Body: msg, + }, + ) + + if err != nil { + log.Errorf(ctx, err, "Publish failed for Exchange: %s, RoutingKey: %s", p.Config.Exchange, routingKey) + return model.NewBadReqErr(fmt.Errorf("publish message failed: %w", err)) + } + + log.Infof(ctx, "Message published successfully to Exchange: %s, RoutingKey: %s", p.Config.Exchange, routingKey) + return nil +} + +// DialFunc is a function variable used to establish a connection to RabbitMQ. +var DialFunc = amqp091.Dial + +// ChannelFunc is a function variable used to open a channel on the given RabbitMQ connection. +var ChannelFunc = func(conn *amqp091.Connection) (Channel, error) { + return conn.Channel() +} + +// New initializes a new Publisher with the given config, opens a connection, +// channel, and declares the exchange. Returns the publisher and a cleanup function. +func New(cfg *Config) (*Publisher, func() error, error) { + // Step 1: Validate config + if err := Validate(cfg); err != nil { + return nil, nil, err + } + + // Step 2: Build connection URL + connURL, err := GetConnURL(cfg) + if err != nil { + return nil, nil, fmt.Errorf("%w: %v", ErrConnectionFailed, err) + } + + // Step 3: Dial connection + conn, err := DialFunc(connURL) + if err != nil { + return nil, nil, fmt.Errorf("%w: %v", ErrConnectionFailed, err) + } + + // Step 4: Open channel + ch, err := ChannelFunc(conn) + if err != nil { + conn.Close() + return nil, nil, fmt.Errorf("%w: %v", ErrChannelFailed, err) + } + + // Step 5: Declare exchange + if err := ch.ExchangeDeclare( + cfg.Exchange, + "topic", + cfg.Durable, + false, + false, + false, + nil, + ); err != nil { + ch.Close() + conn.Close() + return nil, nil, fmt.Errorf("%w: %v", ErrExchangeDeclare, err) + } + + // Step 6: Construct publisher + pub := &Publisher{ + Conn: conn, + Channel: ch, + Config: cfg, + } + + cleanup := func() error { + if ch != nil { + _ = ch.Close() + } + if conn != nil { + return conn.Close() + } + return nil + } + + return pub, cleanup, nil +} diff --git a/pkg/plugin/implementation/publisher/publisher_test.go b/pkg/plugin/implementation/publisher/publisher_test.go new file mode 100644 index 0000000..82b8404 --- /dev/null +++ b/pkg/plugin/implementation/publisher/publisher_test.go @@ -0,0 +1,362 @@ +package publisher + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/rabbitmq/amqp091-go" +) + +func TestGetConnURLSuccess(t *testing.T) { + tests := []struct { + name string + config *Config + }{ + { + name: "Valid config with connection address", + config: &Config{ + Addr: "localhost:5672", + UseTLS: false, + }, + }, + + { + name: "Valid config with vhost", + config: &Config{ + Addr: "localhost:5672/myvhost", + UseTLS: false, + }, + }, + { + name: "Addr with leading and trailing spaces", + config: &Config{ + Addr: " localhost:5672/myvhost ", + UseTLS: false, + }, + }, + } + + // Set valid credentials + t.Setenv("RABBITMQ_USERNAME", "guest") + t.Setenv("RABBITMQ_PASSWORD", "guest") + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + url, err := GetConnURL(tt.config) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if url == "" { + t.Error("expected non-empty URL, got empty string") + } + }) + } +} + +func TestGetConnURLFailure(t *testing.T) { + tests := []struct { + name string + username string + password string + config *Config + wantErr bool + }{ + { + name: "Missing credentials", + username: "", + password: "", + config: &Config{Addr: "localhost:5672"}, + wantErr: true, + }, + { + name: "Missing config address", + username: "guest", + password: "guest", + config: &Config{}, // this won't error unless Validate() is called separately + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.username != "" { + t.Setenv("RABBITMQ_USERNAME", tt.username) + } + + if tt.password != "" { + t.Setenv("RABBITMQ_PASSWORD", tt.password) + } + + url, err := GetConnURL(tt.config) + if (err != nil) != tt.wantErr { + t.Errorf("unexpected error. gotErr = %v, wantErr = %v", err != nil, tt.wantErr) + } + + if err == nil && url == "" { + t.Errorf("expected non-empty URL, got empty string") + } + }) + } +} + +func TestValidateSuccess(t *testing.T) { + tests := []struct { + name string + config *Config + }{ + { + name: "Valid config with Addr and Exchange", + config: &Config{ + Addr: "localhost:5672", + Exchange: "ex", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := Validate(tt.config); err != nil { + t.Errorf("expected no error, got: %v", err) + } + }) + } +} + +func TestValidateFailure(t *testing.T) { + tests := []struct { + name string + config *Config + expectedErrr string + }{ + { + name: "Nil config", + config: nil, + expectedErrr: "config is nil", + }, + { + name: "Missing Addr", + config: &Config{Exchange: "ex"}, + expectedErrr: "missing config.Addr", + }, + { + name: "Missing Exchange", + config: &Config{Addr: "localhost:5672"}, + expectedErrr: "missing config.Exchange", + }, + { + name: "Empty Addr and Exchange", + config: &Config{Addr: " ", Exchange: " "}, + expectedErrr: "missing config.Addr", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := Validate(tt.config) + if err == nil { + t.Errorf("expected error for invalid config, got nil") + return + } + if !strings.Contains(err.Error(), tt.expectedErrr) { + t.Errorf("expected error to contain %q, got: %v", tt.expectedErrr, err) + } + }) + } +} + +type mockChannelForPublish struct { + published bool + exchange string + key string + body []byte + fail bool +} + +func (m *mockChannelForPublish) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error { + if m.fail { + return fmt.Errorf("simulated publish failure") + } + m.published = true + m.exchange = exchange + m.key = key + m.body = msg.Body + return nil +} + +func (m *mockChannelForPublish) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error { + return nil +} + +func (m *mockChannelForPublish) Close() error { + return nil +} + +func TestPublishSuccess(t *testing.T) { + mockCh := &mockChannelForPublish{} + + p := &Publisher{ + Channel: mockCh, + Config: &Config{ + Exchange: "mock.exchange", + RoutingKey: "mock.key", + }, + } + + err := p.Publish(context.Background(), "", []byte(`{"test": true}`)) + if err != nil { + t.Errorf("expected no error, got: %v", err) + } + + if !mockCh.published { + t.Error("expected message to be published, but it wasn't") + } + + if mockCh.exchange != "mock.exchange" || mockCh.key != "mock.key" { + t.Errorf("unexpected exchange or key. got (%s, %s)", mockCh.exchange, mockCh.key) + } +} + +func TestPublishFailure(t *testing.T) { + mockCh := &mockChannelForPublish{fail: true} + + p := &Publisher{ + Channel: mockCh, + Config: &Config{ + Exchange: "mock.exchange", + RoutingKey: "mock.key", + }, + } + + err := p.Publish(context.Background(), "", []byte(`{"test": true}`)) + if err == nil { + t.Error("expected error from failed publish, got nil") + } +} + +type mockChannel struct{} + +func (m *mockChannel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error { + return nil +} +func (m *mockChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error { + return nil +} +func (m *mockChannel) Close() error { + return nil +} + +func TestNewPublisherSucess(t *testing.T) { + originalDialFunc := DialFunc + originalChannelFunc := ChannelFunc + defer func() { + DialFunc = originalDialFunc + ChannelFunc = originalChannelFunc + }() + + // mockedConn := &mockConnection{} + + DialFunc = func(url string) (*amqp091.Connection, error) { + return nil, nil + } + + ChannelFunc = func(conn *amqp091.Connection) (Channel, error) { + return &mockChannel{}, nil + } + + cfg := &Config{ + Addr: "localhost", + Exchange: "test-ex", + Durable: true, + RoutingKey: "test.key", + } + + t.Setenv("RABBITMQ_USERNAME", "user") + t.Setenv("RABBITMQ_PASSWORD", "pass") + + pub, cleanup, err := New(cfg) + if err != nil { + t.Fatalf("New() failed: %v", err) + } + if pub == nil { + t.Fatal("Publisher should not be nil") + } + if cleanup == nil { + t.Fatal("Cleanup should not be nil") + } + if err := cleanup(); err != nil { + t.Errorf("Cleanup failed: %v", err) + } +} + +func TestNewPublisherFailures(t *testing.T) { + tests := []struct { + name string + cfg *Config + dialFunc func(url string) (*amqp091.Connection, error) // Mocked dial function + envVars map[string]string + expectedError string + }{ + { + name: "ValidateFailure", + cfg: &Config{}, // invalid config + expectedError: "missing config.Addr", + }, + { + name: "GetConnURLFailure", + cfg: &Config{ + Addr: "localhost", + Exchange: "test-ex", + Durable: true, + RoutingKey: "test.key", + }, + envVars: map[string]string{ + "RABBITMQ_USERNAME": "", + "RABBITMQ_PASSWORD": "", + }, + expectedError: "missing RabbitMQ credentials in environment", + }, + { + name: "ConnectionFailure", + cfg: &Config{ + Addr: "localhost", + Exchange: "test-ex", + Durable: true, + RoutingKey: "test.key", + }, + dialFunc: func(url string) (*amqp091.Connection, error) { + return nil, fmt.Errorf("simulated connection failure") + }, + envVars: map[string]string{ + "RABBITMQ_USERNAME": "user", + "RABBITMQ_PASSWORD": "pass", + }, + expectedError: "failed to connect to RabbitMQ", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Set environment variables + for key, value := range tt.envVars { + t.Setenv(key, value) + } + + // Mock dialFunc if needed + originalDialFunc := DialFunc + if tt.dialFunc != nil { + DialFunc = tt.dialFunc + defer func() { + DialFunc = originalDialFunc + }() + } + + _, _, err := New(tt.cfg) + + if err == nil || (tt.expectedError != "" && !strings.Contains(err.Error(), tt.expectedError)) { + t.Errorf("Test %s failed: expected error containing %v, got: %v", tt.name, tt.expectedError, err) + } + }) + } +}