Resolved PR review comments
Resolved merge conflict with the develop branch
This commit is contained in:
116
.github/workflows/build-and-deploy-plugins.yml
vendored
Normal file
116
.github/workflows/build-and-deploy-plugins.yml
vendored
Normal file
@@ -0,0 +1,116 @@
|
||||
name: Build and Upload Plugins
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
target_branch:
|
||||
description: 'Branch to deploy'
|
||||
required: true
|
||||
default: 'beckn-onix-v1.0-develop'
|
||||
|
||||
|
||||
jobs:
|
||||
build-and-upload:
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
GCS_BUCKET: ${{ secrets.GCS_BUCKET }}
|
||||
PLUGIN_OUTPUT_DIR: ./generated
|
||||
ZIP_FILE: plugins_bundle.zip
|
||||
|
||||
steps:
|
||||
- name: Checkout this repo
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
ref: ${{ github.event.inputs.target_branch }}
|
||||
|
||||
- name: Show selected branch
|
||||
run: echo "Deploying branch:${{ github.event.inputs.target_branch }}"
|
||||
|
||||
|
||||
- name: Clone GitHub and Gerrit plugin repos
|
||||
run: |
|
||||
# Example GitHub clone
|
||||
git clone -b beckn-onix-v1.0-develop https://${{ secrets.PAT_GITHUB }}:@github.com/beckn/beckn-onix.git github-repo
|
||||
|
||||
# Example Gerrit clone
|
||||
git clone https://${{ secrets.GERRIT_USERNAME }}:${{ secrets.GERRIT_PAT }}@open-networks.googlesource.com/onix-dev gerrit-repo
|
||||
|
||||
- name: List directory structure
|
||||
run: |
|
||||
echo "📂 Contents of root:"
|
||||
ls -alh
|
||||
|
||||
echo "📂 Contents of GitHub repo:"
|
||||
ls -alh github-repo
|
||||
|
||||
echo "📂 Deep list of GitHub repo:"
|
||||
find github-repo
|
||||
|
||||
echo "📂 Contents of Gerrit repo:"
|
||||
ls -alh gerrit-repo
|
||||
|
||||
echo "📂 Deep list of Gerrit repo:"
|
||||
find gerrit-repo
|
||||
|
||||
|
||||
- name: Build Go plugins in Docker
|
||||
run: |
|
||||
set -e
|
||||
mkdir -p $PLUGIN_OUTPUT_DIR
|
||||
|
||||
BUILD_CMDS=""
|
||||
|
||||
# GitHub plugins
|
||||
for dir in github-repo/pkg/plugin/implementation/*; do
|
||||
if [ -d "$dir/cmd" ]; then
|
||||
plugin=$(basename "$dir")
|
||||
BUILD_CMDS+="cd github-repo && go build -buildmode=plugin -buildvcs=false -o ../${PLUGIN_OUTPUT_DIR}/${plugin}.so ./pkg/plugin/implementation/${plugin}/cmd && cd - && "
|
||||
fi
|
||||
done
|
||||
|
||||
# Gerrit plugins — build in their own repo/module context
|
||||
for dir in gerrit-repo/plugins/*; do
|
||||
if [ -d "$dir/cmd" ]; then
|
||||
plugin=$(basename "$dir")
|
||||
BUILD_CMDS+="cd gerrit-repo && go build -buildmode=plugin -buildvcs=false -o ../${PLUGIN_OUTPUT_DIR}/${plugin}.so ./plugins/${plugin}/cmd && cd - && "
|
||||
fi
|
||||
done
|
||||
|
||||
BUILD_CMDS=${BUILD_CMDS%" && "}
|
||||
echo "🛠️ Running build commands inside Docker:"
|
||||
echo "$BUILD_CMDS"
|
||||
|
||||
docker run --rm -v "$(pwd)":/app -w /app golang:1.24-bullseye sh -c "$BUILD_CMDS"
|
||||
|
||||
- name: List built plugin files
|
||||
run: |
|
||||
echo "Looking in $PLUGIN_OUTPUT_DIR"
|
||||
ls -lh $PLUGIN_OUTPUT_DIR || echo "⚠️ Directory does not exist"
|
||||
find $PLUGIN_OUTPUT_DIR -name '*.so' || echo "⚠️ No .so files found"
|
||||
|
||||
echo "Creating zip archive..."
|
||||
cd "$PLUGIN_OUTPUT_DIR"
|
||||
zip -r "../$ZIP_FILE" *.so
|
||||
echo "Created $ZIP_FILE"
|
||||
cd ..
|
||||
|
||||
- name: List zip output
|
||||
run: |
|
||||
ls -lh plugins_bundle.zip
|
||||
|
||||
|
||||
- name: Authenticate to GCP
|
||||
run: |
|
||||
echo '${{ secrets.GOOGLE_APPLICATION_CREDENTIALS_JSON }}' > gcloud-key.json
|
||||
gcloud auth activate-service-account --key-file=gcloud-key.json
|
||||
gcloud config set project trusty-relic-370809
|
||||
env:
|
||||
GOOGLE_APPLICATION_CREDENTIALS: gcloud-key.json
|
||||
|
||||
- name: Upload to GCS
|
||||
run: |
|
||||
gsutil -m cp -r $ZIP_FILE gs://${GCS_BUCKET}/plugins/
|
||||
|
||||
- name: Cleanup
|
||||
run: |
|
||||
rm -rf $PLUGIN_OUTPUT_DIR $ZIP_FILE gcloud-key.json
|
||||
57
.github/workflows/deploy-to-gke-BS.yml
vendored
Normal file
57
.github/workflows/deploy-to-gke-BS.yml
vendored
Normal file
@@ -0,0 +1,57 @@
|
||||
name: CI/CD to GKE updated
|
||||
|
||||
on:
|
||||
#push:
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
deploy:
|
||||
name: Build and Deploy to GKE
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout Code
|
||||
uses: actions/checkout@v3
|
||||
|
||||
- name: Authenticate to Google Cloud
|
||||
uses: google-github-actions/auth@v2
|
||||
with:
|
||||
credentials_json: '${{ secrets.GOOGLE_APPLICATION_CREDENTIALS_JSON }}'
|
||||
|
||||
- name: Set up gcloud CLI
|
||||
uses: google-github-actions/setup-gcloud@v1
|
||||
with:
|
||||
project_id: ${{ secrets.GCP_PROJECT }}
|
||||
export_default_credentials: true
|
||||
|
||||
- name: Install GKE Auth Plugin
|
||||
run: gcloud components install gke-gcloud-auth-plugin --quiet
|
||||
|
||||
- name: Configure Docker to use Artifact Registry
|
||||
run: gcloud auth configure-docker ${{ secrets.GCP_REGION }}-docker.pkg.dev
|
||||
|
||||
- name: Build Docker Image
|
||||
run: |
|
||||
IMAGE_NAME=${{ secrets.GCP_REGION }}-docker.pkg.dev/${{ secrets.GCP_PROJECT }}/${{ secrets.GCP_REPO }}/beckn-onix:${{ github.sha }}
|
||||
docker build -f Dockerfile.adapter -t $IMAGE_NAME .
|
||||
docker push $IMAGE_NAME
|
||||
|
||||
- name: Get GKE Credentials
|
||||
run: |
|
||||
gcloud container clusters get-credentials ${{ secrets.GKE_CLUSTER }} \
|
||||
--zone ${{ secrets.GCP_REGION }} \
|
||||
--project ${{ secrets.GCP_PROJECT }}
|
||||
|
||||
- name: Deploy to GKE using Kubernetes Manifests
|
||||
run: |
|
||||
IMAGE_NAME=${{ secrets.GCP_REGION }}-docker.pkg.dev/${{ secrets.GCP_PROJECT }}/${{ secrets.GCP_REPO }}/beckn-onix:${{ github.sha }}
|
||||
|
||||
# Replace image in deployment YAML
|
||||
sed -i "s|image: .*|image: $IMAGE_NAME|g" Deployment/deployment.yaml
|
||||
|
||||
# Apply Kubernetes manifests
|
||||
kubectl apply -f Deployment/deployment.yaml --namespace=onix-adapter
|
||||
kubectl apply -f Deployment/service.yaml --namespace=onix-adapter
|
||||
|
||||
# Wait for rollout to complete
|
||||
kubectl rollout status Deployment/onix-demo-adapter --namespace=onix-adapter
|
||||
44
.github/workflows/deploy-to-gke.yml
vendored
Normal file
44
.github/workflows/deploy-to-gke.yml
vendored
Normal file
@@ -0,0 +1,44 @@
|
||||
name: Deploy to GKE
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
service_name:
|
||||
description: 'Name of the Kubernetes service to deploy'
|
||||
required: true
|
||||
type: string
|
||||
cluster_name:
|
||||
description: 'Name of the GKE cluster'
|
||||
required: true
|
||||
type: string
|
||||
|
||||
jobs:
|
||||
deploy:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
env:
|
||||
PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
|
||||
REGION: ${{ secrets.GCP_REGION }}
|
||||
GKE_CLUSTER: ${{ github.event.inputs.cluster_name }}
|
||||
SERVICE_NAME: ${{ github.event.inputs.service_name }}
|
||||
|
||||
steps:
|
||||
- name: Checkout source
|
||||
uses: actions/checkout@v3
|
||||
|
||||
- name: Authenticate to Google Cloud
|
||||
uses: google-github-actions/auth@v2
|
||||
with:
|
||||
credentials_json: ${{ secrets.GCP_SA_KEY }}
|
||||
|
||||
- name: Set up GKE credentials
|
||||
uses: google-github-actions/get-gke-credentials@v1
|
||||
with:
|
||||
cluster_name: ${{ env.GKE_CLUSTER }}
|
||||
location: ${{ env.REGION }}
|
||||
project_id: ${{ env.PROJECT_ID }}
|
||||
|
||||
- name: Deploy to GKE
|
||||
run: |
|
||||
echo "Deploying service $SERVICE_NAME to cluster $GKE_CLUSTER"
|
||||
kubectl set image deployment/$SERVICE_NAME $SERVICE_NAME=gcr.io/$PROJECT_ID/$SERVICE_NAME:latest --record
|
||||
66
.github/workflows/onix-gcp-terraform-deploy.yml
vendored
Normal file
66
.github/workflows/onix-gcp-terraform-deploy.yml
vendored
Normal file
@@ -0,0 +1,66 @@
|
||||
name: Terraform Deploy to GCP
|
||||
|
||||
on:
|
||||
push:
|
||||
workflow_dispatch: # Manual triggerr
|
||||
|
||||
jobs:
|
||||
plan:
|
||||
name: Terraform Plan Only
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout this repository
|
||||
uses: actions/checkout@v3
|
||||
|
||||
- name: Clone Terraform repo from Gerrit
|
||||
run: |
|
||||
git clone https://${{ secrets.GERRIT_USERNAME }}:${{ secrets.GERRIT_PAT }}@open-networks.googlesource.com/onix-dev gerrit-repo
|
||||
echo "==== Contents of Terraform-dir ===="
|
||||
pwd
|
||||
cd gerrit-repo/Terraform-CICD
|
||||
pwd
|
||||
ls -la
|
||||
|
||||
- name: Authenticate to Google Cloud
|
||||
run: echo '${{ secrets.GOOGLE_APPLICATION_CREDENTIALS_JSON }}' > gcp-key.json
|
||||
|
||||
- name: Set up Terraform
|
||||
uses: hashicorp/setup-terraform@v3
|
||||
with:
|
||||
terraform_version: 1.5.0
|
||||
|
||||
- name: Write GCP credentials to file
|
||||
run: echo '${{ secrets.GOOGLE_APPLICATION_CREDENTIALS_JSON }}' > gcp-key.json
|
||||
|
||||
- name: Export GCP credentials environment variable
|
||||
run: echo "GOOGLE_APPLICATION_CREDENTIALS=$GITHUB_WORKSPACE/gcp-key.json" >> $GITHUB_ENV
|
||||
|
||||
- name: Create backend.tf and Terraform Init
|
||||
working-directory: ./gerrit-repo/Terraform-CICD
|
||||
env:
|
||||
GCS_BUCKET: beckn-cicd-tf-state-bucket
|
||||
run: |
|
||||
cat <<EOF > backend.tf
|
||||
terraform {
|
||||
backend "gcs" {
|
||||
bucket = "${GCS_BUCKET}"
|
||||
prefix = "terraform/state"
|
||||
credentials = "${{ github.workspace }}/gcp-key.json"
|
||||
}
|
||||
}
|
||||
EOF
|
||||
|
||||
terraform init
|
||||
|
||||
- name: Terraform Plan
|
||||
working-directory: ./gerrit-repo/Terraform-CICD
|
||||
run: terraform plan
|
||||
|
||||
- name: Terraform Apply
|
||||
working-directory: ./gerrit-repo/Terraform-CICD
|
||||
run: terraform apply -var="subnet_name=onix-gke-subnet" -auto-approve
|
||||
|
||||
- name: Clean up credentials
|
||||
run: rm -f gcp-key.json
|
||||
|
||||
40
Deployment/deployment.yaml
Normal file
40
Deployment/deployment.yaml
Normal file
@@ -0,0 +1,40 @@
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: onix-demo-adapter
|
||||
namespace: onix-adapter #------
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app: onix-adapter
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: onix-adapter
|
||||
annotations:
|
||||
gke-gcsfuse/volumes: "true"
|
||||
spec:
|
||||
serviceAccountName: "onix-adapter-ksa" #-----------
|
||||
containers:
|
||||
- name: onix-adapter
|
||||
image: "asia-south1-docker.pkg.dev/trusty-relic-370809/onix-adapter-cicd/beckn-onix:latest" #------
|
||||
ports:
|
||||
- containerPort: 8080
|
||||
env:
|
||||
- name: CONFIG_FILE
|
||||
value: "/mnt/gcs/configs/onix-adapter.yaml" # Updated to GCS path
|
||||
|
||||
volumeMounts:
|
||||
- name: gcs-bucket
|
||||
mountPath: /mnt/gcs
|
||||
readOnly: false
|
||||
|
||||
volumes:
|
||||
- name: gcs-bucket
|
||||
csi:
|
||||
driver: gcsfuse.csi.storage.gke.io
|
||||
readOnly: false
|
||||
volumeAttributes:
|
||||
bucketName: "beckn-cicd-bucket" #----------
|
||||
mountOptions: "implicit-dirs"
|
||||
13
Deployment/service.yaml
Normal file
13
Deployment/service.yaml
Normal file
@@ -0,0 +1,13 @@
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: onix-adapter-service
|
||||
namespace: onix-adapter # Namespace
|
||||
spec:
|
||||
selector:
|
||||
app: onix-adapter # This should match the app name in deployment.yaml
|
||||
ports:
|
||||
- protocol: TCP
|
||||
port: 80
|
||||
targetPort: 8080
|
||||
type: LoadBalancer #NodePort or LoadBalancer
|
||||
1
go.mod
1
go.mod
@@ -49,6 +49,7 @@ require (
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/hashicorp/go-retryablehttp v0.7.7
|
||||
github.com/hashicorp/vault/api v1.16.0
|
||||
github.com/rabbitmq/amqp091-go v1.10.0
|
||||
github.com/rs/zerolog v1.34.0
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
||||
gopkg.in/yaml.v2 v2.4.0
|
||||
|
||||
4
go.sum
4
go.sum
@@ -75,6 +75,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
|
||||
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
||||
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
||||
@@ -94,6 +96,8 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03 h1:m1h+vudopHsI67FPT9MOncyndWhTcdUoBtI1R1uajGY=
|
||||
github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03/go.mod h1:8sheVFH84v3PCyFY/O02mIgSQY9I6wMYPWsq7mDnEZY=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
|
||||
golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc=
|
||||
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
|
||||
|
||||
@@ -11,7 +11,7 @@ type KeyManager interface {
|
||||
GenerateKeyset() (*model.Keyset, error)
|
||||
InsertKeyset(ctx context.Context, keyID string, keyset *model.Keyset) error
|
||||
Keyset(ctx context.Context, keyID string) (*model.Keyset, error)
|
||||
LookupNPKeys(ctx context.Context, subscriberID, uniqueKeyID string) (string, string, error)
|
||||
LookupNPKeys(ctx context.Context, subscriberID, uniqueKeyID string) (signingPublicKey string, encrPublicKey string, err error)
|
||||
DeleteKeyset(ctx context.Context, keyID string) error
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ type Publisher interface {
|
||||
Publish(context.Context, string, []byte) error
|
||||
}
|
||||
|
||||
// PublisherProvider is the interface for creating new Publisher instances.
|
||||
type PublisherProvider interface {
|
||||
// New initializes a new publisher instance with the given configuration.
|
||||
New(ctx context.Context, config map[string]string) (Publisher, func() error, error)
|
||||
|
||||
37
pkg/plugin/implementation/publisher/cmd/plugin.go
Normal file
37
pkg/plugin/implementation/publisher/cmd/plugin.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/beckn/beckn-onix/pkg/log"
|
||||
"github.com/beckn/beckn-onix/pkg/plugin/definition"
|
||||
"github.com/beckn/beckn-onix/pkg/plugin/implementation/publisher"
|
||||
)
|
||||
|
||||
// publisherProvider implements the PublisherProvider interface.
|
||||
// It is responsible for creating a new Publisher instance.
|
||||
type publisherProvider struct{}
|
||||
|
||||
// New creates a new Publisher instance based on the provided configuration.
|
||||
func (p *publisherProvider) New(ctx context.Context, config map[string]string) (definition.Publisher, func() error, error) {
|
||||
cfg := &publisher.Config{
|
||||
Addr: config["addr"],
|
||||
Exchange: config["exchange"],
|
||||
RoutingKey: config["routing_key"],
|
||||
Durable: config["durable"] == "true",
|
||||
UseTLS: config["use_tls"] == "true",
|
||||
}
|
||||
log.Debugf(ctx, "Publisher config mapped: %+v", cfg)
|
||||
|
||||
pub, cleanup, err := publisher.New(cfg)
|
||||
if err != nil {
|
||||
log.Errorf(ctx, err, "Failed to create publisher instance")
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
log.Infof(ctx, "Publisher instance created successfully")
|
||||
return pub, cleanup, nil
|
||||
}
|
||||
|
||||
// Provider is the instance of publisherProvider that implements the PublisherProvider interface.
|
||||
var Provider = publisherProvider{}
|
||||
106
pkg/plugin/implementation/publisher/cmd/plugin_test.go
Normal file
106
pkg/plugin/implementation/publisher/cmd/plugin_test.go
Normal file
@@ -0,0 +1,106 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/beckn/beckn-onix/pkg/plugin/implementation/publisher"
|
||||
"github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
type mockChannel struct{}
|
||||
|
||||
func (m *mockChannel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error {
|
||||
return nil
|
||||
}
|
||||
func (m *mockChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error {
|
||||
return nil
|
||||
}
|
||||
func (m *mockChannel) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestPublisherProvider_New_Success(t *testing.T) {
|
||||
// Save original dialFunc and channelFunc
|
||||
originalDialFunc := publisher.DialFunc
|
||||
originalChannelFunc := publisher.ChannelFunc
|
||||
defer func() {
|
||||
publisher.DialFunc = originalDialFunc
|
||||
publisher.ChannelFunc = originalChannelFunc
|
||||
}()
|
||||
|
||||
// Override mocks
|
||||
publisher.DialFunc = func(url string) (*amqp091.Connection, error) {
|
||||
return nil, nil
|
||||
}
|
||||
publisher.ChannelFunc = func(conn *amqp091.Connection) (publisher.Channel, error) {
|
||||
return &mockChannel{}, nil
|
||||
}
|
||||
|
||||
t.Setenv("RABBITMQ_USERNAME", "guest")
|
||||
t.Setenv("RABBITMQ_PASSWORD", "guest")
|
||||
|
||||
config := map[string]string{
|
||||
"addr": "localhost",
|
||||
"exchange": "test-exchange",
|
||||
"routing_key": "test.key",
|
||||
"durable": "true",
|
||||
"use_tls": "false",
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
pub, cleanup, err := Provider.New(ctx, config)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Provider.New returned error: %v", err)
|
||||
}
|
||||
if pub == nil {
|
||||
t.Fatal("Expected non-nil publisher")
|
||||
}
|
||||
if cleanup == nil {
|
||||
t.Fatal("Expected non-nil cleanup function")
|
||||
}
|
||||
|
||||
if err := cleanup(); err != nil {
|
||||
t.Errorf("Cleanup returned error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPublisherProvider_New_Failure(t *testing.T) {
|
||||
// Save and restore dialFunc
|
||||
originalDialFunc := publisher.DialFunc
|
||||
defer func() { publisher.DialFunc = originalDialFunc }()
|
||||
|
||||
// Simulate dial failure
|
||||
publisher.DialFunc = func(url string) (*amqp091.Connection, error) {
|
||||
return nil, errors.New("dial failed")
|
||||
}
|
||||
|
||||
t.Setenv("RABBITMQ_USERNAME", "guest")
|
||||
t.Setenv("RABBITMQ_PASSWORD", "guest")
|
||||
|
||||
config := map[string]string{
|
||||
"addr": "localhost",
|
||||
"exchange": "test-exchange",
|
||||
"routing_key": "test.key",
|
||||
"durable": "true",
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
pub, cleanup, err := Provider.New(ctx, config)
|
||||
|
||||
if err == nil {
|
||||
t.Fatal("Expected error from Provider.New but got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "dial failed") {
|
||||
t.Errorf("Expected 'dial failed' error, got: %v", err)
|
||||
}
|
||||
if pub != nil {
|
||||
t.Errorf("Expected nil publisher, got: %v", pub)
|
||||
}
|
||||
if cleanup != nil {
|
||||
t.Error("Expected nil cleanup, got non-nil")
|
||||
}
|
||||
}
|
||||
196
pkg/plugin/implementation/publisher/publisher.go
Normal file
196
pkg/plugin/implementation/publisher/publisher.go
Normal file
@@ -0,0 +1,196 @@
|
||||
package publisher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/beckn/beckn-onix/pkg/log"
|
||||
"github.com/beckn/beckn-onix/pkg/model"
|
||||
"github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
// Config holds the configuration required to establish a connection with RabbitMQ.
|
||||
type Config struct {
|
||||
Addr string
|
||||
Exchange string
|
||||
RoutingKey string
|
||||
Durable bool
|
||||
UseTLS bool
|
||||
}
|
||||
|
||||
// Channel defines the interface for publishing messages to RabbitMQ.
|
||||
type Channel interface {
|
||||
PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error
|
||||
ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error
|
||||
Close() error
|
||||
}
|
||||
|
||||
// Publisher manages the RabbitMQ connection and channel to publish messages.
|
||||
type Publisher struct {
|
||||
Conn *amqp091.Connection
|
||||
Channel Channel
|
||||
Config *Config
|
||||
}
|
||||
|
||||
// Error variables representing different failure scenarios.
|
||||
var (
|
||||
ErrEmptyConfig = errors.New("empty config")
|
||||
ErrAddrMissing = errors.New("missing required field 'Addr'")
|
||||
ErrExchangeMissing = errors.New("missing required field 'Exchange'")
|
||||
ErrCredentialMissing = errors.New("missing RabbitMQ credentials in environment")
|
||||
ErrConnectionFailed = errors.New("failed to connect to RabbitMQ")
|
||||
ErrChannelFailed = errors.New("failed to open channel")
|
||||
ErrExchangeDeclare = errors.New("failed to declare exchange")
|
||||
)
|
||||
|
||||
// Validate checks whether the provided Config is valid for connecting to RabbitMQ.
|
||||
func Validate(cfg *Config) error {
|
||||
if cfg == nil {
|
||||
return model.NewBadReqErr(fmt.Errorf("config is nil"))
|
||||
}
|
||||
if strings.TrimSpace(cfg.Addr) == "" {
|
||||
return model.NewBadReqErr(fmt.Errorf("missing config.Addr"))
|
||||
}
|
||||
if strings.TrimSpace(cfg.Exchange) == "" {
|
||||
return model.NewBadReqErr(fmt.Errorf("missing config.Exchange"))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetConnURL constructs the RabbitMQ connection URL using the config and environment credentials.
|
||||
func GetConnURL(cfg *Config) (string, error) {
|
||||
user := os.Getenv("RABBITMQ_USERNAME")
|
||||
pass := os.Getenv("RABBITMQ_PASSWORD")
|
||||
if user == "" || pass == "" {
|
||||
return "", model.NewBadReqErr(fmt.Errorf("missing RabbitMQ credentials in environment"))
|
||||
}
|
||||
parts := strings.SplitN(strings.TrimSpace(cfg.Addr), "/", 2)
|
||||
hostPort := parts[0]
|
||||
vhost := "/"
|
||||
if len(parts) > 1 {
|
||||
vhost = parts[1]
|
||||
}
|
||||
|
||||
if !strings.Contains(hostPort, ":") {
|
||||
if cfg.UseTLS {
|
||||
hostPort += ":5671"
|
||||
} else {
|
||||
hostPort += ":5672"
|
||||
}
|
||||
}
|
||||
|
||||
encodedUser := url.QueryEscape(user)
|
||||
encodedPass := url.QueryEscape(pass)
|
||||
encodedVHost := url.QueryEscape(vhost)
|
||||
protocol := "amqp"
|
||||
if cfg.UseTLS {
|
||||
protocol = "amqps"
|
||||
}
|
||||
|
||||
connURL := fmt.Sprintf("%s://%s:%s@%s/%s", protocol, encodedUser, encodedPass, hostPort, encodedVHost)
|
||||
log.Debugf(context.Background(), "Generated RabbitMQ connection details: protocol=%s, hostPort=%s, vhost=%s", protocol, hostPort, vhost)
|
||||
|
||||
return connURL, nil
|
||||
}
|
||||
|
||||
// Publish sends a message to the configured RabbitMQ exchange with the specified routing key.
|
||||
// If routingKey is empty, the default routing key from Config is used.
|
||||
func (p *Publisher) Publish(ctx context.Context, routingKey string, msg []byte) error {
|
||||
if routingKey == "" {
|
||||
routingKey = p.Config.RoutingKey
|
||||
}
|
||||
log.Debugf(ctx, "Attempting to publish message. Exchange: %s, RoutingKey: %s", p.Config.Exchange, routingKey)
|
||||
err := p.Channel.PublishWithContext(
|
||||
ctx,
|
||||
p.Config.Exchange,
|
||||
routingKey,
|
||||
false,
|
||||
false,
|
||||
amqp091.Publishing{
|
||||
ContentType: "application/json",
|
||||
Body: msg,
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf(ctx, err, "Publish failed for Exchange: %s, RoutingKey: %s", p.Config.Exchange, routingKey)
|
||||
return model.NewBadReqErr(fmt.Errorf("publish message failed: %w", err))
|
||||
}
|
||||
|
||||
log.Infof(ctx, "Message published successfully to Exchange: %s, RoutingKey: %s", p.Config.Exchange, routingKey)
|
||||
return nil
|
||||
}
|
||||
|
||||
// DialFunc is a function variable used to establish a connection to RabbitMQ.
|
||||
var DialFunc = amqp091.Dial
|
||||
|
||||
// ChannelFunc is a function variable used to open a channel on the given RabbitMQ connection.
|
||||
var ChannelFunc = func(conn *amqp091.Connection) (Channel, error) {
|
||||
return conn.Channel()
|
||||
}
|
||||
|
||||
// New initializes a new Publisher with the given config, opens a connection,
|
||||
// channel, and declares the exchange. Returns the publisher and a cleanup function.
|
||||
func New(cfg *Config) (*Publisher, func() error, error) {
|
||||
// Step 1: Validate config
|
||||
if err := Validate(cfg); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Step 2: Build connection URL
|
||||
connURL, err := GetConnURL(cfg)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("%w: %v", ErrConnectionFailed, err)
|
||||
}
|
||||
|
||||
// Step 3: Dial connection
|
||||
conn, err := DialFunc(connURL)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("%w: %v", ErrConnectionFailed, err)
|
||||
}
|
||||
|
||||
// Step 4: Open channel
|
||||
ch, err := ChannelFunc(conn)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, nil, fmt.Errorf("%w: %v", ErrChannelFailed, err)
|
||||
}
|
||||
|
||||
// Step 5: Declare exchange
|
||||
if err := ch.ExchangeDeclare(
|
||||
cfg.Exchange,
|
||||
"topic",
|
||||
cfg.Durable,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
); err != nil {
|
||||
ch.Close()
|
||||
conn.Close()
|
||||
return nil, nil, fmt.Errorf("%w: %v", ErrExchangeDeclare, err)
|
||||
}
|
||||
|
||||
// Step 6: Construct publisher
|
||||
pub := &Publisher{
|
||||
Conn: conn,
|
||||
Channel: ch,
|
||||
Config: cfg,
|
||||
}
|
||||
|
||||
cleanup := func() error {
|
||||
if ch != nil {
|
||||
_ = ch.Close()
|
||||
}
|
||||
if conn != nil {
|
||||
return conn.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return pub, cleanup, nil
|
||||
}
|
||||
362
pkg/plugin/implementation/publisher/publisher_test.go
Normal file
362
pkg/plugin/implementation/publisher/publisher_test.go
Normal file
@@ -0,0 +1,362 @@
|
||||
package publisher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func TestGetConnURLSuccess(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
config *Config
|
||||
}{
|
||||
{
|
||||
name: "Valid config with connection address",
|
||||
config: &Config{
|
||||
Addr: "localhost:5672",
|
||||
UseTLS: false,
|
||||
},
|
||||
},
|
||||
|
||||
{
|
||||
name: "Valid config with vhost",
|
||||
config: &Config{
|
||||
Addr: "localhost:5672/myvhost",
|
||||
UseTLS: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Addr with leading and trailing spaces",
|
||||
config: &Config{
|
||||
Addr: " localhost:5672/myvhost ",
|
||||
UseTLS: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Set valid credentials
|
||||
t.Setenv("RABBITMQ_USERNAME", "guest")
|
||||
t.Setenv("RABBITMQ_PASSWORD", "guest")
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
url, err := GetConnURL(tt.config)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if url == "" {
|
||||
t.Error("expected non-empty URL, got empty string")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetConnURLFailure(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
username string
|
||||
password string
|
||||
config *Config
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "Missing credentials",
|
||||
username: "",
|
||||
password: "",
|
||||
config: &Config{Addr: "localhost:5672"},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "Missing config address",
|
||||
username: "guest",
|
||||
password: "guest",
|
||||
config: &Config{}, // this won't error unless Validate() is called separately
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if tt.username != "" {
|
||||
t.Setenv("RABBITMQ_USERNAME", tt.username)
|
||||
}
|
||||
|
||||
if tt.password != "" {
|
||||
t.Setenv("RABBITMQ_PASSWORD", tt.password)
|
||||
}
|
||||
|
||||
url, err := GetConnURL(tt.config)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("unexpected error. gotErr = %v, wantErr = %v", err != nil, tt.wantErr)
|
||||
}
|
||||
|
||||
if err == nil && url == "" {
|
||||
t.Errorf("expected non-empty URL, got empty string")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateSuccess(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
config *Config
|
||||
}{
|
||||
{
|
||||
name: "Valid config with Addr and Exchange",
|
||||
config: &Config{
|
||||
Addr: "localhost:5672",
|
||||
Exchange: "ex",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if err := Validate(tt.config); err != nil {
|
||||
t.Errorf("expected no error, got: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateFailure(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
config *Config
|
||||
expectedErrr string
|
||||
}{
|
||||
{
|
||||
name: "Nil config",
|
||||
config: nil,
|
||||
expectedErrr: "config is nil",
|
||||
},
|
||||
{
|
||||
name: "Missing Addr",
|
||||
config: &Config{Exchange: "ex"},
|
||||
expectedErrr: "missing config.Addr",
|
||||
},
|
||||
{
|
||||
name: "Missing Exchange",
|
||||
config: &Config{Addr: "localhost:5672"},
|
||||
expectedErrr: "missing config.Exchange",
|
||||
},
|
||||
{
|
||||
name: "Empty Addr and Exchange",
|
||||
config: &Config{Addr: " ", Exchange: " "},
|
||||
expectedErrr: "missing config.Addr",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := Validate(tt.config)
|
||||
if err == nil {
|
||||
t.Errorf("expected error for invalid config, got nil")
|
||||
return
|
||||
}
|
||||
if !strings.Contains(err.Error(), tt.expectedErrr) {
|
||||
t.Errorf("expected error to contain %q, got: %v", tt.expectedErrr, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type mockChannelForPublish struct {
|
||||
published bool
|
||||
exchange string
|
||||
key string
|
||||
body []byte
|
||||
fail bool
|
||||
}
|
||||
|
||||
func (m *mockChannelForPublish) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error {
|
||||
if m.fail {
|
||||
return fmt.Errorf("simulated publish failure")
|
||||
}
|
||||
m.published = true
|
||||
m.exchange = exchange
|
||||
m.key = key
|
||||
m.body = msg.Body
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockChannelForPublish) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockChannelForPublish) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestPublishSuccess(t *testing.T) {
|
||||
mockCh := &mockChannelForPublish{}
|
||||
|
||||
p := &Publisher{
|
||||
Channel: mockCh,
|
||||
Config: &Config{
|
||||
Exchange: "mock.exchange",
|
||||
RoutingKey: "mock.key",
|
||||
},
|
||||
}
|
||||
|
||||
err := p.Publish(context.Background(), "", []byte(`{"test": true}`))
|
||||
if err != nil {
|
||||
t.Errorf("expected no error, got: %v", err)
|
||||
}
|
||||
|
||||
if !mockCh.published {
|
||||
t.Error("expected message to be published, but it wasn't")
|
||||
}
|
||||
|
||||
if mockCh.exchange != "mock.exchange" || mockCh.key != "mock.key" {
|
||||
t.Errorf("unexpected exchange or key. got (%s, %s)", mockCh.exchange, mockCh.key)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPublishFailure(t *testing.T) {
|
||||
mockCh := &mockChannelForPublish{fail: true}
|
||||
|
||||
p := &Publisher{
|
||||
Channel: mockCh,
|
||||
Config: &Config{
|
||||
Exchange: "mock.exchange",
|
||||
RoutingKey: "mock.key",
|
||||
},
|
||||
}
|
||||
|
||||
err := p.Publish(context.Background(), "", []byte(`{"test": true}`))
|
||||
if err == nil {
|
||||
t.Error("expected error from failed publish, got nil")
|
||||
}
|
||||
}
|
||||
|
||||
type mockChannel struct{}
|
||||
|
||||
func (m *mockChannel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) error {
|
||||
return nil
|
||||
}
|
||||
func (m *mockChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error {
|
||||
return nil
|
||||
}
|
||||
func (m *mockChannel) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestNewPublisherSucess(t *testing.T) {
|
||||
originalDialFunc := DialFunc
|
||||
originalChannelFunc := ChannelFunc
|
||||
defer func() {
|
||||
DialFunc = originalDialFunc
|
||||
ChannelFunc = originalChannelFunc
|
||||
}()
|
||||
|
||||
// mockedConn := &mockConnection{}
|
||||
|
||||
DialFunc = func(url string) (*amqp091.Connection, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
ChannelFunc = func(conn *amqp091.Connection) (Channel, error) {
|
||||
return &mockChannel{}, nil
|
||||
}
|
||||
|
||||
cfg := &Config{
|
||||
Addr: "localhost",
|
||||
Exchange: "test-ex",
|
||||
Durable: true,
|
||||
RoutingKey: "test.key",
|
||||
}
|
||||
|
||||
t.Setenv("RABBITMQ_USERNAME", "user")
|
||||
t.Setenv("RABBITMQ_PASSWORD", "pass")
|
||||
|
||||
pub, cleanup, err := New(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("New() failed: %v", err)
|
||||
}
|
||||
if pub == nil {
|
||||
t.Fatal("Publisher should not be nil")
|
||||
}
|
||||
if cleanup == nil {
|
||||
t.Fatal("Cleanup should not be nil")
|
||||
}
|
||||
if err := cleanup(); err != nil {
|
||||
t.Errorf("Cleanup failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewPublisherFailures(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
cfg *Config
|
||||
dialFunc func(url string) (*amqp091.Connection, error) // Mocked dial function
|
||||
envVars map[string]string
|
||||
expectedError string
|
||||
}{
|
||||
{
|
||||
name: "ValidateFailure",
|
||||
cfg: &Config{}, // invalid config
|
||||
expectedError: "missing config.Addr",
|
||||
},
|
||||
{
|
||||
name: "GetConnURLFailure",
|
||||
cfg: &Config{
|
||||
Addr: "localhost",
|
||||
Exchange: "test-ex",
|
||||
Durable: true,
|
||||
RoutingKey: "test.key",
|
||||
},
|
||||
envVars: map[string]string{
|
||||
"RABBITMQ_USERNAME": "",
|
||||
"RABBITMQ_PASSWORD": "",
|
||||
},
|
||||
expectedError: "missing RabbitMQ credentials in environment",
|
||||
},
|
||||
{
|
||||
name: "ConnectionFailure",
|
||||
cfg: &Config{
|
||||
Addr: "localhost",
|
||||
Exchange: "test-ex",
|
||||
Durable: true,
|
||||
RoutingKey: "test.key",
|
||||
},
|
||||
dialFunc: func(url string) (*amqp091.Connection, error) {
|
||||
return nil, fmt.Errorf("simulated connection failure")
|
||||
},
|
||||
envVars: map[string]string{
|
||||
"RABBITMQ_USERNAME": "user",
|
||||
"RABBITMQ_PASSWORD": "pass",
|
||||
},
|
||||
expectedError: "failed to connect to RabbitMQ",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// Set environment variables
|
||||
for key, value := range tt.envVars {
|
||||
t.Setenv(key, value)
|
||||
}
|
||||
|
||||
// Mock dialFunc if needed
|
||||
originalDialFunc := DialFunc
|
||||
if tt.dialFunc != nil {
|
||||
DialFunc = tt.dialFunc
|
||||
defer func() {
|
||||
DialFunc = originalDialFunc
|
||||
}()
|
||||
}
|
||||
|
||||
_, _, err := New(tt.cfg)
|
||||
|
||||
if err == nil || (tt.expectedError != "" && !strings.Contains(err.Error(), tt.expectedError)) {
|
||||
t.Errorf("Test %s failed: expected error containing %v, got: %v", tt.name, tt.expectedError, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user