Merge pull request #522 from Beckn-One/509-DeDI-registry-plugin

ISSUE [509] - feat: Add DeDi registry plugin implementation for lookup
This commit is contained in:
Anand Parthasarathy
2025-09-23 21:53:15 +05:30
committed by GitHub
7 changed files with 870 additions and 0 deletions

View File

@@ -14,6 +14,7 @@ plugins=(
"simplekeymanager"
"publisher"
"registry"
"dediregistry"
"reqpreprocessor"
"router"
"schemavalidator"

View File

@@ -0,0 +1,153 @@
# DeDi Registry Plugin
A Beckn-ONIX registry type plugin for integrating with DeDi registry services. Implements the `RegistryLookup` interface to provide participant information and public keys.
## Overview
The DeDi Registry plugin enables Beckn-ONIX to lookup DeDi registries for participant records, converting DeDi API responses to standard Beckn Subscription format for seamless integration with existing registry infrastructure.
## Features
- **RegistryLookup Interface**: Implements standard Beckn registry interface
- **DeDi API Integration**: GET requests to DeDi registry endpoints with Bearer authentication
- **Data Conversion**: Converts DeDi responses to Beckn Subscription format
- **HTTP Retry Logic**: Built-in retry mechanism using retryablehttp client
- **Timeout Control**: Configurable request timeouts
## Configuration
```yaml
plugins:
dediRegistry:
id: dediregistry
config:
baseURL: "https://dedi-registry.example.com"
apiKey: "your-api-key"
namespaceID: "beckn-network"
registryName: "participants"
recordName: "participant-id"
timeout: "30" # seconds
```
### Configuration Parameters
| Parameter | Required | Description | Default |
|-----------|----------|-------------|---------|
| `baseURL` | Yes | DeDi registry API base URL | - |
| `apiKey` | Yes | API key for authentication | - |
| `namespaceID` | Yes | DeDi namespace identifier | - |
| `registryName` | Yes | Registry name to query | - |
| `recordName` | Yes | Record name/identifier | - |
| `timeout` | No | Request timeout in seconds | 30 |
## Usage
### In Module Configuration
```yaml
modules:
- name: bapTxnReceiver
handler:
plugins:
dediRegistry:
id: dediregistry
config:
baseURL: "https://dedi-registry.example.com"
apiKey: "your-api-key"
namespaceID: "beckn-network"
registryName: "participants"
recordName: "participant-id"
```
### In Code
```go
// Load DeDi registry plugin
dediRegistry, err := manager.Registry(ctx, &plugin.Config{
ID: "dediregistry",
Config: map[string]string{
"baseURL": "https://dedi-registry.example.com",
"apiKey": "your-api-key",
"namespaceID": "beckn-network",
"registryName": "participants",
"recordName": "participant-id",
},
})
// Or use specific method
dediRegistry, err := manager.DeDiRegistry(ctx, config)
// Lookup participant (returns Beckn Subscription format)
subscription := &model.Subscription{}
results, err := dediRegistry.Lookup(ctx, subscription)
if err != nil {
return err
}
// Extract public key from first result
if len(results) > 0 {
publicKey := results[0].SigningPublicKey
subscriberID := results[0].SubscriberID
}
```
## API Response Structure
The plugin expects DeDi registry responses in this format:
```json
{
"message": "success",
"data": {
"namespace": "beckn",
"schema": {
"entity_name": "participant.example.com",
"entity_url": "https://participant.example.com",
"publicKey": "base64-encoded-public-key",
"keyType": "ed25519",
"keyFormat": "base64"
},
"state": "active",
"created_at": "2023-01-01T00:00:00Z",
"updated_at": "2023-01-01T00:00:00Z"
}
}
```
### Converted to Beckn Format
The plugin converts this to standard Beckn Subscription format:
```json
{
"subscriber_id": "participant.example.com",
"url": "https://participant.example.com",
"signing_public_key": "base64-encoded-public-key",
"status": "active",
"created": "2023-01-01T00:00:00Z",
"updated": "2023-01-01T00:00:00Z"
}
```
## Testing
Run plugin tests:
```bash
go test ./pkg/plugin/implementation/dediregistry -v
```
## Dependencies
- `github.com/hashicorp/go-retryablehttp`: HTTP client with retry logic
- Standard Go libraries for HTTP and JSON handling
## Integration Notes
- **Registry Type Plugin**: Implements `RegistryLookup` interface, not a separate plugin category
- **Interchangeable**: Can be used alongside or instead of standard registry plugin
- **Manager Integration**: Available via `manager.Registry()` or `manager.DeDiRegistry()` methods
- **Data Conversion**: Automatically converts DeDi format to Beckn Subscription format
- **Interface Compliance**: Implements `RegistryLookup` interface with `Lookup()` method only
- **Build Integration**: Included in `build-plugins.sh` script, compiles to `dediregistry.so`

View File

@@ -0,0 +1,51 @@
package main
import (
"context"
"errors"
"strconv"
"github.com/beckn-one/beckn-onix/pkg/log"
"github.com/beckn-one/beckn-onix/pkg/plugin/definition"
"github.com/beckn-one/beckn-onix/pkg/plugin/implementation/dediregistry"
)
// dediRegistryProvider implements the RegistryLookupProvider interface for the DeDi registry plugin.
type dediRegistryProvider struct{}
// New creates a new DeDi registry plugin instance.
func (d dediRegistryProvider) New(ctx context.Context, config map[string]string) (definition.RegistryLookup, func() error, error) {
if ctx == nil {
return nil, nil, errors.New("context cannot be nil")
}
// Create dediregistry.Config directly from map - validation is handled by dediregistry.New
dediConfig := &dediregistry.Config{
BaseURL: config["baseURL"],
ApiKey: config["apiKey"],
NamespaceID: config["namespaceID"],
RegistryName: config["registryName"],
RecordName: config["recordName"],
}
// Parse timeout if provided
if timeoutStr, exists := config["timeout"]; exists && timeoutStr != "" {
if timeout, err := strconv.Atoi(timeoutStr); err == nil {
dediConfig.Timeout = timeout
}
}
log.Debugf(ctx, "DeDi Registry config mapped: %+v", dediConfig)
dediClient, closer, err := dediregistry.New(ctx, dediConfig)
if err != nil {
log.Errorf(ctx, err, "Failed to create DeDi registry instance")
return nil, nil, err
}
log.Infof(ctx, "DeDi Registry instance created successfully")
return dediClient, closer, nil
}
// Provider is the exported plugin instance
var Provider = dediRegistryProvider{}

View File

@@ -0,0 +1,97 @@
package main
import (
"context"
"testing"
)
func TestDediRegistryProvider_New(t *testing.T) {
ctx := context.Background()
provider := dediRegistryProvider{}
config := map[string]string{
"baseURL": "https://test.com",
"apiKey": "test-key",
"namespaceID": "test-namespace",
"registryName": "test-registry",
"recordName": "test-record",
"timeout": "30",
}
dediRegistry, closer, err := provider.New(ctx, config)
if err != nil {
t.Errorf("New() error = %v", err)
return
}
if dediRegistry == nil {
t.Error("New() returned nil dediRegistry")
}
if closer == nil {
t.Error("New() returned nil closer")
}
// Test cleanup
if err := closer(); err != nil {
t.Errorf("closer() error = %v", err)
}
}
func TestDediRegistryProvider_New_InvalidConfig(t *testing.T) {
ctx := context.Background()
provider := dediRegistryProvider{}
tests := []struct {
name string
config map[string]string
}{
{
name: "missing baseURL",
config: map[string]string{"apiKey": "test-key"},
},
{
name: "missing apiKey",
config: map[string]string{"baseURL": "https://test.com"},
},
{
name: "empty config",
config: map[string]string{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, _, err := provider.New(ctx, tt.config)
if err == nil {
t.Errorf("New() with %s should return error", tt.name)
}
})
}
}
func TestDediRegistryProvider_New_InvalidTimeout(t *testing.T) {
ctx := context.Background()
provider := dediRegistryProvider{}
config := map[string]string{
"baseURL": "https://test.com",
"apiKey": "test-key",
"namespaceID": "test-namespace",
"registryName": "test-registry",
"recordName": "test-record",
"timeout": "invalid",
}
// Invalid timeout should be ignored, not cause error
dediRegistry, closer, err := provider.New(ctx, config)
if err != nil {
t.Errorf("New() with invalid timeout should not return error, got: %v", err)
}
if dediRegistry == nil {
t.Error("New() should return valid registry even with invalid timeout")
}
if closer != nil {
closer()
}
}

View File

@@ -0,0 +1,186 @@
package dediregistry
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"github.com/beckn-one/beckn-onix/pkg/log"
"github.com/beckn-one/beckn-onix/pkg/model"
"github.com/hashicorp/go-retryablehttp"
)
// Config holds configuration parameters for the DeDi registry client.
type Config struct {
BaseURL string `yaml:"baseURL" json:"baseURL"`
ApiKey string `yaml:"apiKey" json:"apiKey"`
NamespaceID string `yaml:"namespaceID" json:"namespaceID"`
RegistryName string `yaml:"registryName" json:"registryName"`
RecordName string `yaml:"recordName" json:"recordName"`
Timeout int `yaml:"timeout" json:"timeout"`
}
// DeDiRegistryClient encapsulates the logic for calling the DeDi registry endpoints.
type DeDiRegistryClient struct {
config *Config
client *retryablehttp.Client
}
// validate checks if the provided DeDi registry configuration is valid.
func validate(cfg *Config) error {
if cfg == nil {
return fmt.Errorf("DeDi registry config cannot be nil")
}
if cfg.BaseURL == "" {
return fmt.Errorf("baseURL cannot be empty")
}
if cfg.ApiKey == "" {
return fmt.Errorf("apiKey cannot be empty")
}
if cfg.NamespaceID == "" {
return fmt.Errorf("namespaceID cannot be empty")
}
if cfg.RegistryName == "" {
return fmt.Errorf("registryName cannot be empty")
}
if cfg.RecordName == "" {
return fmt.Errorf("recordName cannot be empty")
}
return nil
}
// New creates a new instance of DeDiRegistryClient.
func New(ctx context.Context, cfg *Config) (*DeDiRegistryClient, func() error, error) {
log.Debugf(ctx, "Initializing DeDi Registry client with config: %+v", cfg)
if err := validate(cfg); err != nil {
return nil, nil, err
}
retryClient := retryablehttp.NewClient()
// Configure timeout if provided
if cfg.Timeout > 0 {
retryClient.HTTPClient.Timeout = time.Duration(cfg.Timeout) * time.Second
}
client := &DeDiRegistryClient{
config: cfg,
client: retryClient,
}
// Cleanup function
closer := func() error {
log.Debugf(ctx, "Cleaning up DeDi Registry client resources")
if client.client != nil {
client.client.HTTPClient.CloseIdleConnections()
}
return nil
}
log.Infof(ctx, "DeDi Registry client connection established successfully")
return client, closer, nil
}
// Lookup implements RegistryLookup interface - calls the DeDi lookup endpoint and returns Subscription.
func (c *DeDiRegistryClient) Lookup(ctx context.Context, req *model.Subscription) ([]model.Subscription, error) {
lookupURL := fmt.Sprintf("%s/dedi/lookup/%s/%s/%s",
c.config.BaseURL, c.config.NamespaceID, c.config.RegistryName, c.config.RecordName)
httpReq, err := retryablehttp.NewRequest("GET", lookupURL, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
httpReq.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.config.ApiKey))
httpReq = httpReq.WithContext(ctx)
log.Debugf(ctx, "Making DeDi lookup request to: %s", lookupURL)
resp, err := c.client.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("failed to send DeDi lookup request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
log.Errorf(ctx, nil, "DeDi lookup request failed with status: %s, response: %s", resp.Status, string(body))
return nil, fmt.Errorf("DeDi lookup request failed with status: %s", resp.Status)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}
// Parse response using local variables
var responseData map[string]interface{}
err = json.Unmarshal(body, &responseData)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
}
log.Debugf(ctx, "DeDi lookup request successful")
// Extract data using local variables
data, ok := responseData["data"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("invalid response format: missing data field")
}
schema, ok := data["schema"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("invalid response format: missing schema field")
}
// Extract values using type assertions with error checking
entityName, ok := schema["entity_name"].(string)
if !ok || entityName == "" {
return nil, fmt.Errorf("invalid or missing entity_name in response")
}
entityURL, ok := schema["entity_url"].(string)
if !ok || entityURL == "" {
return nil, fmt.Errorf("invalid or missing entity_url in response")
}
publicKey, ok := schema["publicKey"].(string)
if !ok || publicKey == "" {
return nil, fmt.Errorf("invalid or missing publicKey in response")
}
state, _ := data["state"].(string)
createdAt, _ := data["created_at"].(string)
updatedAt, _ := data["updated_at"].(string)
// Convert to Subscription format
subscription := model.Subscription{
Subscriber: model.Subscriber{
SubscriberID: entityName,
URL: entityURL,
Domain: req.Domain,
Type: req.Type,
},
SigningPublicKey: publicKey,
Status: state,
Created: parseTime(createdAt),
Updated: parseTime(updatedAt),
}
return []model.Subscription{subscription}, nil
}
// parseTime converts string timestamp to time.Time
func parseTime(timeStr string) time.Time {
if timeStr == "" {
return time.Time{}
}
parsedTime, err := time.Parse(time.RFC3339, timeStr)
if err != nil {
return time.Time{}
}
return parsedTime
}

View File

@@ -0,0 +1,361 @@
package dediregistry
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/beckn-one/beckn-onix/pkg/model"
)
func TestValidate(t *testing.T) {
tests := []struct {
name string
config *Config
wantErr bool
}{
{
name: "nil config",
config: nil,
wantErr: true,
},
{
name: "empty baseURL",
config: &Config{
BaseURL: "",
ApiKey: "test-key",
NamespaceID: "test-namespace",
RegistryName: "test-registry",
RecordName: "test-record",
},
wantErr: true,
},
{
name: "empty apiKey",
config: &Config{
BaseURL: "https://test.com",
ApiKey: "",
NamespaceID: "test-namespace",
RegistryName: "test-registry",
RecordName: "test-record",
},
wantErr: true,
},
{
name: "empty namespaceID",
config: &Config{
BaseURL: "https://test.com",
ApiKey: "test-key",
NamespaceID: "",
RegistryName: "test-registry",
RecordName: "test-record",
},
wantErr: true,
},
{
name: "empty registryName",
config: &Config{
BaseURL: "https://test.com",
ApiKey: "test-key",
NamespaceID: "test-namespace",
RegistryName: "",
RecordName: "test-record",
},
wantErr: true,
},
{
name: "empty recordName",
config: &Config{
BaseURL: "https://test.com",
ApiKey: "test-key",
NamespaceID: "test-namespace",
RegistryName: "test-registry",
RecordName: "",
},
wantErr: true,
},
{
name: "valid config",
config: &Config{
BaseURL: "https://test.com",
ApiKey: "test-key",
NamespaceID: "test-namespace",
RegistryName: "test-registry",
RecordName: "test-record",
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validate(tt.config)
if (err != nil) != tt.wantErr {
t.Errorf("validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestNew(t *testing.T) {
ctx := context.Background()
validConfig := &Config{
BaseURL: "https://test.com",
ApiKey: "test-key",
NamespaceID: "test-namespace",
RegistryName: "test-registry",
RecordName: "test-record",
Timeout: 30,
}
client, closer, err := New(ctx, validConfig)
if err != nil {
t.Errorf("New() error = %v", err)
return
}
if client == nil {
t.Error("New() returned nil client")
}
if closer == nil {
t.Error("New() returned nil closer")
}
// Test cleanup
if err := closer(); err != nil {
t.Errorf("closer() error = %v", err)
}
}
func TestLookup(t *testing.T) {
ctx := context.Background()
// Test successful lookup
t.Run("successful lookup", func(t *testing.T) {
// Mock server with successful response
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Verify request method and path
if r.Method != "GET" {
t.Errorf("Expected GET request, got %s", r.Method)
}
if r.URL.Path != "/dedi/lookup/test-namespace/test-registry/test-record" {
t.Errorf("Unexpected path: %s", r.URL.Path)
}
// Verify Authorization header
if auth := r.Header.Get("Authorization"); auth != "Bearer test-key" {
t.Errorf("Expected Bearer test-key, got %s", auth)
}
// Return mock response using map structure
response := map[string]interface{}{
"message": "success",
"data": map[string]interface{}{
"schema": map[string]interface{}{
"entity_name": "test.example.com",
"entity_url": "https://test.example.com",
"publicKey": "test-public-key",
},
"state": "active",
"created_at": "2023-01-01T00:00:00Z",
"updated_at": "2023-01-01T00:00:00Z",
},
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}))
defer server.Close()
config := &Config{
BaseURL: server.URL,
ApiKey: "test-key",
NamespaceID: "test-namespace",
RegistryName: "test-registry",
RecordName: "test-record",
Timeout: 30,
}
client, closer, err := New(ctx, config)
if err != nil {
t.Fatalf("New() error = %v", err)
}
defer closer()
req := &model.Subscription{}
results, err := client.Lookup(ctx, req)
if err != nil {
t.Errorf("Lookup() error = %v", err)
return
}
if len(results) != 1 {
t.Errorf("Expected 1 result, got %d", len(results))
return
}
subscription := results[0]
if subscription.Subscriber.SubscriberID != "test.example.com" {
t.Errorf("Expected subscriber_id test.example.com, got %s", subscription.Subscriber.SubscriberID)
}
if subscription.SigningPublicKey != "test-public-key" {
t.Errorf("Expected signing_public_key test-public-key, got %s", subscription.SigningPublicKey)
}
if subscription.Status != "active" {
t.Errorf("Expected status active, got %s", subscription.Status)
}
})
// Test HTTP error response
t.Run("http error response", func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("Record not found"))
}))
defer server.Close()
config := &Config{
BaseURL: server.URL,
ApiKey: "test-key",
NamespaceID: "test-namespace",
RegistryName: "test-registry",
RecordName: "test-record",
}
client, closer, err := New(ctx, config)
if err != nil {
t.Fatalf("New() error = %v", err)
}
defer closer()
req := &model.Subscription{}
_, err = client.Lookup(ctx, req)
if err == nil {
t.Error("Expected error for 404 response, got nil")
}
})
// Test missing required fields
t.Run("missing entity_name", func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
response := map[string]interface{}{
"data": map[string]interface{}{
"schema": map[string]interface{}{
"entity_url": "https://test.example.com",
"publicKey": "test-public-key",
},
},
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}))
defer server.Close()
config := &Config{
BaseURL: server.URL,
ApiKey: "test-key",
NamespaceID: "test-namespace",
RegistryName: "test-registry",
RecordName: "test-record",
}
client, closer, err := New(ctx, config)
if err != nil {
t.Fatalf("New() error = %v", err)
}
defer closer()
req := &model.Subscription{}
_, err = client.Lookup(ctx, req)
if err == nil {
t.Error("Expected error for missing entity_name, got nil")
}
})
// Test invalid JSON response
t.Run("invalid json response", func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte("invalid json"))
}))
defer server.Close()
config := &Config{
BaseURL: server.URL,
ApiKey: "test-key",
NamespaceID: "test-namespace",
RegistryName: "test-registry",
RecordName: "test-record",
}
client, closer, err := New(ctx, config)
if err != nil {
t.Fatalf("New() error = %v", err)
}
defer closer()
req := &model.Subscription{}
_, err = client.Lookup(ctx, req)
if err == nil {
t.Error("Expected error for invalid JSON, got nil")
}
})
// Test network error
t.Run("network error", func(t *testing.T) {
config := &Config{
BaseURL: "http://invalid-url-that-does-not-exist.local",
ApiKey: "test-key",
NamespaceID: "test-namespace",
RegistryName: "test-registry",
RecordName: "test-record",
Timeout: 1,
}
client, closer, err := New(ctx, config)
if err != nil {
t.Fatalf("New() error = %v", err)
}
defer closer()
req := &model.Subscription{}
_, err = client.Lookup(ctx, req)
if err == nil {
t.Error("Expected network error, got nil")
}
})
// Test missing data field
t.Run("missing data field", func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
response := map[string]interface{}{
"message": "success",
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}))
defer server.Close()
config := &Config{
BaseURL: server.URL,
ApiKey: "test-key",
NamespaceID: "test-namespace",
RegistryName: "test-registry",
RecordName: "test-record",
}
client, closer, err := New(ctx, config)
if err != nil {
t.Fatalf("New() error = %v", err)
}
defer closer()
req := &model.Subscription{}
_, err = client.Lookup(ctx, req)
if err == nil {
t.Error("Expected error for missing data field, got nil")
}
})
}

View File

@@ -382,6 +382,27 @@ func (m *Manager) Registry(ctx context.Context, cfg *Config) (definition.Registr
return registry, nil
}
// DeDiRegistry returns a RegistryLookup instance based on the provided configuration.
// It reuses the loaded provider.
func (m *Manager) DeDiRegistry(ctx context.Context, cfg *Config) (definition.RegistryLookup, error) {
rp, err := provider[definition.RegistryLookupProvider](m.plugins, cfg.ID)
if err != nil {
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
}
registry, closer, err := rp.New(ctx, cfg.Config)
if err != nil {
return nil, err
}
if closer != nil {
m.closers = append(m.closers, func() {
if err := closer(); err != nil {
panic(err)
}
})
}
return registry, nil
}
// Validator implements handler.PluginManager.
func (m *Manager) Validator(ctx context.Context, cfg *Config) (definition.SchemaValidator, error) {
panic("unimplemented")