Merge pull request #438 from beckn/feature/core

added updated code for core wiring
This commit is contained in:
MohitKatare-protean
2025-04-01 20:25:06 +05:30
committed by GitHub
49 changed files with 2466 additions and 450 deletions

View File

@@ -25,11 +25,13 @@ type destination struct {
Config map[string]string `yaml:"config"`
}
// Destination types for logging output.
const (
Stdout destinationType = "stdout"
File destinationType = "file"
)
// Log levels define the severity of log messages.
const (
DebugLevel level = "debug"
InfoLevel level = "info"
@@ -48,6 +50,7 @@ var logLevels = map[level]zerolog.Level{
PanicLevel: zerolog.PanicLevel,
}
// Config represents the configuration for logging.
type Config struct {
Level level `yaml:"level"`
Destinations []destination `yaml:"destinations"`
@@ -60,6 +63,7 @@ var (
once sync.Once
)
// Logger instance and configuration.
var (
ErrInvalidLogLevel = errors.New("invalid log level")
ErrLogDestinationNil = errors.New("log Destinations cant be empty")
@@ -163,6 +167,8 @@ func getLogger(config Config) (zerolog.Logger, error) {
return newLogger, nil
}
// InitLogger initializes the logger with the given configuration.
// It ensures that the logger is initialized only once using sync.Once.
func InitLogger(c Config) error {
var initErr error
once.Do(func() {
@@ -175,60 +181,74 @@ func InitLogger(c Config) error {
return initErr
}
// Debug logs a debug-level message with the provided context.
func Debug(ctx context.Context, msg string) {
logEvent(ctx, zerolog.DebugLevel, msg, nil)
}
// Debugf logs a formatted debug-level message with the provided context.
func Debugf(ctx context.Context, format string, v ...any) {
msg := fmt.Sprintf(format, v...)
logEvent(ctx, zerolog.DebugLevel, msg, nil)
}
// Info logs an info-level message with the provided context.
func Info(ctx context.Context, msg string) {
logEvent(ctx, zerolog.InfoLevel, msg, nil)
}
// Infof logs a formatted info-level message with the provided context.
func Infof(ctx context.Context, format string, v ...any) {
msg := fmt.Sprintf(format, v...)
logEvent(ctx, zerolog.InfoLevel, msg, nil)
}
// Warn logs a warning-level message with the provided context.
func Warn(ctx context.Context, msg string) {
logEvent(ctx, zerolog.WarnLevel, msg, nil)
}
// Warnf logs a formatted warning-level message with the provided context.
func Warnf(ctx context.Context, format string, v ...any) {
msg := fmt.Sprintf(format, v...)
logEvent(ctx, zerolog.WarnLevel, msg, nil)
}
// Error logs an error-level message along with an error object.
func Error(ctx context.Context, err error, msg string) {
logEvent(ctx, zerolog.ErrorLevel, msg, err)
}
// Errorf logs a formatted error-level message along with an error object.
func Errorf(ctx context.Context, err error, format string, v ...any) {
msg := fmt.Sprintf(format, v...)
logEvent(ctx, zerolog.ErrorLevel, msg, err)
}
// Fatal logs a fatal-level message along with an error object and exits the application.
func Fatal(ctx context.Context, err error, msg string) {
logEvent(ctx, zerolog.FatalLevel, msg, err)
}
// Fatalf logs a formatted fatal-level message along with an error object and exits the application.
func Fatalf(ctx context.Context, err error, format string, v ...any) {
msg := fmt.Sprintf(format, v...)
logEvent(ctx, zerolog.FatalLevel, msg, err)
}
// Panic logs a panic-level message along with an error object and panics.
func Panic(ctx context.Context, err error, msg string) {
logEvent(ctx, zerolog.PanicLevel, msg, err)
}
// Panicf logs a formatted panic-level message along with an error object and panics.
func Panicf(ctx context.Context, err error, format string, v ...any) {
msg := fmt.Sprintf(format, v...)
logEvent(ctx, zerolog.PanicLevel, msg, err)
}
// logEvent logs an event at the specified log level with an optional error message.
// It adds contextual information before logging the message.
func logEvent(ctx context.Context, level zerolog.Level, msg string, err error) {
event := logger.WithLevel(level)
@@ -239,6 +259,7 @@ func logEvent(ctx context.Context, level zerolog.Level, msg string, err error) {
event.Msg(msg)
}
// Request logs details of an incoming HTTP request, including method, URL, body, and remote address.
func Request(ctx context.Context, r *http.Request, body []byte) {
event := logger.Info()
addCtx(ctx, event)
@@ -249,6 +270,7 @@ func Request(ctx context.Context, r *http.Request, body []byte) {
Msg("HTTP Request")
}
// addCtx adds context values to the log event based on configured context keys.
func addCtx(ctx context.Context, event *zerolog.Event) {
for _, key := range cfg.ContextKeys {
val, ok := ctx.Value(key).(string)
@@ -260,6 +282,7 @@ func addCtx(ctx context.Context, event *zerolog.Event) {
}
}
// Response logs details of an outgoing HTTP response, including method, URL, status code, and response time.
func Response(ctx context.Context, r *http.Request, statusCode int, responseTime time.Duration) {
event := logger.Info()
addCtx(ctx, event)

View File

@@ -8,12 +8,15 @@ import (
"time"
)
// Subscriber represents a unique operational configuration of a trusted platform on a network.
type Subscriber struct {
SubscriberID string `json:"subscriber_id"`
URL string `json:"url" format:"uri"`
Type string `json:"type" enum:"BAP,BPP,BG"`
Domain string `json:"domain"`
}
// Subscription represents subscription details of a network participant.
type Subscription struct {
Subscriber `json:",inline"`
KeyID string `json:"key_id" format:"uuid"`
@@ -27,6 +30,7 @@ type Subscription struct {
Nonce string
}
// Authorization-related constants for headers.
const (
AuthHeaderSubscriber string = "Authorization"
AuthHeaderGateway string = "X-Gateway-Authorization"
@@ -36,14 +40,20 @@ const (
type contextKey string
// MsgIDKey is the context key used to store and retrieve the message ID in a request context.
const MsgIDKey = contextKey("message_id")
// Role defines the type of participant in the network.
type Role string
const (
RoleBAP Role = "bap"
RoleBPP Role = "bpp"
RoleGateway Role = "gateway"
// RoleBAP represents a Buyer App Participant (BAP) in the network.
RoleBAP Role = "bap"
// RoleBPP represents a Buyer Platform Participant (BPP) in the network.
RoleBPP Role = "bpp"
// RoleGateway represents a Gateway that facilitates communication in the network.
RoleGateway Role = "gateway"
// RoleRegistery represents the Registry that maintains network participant details.
RoleRegistery Role = "registery"
)
@@ -54,6 +64,7 @@ var validRoles = map[Role]bool{
RoleRegistery: true,
}
// UnmarshalYAML implements custom YAML unmarshalling for Role to ensure only valid values are accepted.
func (r *Role) UnmarshalYAML(unmarshal func(interface{}) error) error {
var roleName string
if err := unmarshal(&roleName); err != nil {
@@ -68,12 +79,23 @@ func (r *Role) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil
}
// Route represents a network route for message processing.
type Route struct {
Type string
URL *url.URL
Publisher string
TargetType string // "url" or "publisher"
PublisherID string // For message queues
URL *url.URL // For API calls
}
// Keyset represents a collection of cryptographic keys used for signing and encryption.
type Keyset struct {
UniqueKeyID string // UniqueKeyID is the identifier for the key pair.
SigningPrivate string // SigningPrivate is the private key used for signing operations.
SigningPublic string // SigningPublic is the public key corresponding to the signing private key.
EncrPrivate string // EncrPrivate is the private key used for encryption operations.
EncrPublic string // EncrPublic is the public key corresponding to the encryption private key.
}
// StepContext holds context information for a request processing step.
type StepContext struct {
context.Context
Request *http.Request
@@ -84,24 +106,36 @@ type StepContext struct {
RespHeader http.Header
}
// WithContext updates the existing StepContext with a new context.
func (ctx *StepContext) WithContext(newCtx context.Context) {
ctx.Context = newCtx
}
// Status represents the acknowledgment status in a response.
type Status string
const (
StatusACK Status = "ACK"
// StatusACK indicates a successful acknowledgment.
StatusACK Status = "ACK"
// StatusNACK indicates a negative acknowledgment or failure.
StatusNACK Status = "NACK"
)
// Ack represents an acknowledgment response.
type Ack struct {
// Status holds the acknowledgment status (ACK/NACK).
Status Status `json:"status"`
}
// Message represents the structure of a response message.
type Message struct {
Ack Ack `json:"ack"`
// Ack contains the acknowledgment status.
Ack Ack `json:"ack"`
// Error holds error details, if any, in the response.
Error *Error `json:"error,omitempty"`
}
// Response represents the main response structure.
type Response struct {
Message Message `json:"message"`
}

21
pkg/plugin/config.go Normal file
View File

@@ -0,0 +1,21 @@
package plugin
type PublisherCfg struct {
ID string `yaml:"id"`
Config map[string]string `yaml:"config"`
}
type ValidatorCfg struct {
ID string `yaml:"id"`
Config map[string]string `yaml:"config"`
}
type Config struct {
ID string `yaml:"id"`
Config map[string]string `yaml:"config"`
}
type ManagerConfig struct {
Root string `yaml:"root"`
RemoteRoot string `yaml:"remoteRoot"`
}

View File

@@ -0,0 +1,27 @@
package definition
import (
"context"
"time"
)
// Cache defines the general cache interface for caching plugins.
type Cache interface {
// Get retrieves a value from the cache based on the given key.
Get(ctx context.Context, key string) (string, error)
// Set stores a value in the cache with the given key and TTL (time-to-live) in seconds.
Set(ctx context.Context, key, value string, ttl time.Duration) error
// Delete removes a value from the cache based on the given key.
Delete(ctx context.Context, key string) error
// Clear removes all values from the cache.
Clear(ctx context.Context) error
}
// CacheProvider interface defines the contract for managing cache instances.
type CacheProvider interface {
// New initializes a new cache instance with the given configuration.
New(ctx context.Context, config map[string]string) (Cache, func() error, error)
}

View File

@@ -0,0 +1,23 @@
package definition
import (
"context"
"github.com/beckn/beckn-onix/pkg/model"
)
// KeyManager defines the interface for key management operations/methods.
type KeyManager interface {
GenerateKeyPairs() (*model.Keyset, error)
StorePrivateKeys(ctx context.Context, keyID string, keys *model.Keyset) error
SigningPrivateKey(ctx context.Context, keyID string) (string, string, error)
EncrPrivateKey(ctx context.Context, keyID string) (string, string, error)
SigningPublicKey(ctx context.Context, subscriberID, uniqueKeyID string) (string, error)
EncrPublicKey(ctx context.Context, subscriberID, uniqueKeyID string) (string, error)
DeletePrivateKeys(ctx context.Context, keyID string) error
}
// KeyManagerProvider initializes a new signer instance.
type KeyManagerProvider interface {
New(context.Context, Cache, RegistryLookup, map[string]string) (KeyManager, func() error, error)
}

View File

@@ -0,0 +1,10 @@
package definition
import (
"context"
"net/http"
)
type MiddlewareProvider interface {
New(ctx context.Context, cfg map[string]string) (func(http.Handler) http.Handler, error)
}

View File

@@ -5,12 +5,10 @@ import "context"
// Publisher defines the general publisher interface for messaging plugins.
type Publisher interface {
// Publish sends a message (as a byte slice) using the underlying messaging system.
Publish(ctx context.Context, msg []byte) error
Close() error // Important for releasing resources.
Publish(context.Context, string, []byte) error
}
type PublisherProvider interface {
// New initializes a new publisher instance with the given configuration.
New(ctx context.Context, config map[string]string) (Publisher, error)
New(ctx context.Context, config map[string]string) (Publisher, func(), error)
}

View File

@@ -0,0 +1,11 @@
package definition
import (
"context"
"github.com/beckn/beckn-onix/pkg/model"
)
type RegistryLookup interface {
Lookup(ctx context.Context, req *model.Subscription) ([]model.Subscription, error)
}

View File

@@ -3,14 +3,9 @@ package definition
import (
"context"
"net/url"
)
// Route defines the structure for the Route returned.
type Route struct {
TargetType string // "url" or "msgq" or "bap" or "bpp"
PublisherID string // For message queues
URL *url.URL // For API calls
}
"github.com/beckn/beckn-onix/pkg/model"
)
// RouterProvider initializes the a new Router instance with the given config.
type RouterProvider interface {
@@ -20,5 +15,5 @@ type RouterProvider interface {
// Router defines the interface for routing requests.
type Router interface {
// Route determines the routing destination based on the request context.
Route(ctx context.Context, url *url.URL, body []byte) (*Route, error)
Route(ctx context.Context, url *url.URL, body []byte) (*model.Route, error)
}

View File

@@ -1,22 +0,0 @@
package definition
import "context"
// Verifier defines the method for verifying signatures.
type Verifier interface {
// Verify checks the validity of the signature for the given body.
Verify(ctx context.Context, body []byte, header []byte, publicKeyBase64 string) (bool, error)
Close() error // Close for releasing resources
}
// VerifierProvider initializes a new Verifier instance with the given config.
type VerifierProvider interface {
// New creates a new Verifier instance based on the provided config.
New(ctx context.Context, config map[string]string) (Verifier, func() error, error)
}
// PublicKeyManager is the interface for key management plugin.
type PublicKeyManager interface {
// PublicKey retrieves the public key for the given subscriberID and keyID.
PublicKey(ctx context.Context, subscriberID string, keyID string) (string, error)
}

View File

@@ -8,7 +8,6 @@ type Signer interface {
// The signature is created with the given timestamps: createdAt (signature creation time)
// and expiresAt (signature expiration time).
Sign(ctx context.Context, body []byte, privateKeyBase64 string, createdAt, expiresAt int64) (string, error)
Close() error // Close for releasing resources
}
// SignerProvider initializes a new signer instance with the given config.
@@ -16,9 +15,3 @@ type SignerProvider interface {
// New creates a new signer instance based on the provided config.
New(ctx context.Context, config map[string]string) (Signer, func() error, error)
}
// PrivateKeyManager is the interface for key management plugin.
type PrivateKeyManager interface {
// PrivateKey retrieves the private key for the given subscriberID and keyID.
PrivateKey(ctx context.Context, subscriberID string, keyID string) (string, error)
}

View File

@@ -0,0 +1,15 @@
package definition
import "context"
// SignValidator defines the method for verifying signatures.
type SignValidator interface {
// Validate checks the validity of the signature for the given body.
Validate(ctx context.Context, body []byte, header string, publicKeyBase64 string) error
}
// SignValidatorProvider initializes a new Verifier instance with the given config.
type SignValidatorProvider interface {
// New creates a new Verifier instance based on the provided config.
New(ctx context.Context, config map[string]string) (SignValidator, func() error, error)
}

View File

@@ -0,0 +1,15 @@
package definition
import (
"context"
"github.com/beckn/beckn-onix/pkg/model"
)
type Step interface {
Run(ctx *model.StepContext) error
}
type StepProvider interface {
New(context.Context, map[string]string) (Step, func(), error)
}

View File

@@ -7,13 +7,13 @@ import (
decrypter "github.com/beckn/beckn-onix/pkg/plugin/implementation/decrypter"
)
// DecrypterProvider implements the definition.DecrypterProvider interface.
type DecrypterProvider struct{}
// decrypterProvider implements the definition.decrypterProvider interface.
type decrypterProvider struct{}
// New creates a new Decrypter instance using the provided configuration.
func (dp DecrypterProvider) New(ctx context.Context, config map[string]string) (definition.Decrypter, func() error, error) {
func (dp decrypterProvider) New(ctx context.Context, config map[string]string) (definition.Decrypter, func() error, error) {
return decrypter.New(ctx)
}
// Provider is the exported symbol that the plugin manager will look for.
var Provider definition.DecrypterProvider = DecrypterProvider{}
var Provider = decrypterProvider{}

View File

@@ -25,7 +25,7 @@ func TestDecrypterProviderSuccess(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
provider := DecrypterProvider{}
provider := decrypterProvider{}
decrypter, cleanup, err := provider.New(tt.ctx, tt.config)
// Check error.

View File

@@ -7,12 +7,12 @@ import (
"github.com/beckn/beckn-onix/pkg/plugin/implementation/encrypter"
)
// EncrypterProvider implements the definition.EncrypterProvider interface.
type EncrypterProvider struct{}
// encrypterProvider implements the definition.encrypterProvider interface.
type encrypterProvider struct{}
func (ep EncrypterProvider) New(ctx context.Context, config map[string]string) (definition.Encrypter, func() error, error) {
func (ep encrypterProvider) New(ctx context.Context, config map[string]string) (definition.Encrypter, func() error, error) {
return encrypter.New(ctx)
}
// Provider is the exported symbol that the plugin manager will look for.
var Provider definition.EncrypterProvider = EncrypterProvider{}
var Provider = encrypterProvider{}

View File

@@ -28,7 +28,7 @@ func TestEncrypterProviderSuccess(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create provider and encrypter.
provider := EncrypterProvider{}
provider := encrypterProvider{}
encrypter, cleanup, err := provider.New(tt.ctx, tt.config)
if err != nil {
t.Fatalf("EncrypterProvider.New() error = %v", err)

View File

@@ -5,17 +5,20 @@ import (
"net/http"
"strings"
requestpreprocessor "github.com/beckn/beckn-onix/pkg/plugin/implementation/requestPreProcessor"
"github.com/beckn/beckn-onix/pkg/plugin/implementation/reqpreprocessor"
)
type provider struct{}
func (p provider) New(ctx context.Context, c map[string]string) (func(http.Handler) http.Handler, error) {
config := &requestpreprocessor.Config{}
if contextKeysStr, ok := c["ContextKeys"]; ok {
config := &reqpreprocessor.Config{}
if contextKeysStr, ok := c["contextKeys"]; ok {
config.ContextKeys = strings.Split(contextKeysStr, ",")
}
return requestpreprocessor.NewUUIDSetter(config)
if role, ok := c["role"]; ok {
config.Role = role
}
return reqpreprocessor.NewPreProcessor(config)
}
var Provider = provider{}

View File

@@ -34,7 +34,7 @@ func TestProviderNew(t *testing.T) {
{
name: "With Check Keys",
config: map[string]string{
"ContextKeys": "message_id,transaction_id",
"contextKeys": "message_id,transaction_id",
},
expectedError: false,
expectedStatus: http.StatusOK,

View File

@@ -1,4 +1,4 @@
package requestpreprocessor
package reqpreprocessor
import (
"bytes"
@@ -9,24 +9,26 @@ import (
"io"
"net/http"
"github.com/beckn/beckn-onix/pkg/log"
"github.com/google/uuid"
)
// Config holds the configuration settings for the application.
type Config struct {
ContextKeys []string
Role string
ContextKeys []string // ContextKeys is a list of context keys used for request processing.
Role string // Role specifies the role of the entity (e.g., subscriber, gateway).
}
type becknRequest struct {
Context map[string]any `json:"context"`
}
type contextKeyType string
const contextKey = "context"
const subscriberIDKey contextKeyType = "subscriber_id"
const subscriberIDKey = "subscriber_id"
func NewUUIDSetter(cfg *Config) (func(http.Handler) http.Handler, error) {
// NewPreProcessor creates a middleware that processes incoming HTTP requests by extracting
// and modifying the request context based on the provided configuration.
func NewPreProcessor(cfg *Config) (func(http.Handler) http.Handler, error) {
if err := validateConfig(cfg); err != nil {
return nil, err
}
@@ -34,6 +36,7 @@ func NewUUIDSetter(cfg *Config) (func(http.Handler) http.Handler, error) {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
var req becknRequest
ctx := r.Context()
if err := json.Unmarshal(body, &req); err != nil {
http.Error(w, "Failed to decode request body", http.StatusBadRequest)
return
@@ -49,11 +52,15 @@ func NewUUIDSetter(cfg *Config) (func(http.Handler) http.Handler, error) {
case "bpp":
subID = req.Context["bpp_id"]
}
ctx := context.WithValue(r.Context(), subscriberIDKey, subID)
if subID != nil {
log.Debugf(ctx, "adding subscriberId to request:%s, %v", subscriberIDKey, subID)
// TODO: Add a ContextKey type in model and use it here instead of raw context key.
ctx = context.WithValue(ctx, subscriberIDKey, subID)
}
for _, key := range cfg.ContextKeys {
value := uuid.NewString()
updatedValue := update(req.Context, key, value)
ctx = context.WithValue(ctx, contextKeyType(key), updatedValue)
ctx = context.WithValue(ctx, key, updatedValue)
}
reqData := map[string]any{"context": req.Context}
updatedBody, _ := json.Marshal(reqData)

View File

@@ -1,4 +1,4 @@
package requestpreprocessor
package reqpreprocessor
import (
"bytes"
@@ -52,7 +52,7 @@ func TestNewUUIDSetterSuccessCases(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
middleware, err := NewUUIDSetter(tt.config)
middleware, err := NewPreProcessor(tt.config)
if err != nil {
t.Fatalf("Unexpected error while creating middleware: %v", err)
}
@@ -148,7 +148,7 @@ func TestNewUUIDSetterErrorCases(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
middleware, err := NewUUIDSetter(tt.config)
middleware, err := NewPreProcessor(tt.config)
if tt.config == nil {
if err == nil {
t.Error("Expected an error for nil config, but got none")

View File

@@ -4,8 +4,8 @@ import (
"context"
"errors"
definition "github.com/beckn/beckn-onix/pkg/plugin/definition"
router "github.com/beckn/beckn-onix/pkg/plugin/implementation/router"
"github.com/beckn/beckn-onix/pkg/plugin/definition"
"github.com/beckn/beckn-onix/pkg/plugin/implementation/router"
)
// RouterProvider provides instances of Router.

View File

@@ -9,7 +9,7 @@ import (
"path"
"strings"
definition "github.com/beckn/beckn-onix/pkg/plugin/definition"
"github.com/beckn/beckn-onix/pkg/model"
"gopkg.in/yaml.v3"
)
@@ -26,7 +26,7 @@ type routingConfig struct {
// Router implements Router interface.
type Router struct {
rules map[string]map[string]map[string]*definition.Route // domain -> version -> endpoint -> route
rules map[string]map[string]map[string]*model.Route // domain -> version -> endpoint -> route
}
// RoutingRule represents a single routing rule.
@@ -61,7 +61,7 @@ func New(ctx context.Context, config *Config) (*Router, func() error, error) {
return nil, nil, fmt.Errorf("config cannot be nil")
}
router := &Router{
rules: make(map[string]map[string]map[string]*definition.Route),
rules: make(map[string]map[string]map[string]*model.Route),
}
// Load rules at bootup
@@ -71,30 +71,6 @@ func New(ctx context.Context, config *Config) (*Router, func() error, error) {
return router, nil, nil
}
// parseTargetURL parses a URL string into a url.URL object with strict validation
func parseTargetURL(urlStr string) (*url.URL, error) {
if urlStr == "" {
return nil, nil
}
parsed, err := url.Parse(urlStr)
if err != nil {
return nil, fmt.Errorf("invalid URL '%s': %w", urlStr, err)
}
// Enforce scheme requirement
if parsed.Scheme == "" {
return nil, fmt.Errorf("URL '%s' must include a scheme (http/https)", urlStr)
}
// Optionally validate scheme is http or https
if parsed.Scheme != "https" {
return nil, fmt.Errorf("URL '%s' must use https scheme", urlStr)
}
return parsed, nil
}
// LoadRules reads and parses routing rules from the YAML configuration file.
func (r *Router) loadRules(configPath string) error {
if configPath == "" {
@@ -117,41 +93,41 @@ func (r *Router) loadRules(configPath string) error {
for _, rule := range config.RoutingRules {
// Initialize domain map if not exists
if _, ok := r.rules[rule.Domain]; !ok {
r.rules[rule.Domain] = make(map[string]map[string]*definition.Route)
r.rules[rule.Domain] = make(map[string]map[string]*model.Route)
}
// Initialize version map if not exists
if _, ok := r.rules[rule.Domain][rule.Version]; !ok {
r.rules[rule.Domain][rule.Version] = make(map[string]*definition.Route)
r.rules[rule.Domain][rule.Version] = make(map[string]*model.Route)
}
// Add all endpoints for this rule
for _, endpoint := range rule.Endpoints {
var route *definition.Route
var route *model.Route
switch rule.TargetType {
case targetTypePublisher:
route = &definition.Route{
route = &model.Route{
TargetType: rule.TargetType,
PublisherID: rule.Target.PublisherID,
}
case targetTypeURL:
parsedURL, err := parseTargetURL(rule.Target.URL)
parsedURL, err := url.Parse(rule.Target.URL)
if err != nil {
return fmt.Errorf("invalid URL in rule: %w", err)
}
route = &definition.Route{
route = &model.Route{
TargetType: rule.TargetType,
URL: parsedURL,
}
case targetTypeBPP, targetTypeBAP:
var parsedURL *url.URL
if rule.Target.URL != "" {
parsedURL, err = parseTargetURL(rule.Target.URL)
parsedURL, err = url.Parse(rule.Target.URL)
if err != nil {
return fmt.Errorf("invalid URL in rule: %w", err)
}
}
route = &definition.Route{
route = &model.Route{
TargetType: rule.TargetType,
URL: parsedURL,
}
@@ -177,7 +153,7 @@ func validateRules(rules []routingRule) error {
if rule.Target.URL == "" {
return fmt.Errorf("invalid rule: url is required for targetType 'url'")
}
if _, err := parseTargetURL(rule.Target.URL); err != nil {
if _, err := url.Parse(rule.Target.URL); err != nil {
return fmt.Errorf("invalid URL - %s: %w", rule.Target.URL, err)
}
case targetTypePublisher:
@@ -186,7 +162,7 @@ func validateRules(rules []routingRule) error {
}
case targetTypeBPP, targetTypeBAP:
if rule.Target.URL != "" {
if _, err := parseTargetURL(rule.Target.URL); err != nil {
if _, err := url.Parse(rule.Target.URL); err != nil {
return fmt.Errorf("invalid URL - %s defined in routing config for target type %s: %w", rule.Target.URL, rule.TargetType, err)
}
}
@@ -199,8 +175,7 @@ func validateRules(rules []routingRule) error {
}
// Route determines the routing destination based on the request context.
func (r *Router) Route(ctx context.Context, url *url.URL, body []byte) (*definition.Route, error) {
func (r *Router) Route(ctx context.Context, url *url.URL, body []byte) (*model.Route, error) {
// Parse the body to extract domain and version
var requestBody struct {
Context struct {
@@ -244,32 +219,32 @@ func (r *Router) Route(ctx context.Context, url *url.URL, body []byte) (*definit
}
// handleProtocolMapping handles both BPP and BAP routing with proper URL construction
func handleProtocolMapping(route *definition.Route, requestURI, endpoint string) (*definition.Route, error) {
uri := strings.TrimSpace(requestURI)
var targetURL *url.URL
if len(uri) != 0 {
parsedURL, err := parseTargetURL(uri)
if err != nil {
return nil, fmt.Errorf("invalid %s URI - %s in request body for %s: %w", strings.ToUpper(route.TargetType), uri, endpoint, err)
}
targetURL = parsedURL
}
// If no request URI, fall back to configured URL with endpoint appended
if targetURL == nil {
func handleProtocolMapping(route *model.Route, npURI, endpoint string) (*model.Route, error) {
target := strings.TrimSpace(npURI)
if len(target) == 0 {
if route.URL == nil {
return nil, fmt.Errorf("could not determine destination for endpoint '%s': neither request contained a %s URI nor was a default URL configured in routing rules", endpoint, strings.ToUpper(route.TargetType))
}
targetURL = &url.URL{
Scheme: route.URL.Scheme,
Host: route.URL.Host,
Path: path.Join(route.URL.Path, endpoint),
}
return &model.Route{
TargetType: targetTypeURL,
URL: &url.URL{
Scheme: route.URL.Scheme,
Host: route.URL.Host,
Path: path.Join(route.URL.Path, endpoint),
},
}, nil
}
targetURL, err := url.Parse(target)
if err != nil {
return nil, fmt.Errorf("invalid %s URI - %s in request body for %s: %w", strings.ToUpper(route.TargetType), target, endpoint, err)
}
return &definition.Route{
return &model.Route{
TargetType: targetTypeURL,
URL: targetURL,
URL: &url.URL{
Scheme: targetURL.Scheme,
Host: targetURL.Host,
Path: path.Join(targetURL.Path, endpoint),
},
}, nil
}

View File

@@ -4,8 +4,8 @@ import (
"context"
"errors"
definition "github.com/beckn/beckn-onix/pkg/plugin/definition"
schemaValidator "github.com/beckn/beckn-onix/pkg/plugin/implementation/schemaValidator"
"github.com/beckn/beckn-onix/pkg/plugin/definition"
"github.com/beckn/beckn-onix/pkg/plugin/implementation/schemavalidator"
)
// schemaValidatorProvider provides instances of schemaValidator.
@@ -24,10 +24,10 @@ func (vp schemaValidatorProvider) New(ctx context.Context, config map[string]str
}
// Create a new schemaValidator instance with the provided configuration
return schemaValidator.New(ctx, &schemaValidator.Config{
return schemavalidator.New(ctx, &schemavalidator.Config{
SchemaDir: schemaDir,
})
}
// Provider is the exported symbol that the plugin manager will look for.
var Provider definition.SchemaValidatorProvider = schemaValidatorProvider{}
var Provider = schemaValidatorProvider{}

View File

@@ -1,4 +1,4 @@
package schemaValidator
package schemavalidator
import (
"context"
@@ -10,7 +10,7 @@ import (
"path/filepath"
"strings"
response "github.com/beckn/beckn-onix/pkg/response"
"github.com/beckn/beckn-onix/pkg/model"
"github.com/santhosh-tekuri/jsonschema/v6"
)
@@ -23,8 +23,8 @@ type payload struct {
} `json:"context"`
}
// SchemaValidator implements the Validator interface.
type SchemaValidator struct {
// schemaValidator implements the Validator interface.
type schemaValidator struct {
config *Config
schemaCache map[string]*jsonschema.Schema
}
@@ -35,12 +35,12 @@ type Config struct {
}
// New creates a new ValidatorProvider instance.
func New(ctx context.Context, config *Config) (*SchemaValidator, func() error, error) {
func New(ctx context.Context, config *Config) (*schemaValidator, func() error, error) {
// Check if config is nil
if config == nil {
return nil, nil, fmt.Errorf("config cannot be nil")
}
v := &SchemaValidator{
v := &schemaValidator{
config: config,
schemaCache: make(map[string]*jsonschema.Schema),
}
@@ -53,7 +53,7 @@ func New(ctx context.Context, config *Config) (*SchemaValidator, func() error, e
}
// Validate validates the given data against the schema.
func (v *SchemaValidator) Validate(ctx context.Context, url *url.URL, data []byte) error {
func (v *schemaValidator) Validate(ctx context.Context, url *url.URL, data []byte) error {
var payloadData payload
err := json.Unmarshal(data, &payloadData)
if err != nil {
@@ -61,14 +61,14 @@ func (v *SchemaValidator) Validate(ctx context.Context, url *url.URL, data []byt
}
// Extract domain, version, and endpoint from the payload and uri.
cxt_domain := payloadData.Context.Domain
cxtDomain := payloadData.Context.Domain
version := payloadData.Context.Version
version = fmt.Sprintf("v%s", version)
endpoint := path.Base(url.String())
// ToDo Add debug log here
fmt.Println("Handling request for endpoint:", endpoint)
domain := strings.ToLower(cxt_domain)
domain := strings.ToLower(cxtDomain)
domain = strings.ReplaceAll(domain, ":", "_")
// Construct the schema file name.
@@ -89,20 +89,20 @@ func (v *SchemaValidator) Validate(ctx context.Context, url *url.URL, data []byt
// Handle schema validation errors
if validationErr, ok := err.(*jsonschema.ValidationError); ok {
// Convert validation errors into an array of SchemaValError
var schemaErrors []response.Error
var schemaErrors []model.Error
for _, cause := range validationErr.Causes {
// Extract the path and message from the validation error
path := strings.Join(cause.InstanceLocation, ".") // JSON path to the invalid field
message := cause.Error() // Validation error message
// Append the error to the schemaErrors array
schemaErrors = append(schemaErrors, response.Error{
schemaErrors = append(schemaErrors, model.Error{
Paths: path,
Message: message,
})
}
// Return the array of schema validation errors
return &response.SchemaValidationErr{Errors: schemaErrors}
return &model.SchemaValidationErr{Errors: schemaErrors}
}
// Return a generic error for non-validation errors
return fmt.Errorf("validation failed: %v", err)
@@ -117,7 +117,7 @@ type ValidatorProvider struct{}
// Initialise initialises the validator provider by compiling all the JSON schema files
// from the specified directory and storing them in a cache indexed by their schema filenames.
func (v *SchemaValidator) initialise() error {
func (v *schemaValidator) initialise() error {
schemaDir := v.config.SchemaDir
// Check if the directory exists and is accessible.
info, err := os.Stat(schemaDir)

View File

@@ -1,4 +1,4 @@
package schemaValidator
package schemavalidator
import (
"context"
@@ -272,7 +272,7 @@ func TestValidator_Initialise(t *testing.T) {
}
config := &Config{SchemaDir: schemaDir}
v := &SchemaValidator{
v := &schemaValidator{
config: config,
schemaCache: make(map[string]*jsonschema.Schema),
}

View File

@@ -1,25 +0,0 @@
package main
import (
"context"
"errors"
"github.com/beckn/beckn-onix/pkg/plugin/definition"
verifier "github.com/beckn/beckn-onix/pkg/plugin/implementation/signVerifier"
)
// VerifierProvider provides instances of Verifier.
type VerifierProvider struct{}
// New initializes a new Verifier instance.
func (vp VerifierProvider) New(ctx context.Context, config map[string]string) (definition.Verifier, func() error, error) {
if ctx == nil {
return nil, nil, errors.New("context cannot be nil")
}
return verifier.New(ctx, &verifier.Config{})
}
// Provider is the exported symbol that the plugin manager will look for.
var Provider definition.VerifierProvider = VerifierProvider{}

View File

@@ -21,4 +21,4 @@ func (p SignerProvider) New(ctx context.Context, config map[string]string) (defi
}
// Provider is the exported symbol that the plugin manager will look for.
var Provider definition.SignerProvider = SignerProvider{}
var Provider = SignerProvider{}

View File

@@ -23,7 +23,7 @@ type Signer struct {
func New(ctx context.Context, config *Config) (*Signer, func() error, error) {
s := &Signer{config: config}
return s, s.Close, nil
return s, nil, nil
}
// hash generates a signing string using BLAKE-512 hashing.
@@ -71,8 +71,3 @@ func (s *Signer) Sign(ctx context.Context, body []byte, privateKeyBase64 string,
return base64.StdEncoding.EncodeToString(signature), nil
}
// Close releases resources (mock implementation returning nil).
func (s *Signer) Close() error {
return nil
}

View File

@@ -0,0 +1,24 @@
package main
import (
"context"
"errors"
"github.com/beckn/beckn-onix/pkg/plugin/definition"
"github.com/beckn/beckn-onix/pkg/plugin/implementation/signvalidator"
)
// provider provides instances of Verifier.
type provider struct{}
// New initializes a new Verifier instance.
func (vp provider) New(ctx context.Context, config map[string]string) (definition.SignValidator, func() error, error) {
if ctx == nil {
return nil, nil, errors.New("context cannot be nil")
}
return signvalidator.New(ctx, &signvalidator.Config{})
}
// Provider is the exported symbol that the plugin manager will look for.
var Provider = provider{}

View File

@@ -7,7 +7,7 @@ import (
// TestVerifierProviderSuccess tests successful creation of a verifier.
func TestVerifierProviderSuccess(t *testing.T) {
provider := VerifierProvider{}
provider := provider{}
tests := []struct {
name string
@@ -52,7 +52,7 @@ func TestVerifierProviderSuccess(t *testing.T) {
// TestVerifierProviderFailure tests cases where verifier creation should fail.
func TestVerifierProviderFailure(t *testing.T) {
provider := VerifierProvider{}
provider := provider{}
tests := []struct {
name string

View File

@@ -1,4 +1,4 @@
package verifier
package signvalidator
import (
"context"
@@ -16,36 +16,36 @@ import (
type Config struct {
}
// Verifier implements the Verifier interface.
type Verifier struct {
// validator implements the validator interface.
type validator struct {
config *Config
}
// New creates a new Verifier instance.
func New(ctx context.Context, config *Config) (*Verifier, func() error, error) {
v := &Verifier{config: config}
func New(ctx context.Context, config *Config) (*validator, func() error, error) {
v := &validator{config: config}
return v, v.Close, nil
return v, nil, nil
}
// Verify checks the signature for the given payload and public key.
func (v *Verifier) Verify(ctx context.Context, body []byte, header []byte, publicKeyBase64 string) (bool, error) {
createdTimestamp, expiredTimestamp, signature, err := parseAuthHeader(string(header))
func (v *validator) Validate(ctx context.Context, body []byte, header string, publicKeyBase64 string) error {
createdTimestamp, expiredTimestamp, signature, err := parseAuthHeader(header)
if err != nil {
// TODO: Return appropriate error code when Error Code Handling Module is ready
return false, fmt.Errorf("error parsing header: %w", err)
return fmt.Errorf("error parsing header: %w", err)
}
signatureBytes, err := base64.StdEncoding.DecodeString(signature)
if err != nil {
// TODO: Return appropriate error code when Error Code Handling Module is ready
return false, fmt.Errorf("error decoding signature: %w", err)
return fmt.Errorf("error decoding signature: %w", err)
}
currentTime := time.Now().Unix()
if createdTimestamp > currentTime || currentTime > expiredTimestamp {
// TODO: Return appropriate error code when Error Code Handling Module is ready
return false, fmt.Errorf("signature is expired or not yet valid")
return fmt.Errorf("signature is expired or not yet valid")
}
createdTime := time.Unix(createdTimestamp, 0)
@@ -56,15 +56,15 @@ func (v *Verifier) Verify(ctx context.Context, body []byte, header []byte, publi
decodedPublicKey, err := base64.StdEncoding.DecodeString(publicKeyBase64)
if err != nil {
// TODO: Return appropriate error code when Error Code Handling Module is ready
return false, fmt.Errorf("error decoding public key: %w", err)
return fmt.Errorf("error decoding public key: %w", err)
}
if !ed25519.Verify(ed25519.PublicKey(decodedPublicKey), []byte(signingString), signatureBytes) {
// TODO: Return appropriate error code when Error Code Handling Module is ready
return false, fmt.Errorf("signature verification failed")
return fmt.Errorf("signature verification failed")
}
return true, nil
return nil
}
// parseAuthHeader extracts signature values from the Authorization header.
@@ -113,8 +113,3 @@ func hash(payload []byte, createdTimestamp, expiredTimestamp int64) string {
return fmt.Sprintf("(created): %d\n(expires): %d\ndigest: BLAKE-512=%s", createdTimestamp, expiredTimestamp, digestB64)
}
// Close releases resources (mock implementation returning nil).
func (v *Verifier) Close() error {
return nil
}

View File

@@ -1,4 +1,4 @@
package verifier
package signvalidator
import (
"context"
@@ -52,14 +52,11 @@ func TestVerifySuccess(t *testing.T) {
`", signature="` + signature + `"`
verifier, close, _ := New(context.Background(), &Config{})
valid, err := verifier.Verify(context.Background(), tt.body, []byte(header), publicKeyBase64)
err := verifier.Validate(context.Background(), tt.body, header, publicKeyBase64)
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}
if !valid {
t.Fatal("Expected signature verification to succeed")
}
if close != nil {
if err := close(); err != nil {
t.Fatalf("Test %q failed: cleanup function returned an error: %v", tt.name, err)
@@ -135,14 +132,11 @@ func TestVerifyFailure(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
verifier, close, _ := New(context.Background(), &Config{})
valid, err := verifier.Verify(context.Background(), tt.body, []byte(tt.header), tt.pubKey)
err := verifier.Validate(context.Background(), tt.body, tt.header, tt.pubKey)
if err == nil {
t.Fatal("Expected an error but got none")
}
if valid {
t.Fatal("Expected verification to fail")
}
if close != nil {
if err := close(); err != nil {
t.Fatalf("Test %q failed: cleanup function returned an error: %v", tt.name, err)

View File

@@ -1,171 +1,381 @@
package plugin
import (
"archive/zip"
"context"
"fmt"
"io"
"io/fs"
"net/http"
"os"
"path/filepath"
"plugin"
"strings"
"time"
"github.com/beckn/beckn-onix/pkg/log"
"github.com/beckn/beckn-onix/pkg/plugin/definition"
)
// Config represents the plugin manager configuration.
type Config struct {
Root string `yaml:"root"`
Signer PluginConfig `yaml:"signer"`
Verifier PluginConfig `yaml:"verifier"`
Decrypter PluginConfig `yaml:"decrypter"`
Encrypter PluginConfig `yaml:"encrypter"`
Publisher PluginConfig `yaml:"publisher"`
}
// TODO: Add unit tests for the plugin manager functions to ensure proper functionality and error handling.
// PluginConfig represents configuration details for a plugin.
type PluginConfig struct {
ID string `yaml:"id"`
Config map[string]string `yaml:"config"`
}
// Manager handles dynamic plugin loading and management.
// Manager is responsible for managing dynamically loaded plugins.
type Manager struct {
sp definition.SignerProvider
vp definition.VerifierProvider
dp definition.DecrypterProvider
ep definition.EncrypterProvider
pb definition.PublisherProvider
cfg *Config
plugins map[string]*plugin.Plugin // plugins holds the dynamically loaded plugins.
closers []func() // closers contains functions to release resources when the manager is closed.
}
// NewManager initializes a new Manager with the given configuration file.
func NewManager(ctx context.Context, cfg *Config) (*Manager, error) {
if cfg == nil {
return nil, fmt.Errorf("configuration cannot be nil")
}
// Load signer plugin.
sp, err := provider[definition.SignerProvider](cfg.Root, cfg.Signer.ID)
if err != nil {
return nil, fmt.Errorf("failed to load signer plugin: %w", err)
}
// Load publisher plugin.
pb, err := provider[definition.PublisherProvider](cfg.Root, cfg.Publisher.ID)
if err != nil {
return nil, fmt.Errorf("failed to load publisher plugin: %w", err)
}
// Load verifier plugin.
vp, err := provider[definition.VerifierProvider](cfg.Root, cfg.Verifier.ID)
if err != nil {
return nil, fmt.Errorf("failed to load Verifier plugin: %w", err)
}
// Load decrypter plugin.
dp, err := provider[definition.DecrypterProvider](cfg.Root, cfg.Decrypter.ID)
if err != nil {
return nil, fmt.Errorf("failed to load Decrypter plugin: %w", err)
}
// Load encryption plugin.
ep, err := provider[definition.EncrypterProvider](cfg.Root, cfg.Encrypter.ID)
if err != nil {
return nil, fmt.Errorf("failed to load encryption plugin: %w", err)
}
return &Manager{sp: sp, vp: vp, pb: pb, ep: ep, dp: dp, cfg: cfg}, nil
func validateMgrCfg(cfg *ManagerConfig) error {
return nil
}
// provider loads a plugin dynamically and retrieves its provider instance.
func provider[T any](root, id string) (T, error) {
// NewManager initializes a new Manager instance by loading plugins from the specified configuration.
func NewManager(ctx context.Context, cfg *ManagerConfig) (*Manager, func(), error) {
if err := validateMgrCfg(cfg); err != nil {
return nil, nil, fmt.Errorf("Invalid config: %w", err)
}
log.Debugf(ctx, "RemoteRoot : %s", cfg.RemoteRoot)
if len(cfg.RemoteRoot) != 0 {
log.Debugf(ctx, "Unzipping files from : %s to : %s", cfg.RemoteRoot, cfg.Root)
if err := unzip(cfg.RemoteRoot, cfg.Root); err != nil {
return nil, nil, err
}
}
plugins, err := plugins(ctx, cfg)
if err != nil {
return nil, nil, err
}
closers := []func(){}
return &Manager{plugins: plugins, closers: closers}, func() {
for _, closer := range closers {
closer()
}
}, nil
}
func plugins(ctx context.Context, cfg *ManagerConfig) (map[string]*plugin.Plugin, error) {
plugins := make(map[string]*plugin.Plugin)
err := filepath.WalkDir(cfg.Root, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() {
return nil // Skip directories
}
if strings.HasSuffix(d.Name(), ".so") {
id := strings.TrimSuffix(d.Name(), ".so") // Extract plugin ID
p, elapsed, err := loadPlugin(ctx, path, id)
if err != nil {
return err
}
plugins[id] = p
log.Debugf(ctx, "Loaded plugin: %s in %s", id, elapsed)
}
return nil
})
if err != nil {
return nil, err
}
return plugins, nil
}
// loadPlugin attempts to load a plugin from the given path and logs the execution time.
func loadPlugin(ctx context.Context, path, id string) (*plugin.Plugin, time.Duration, error) {
log.Debugf(ctx, "Loading plugin: %s", id)
start := time.Now()
p, err := plugin.Open(path)
if err != nil {
return nil, 0, fmt.Errorf("failed to open plugin %s: %w", id, err)
}
elapsed := time.Since(start)
return p, elapsed, nil
}
func provider[T any](plugins map[string]*plugin.Plugin, id string) (T, error) {
var zero T
if len(strings.TrimSpace(id)) == 0 {
return zero, nil
pgn, ok := plugins[id]
if !ok {
return zero, fmt.Errorf("plugin %s not found", id)
}
p, err := plugin.Open(pluginPath(root, id))
provider, err := pgn.Lookup("Provider")
if err != nil {
return zero, fmt.Errorf("failed to open plugin %s: %w", id, err)
return zero, fmt.Errorf("failed to lookup Provider for %s: %w", id, err)
}
log.Debugf(context.Background(), "Provider type: %T\n", provider)
symbol, err := p.Lookup("Provider")
if err != nil {
return zero, fmt.Errorf("failed to find Provider symbol in plugin %s: %w", id, err)
}
prov, ok := symbol.(*T)
pp, ok := provider.(T)
if !ok {
return zero, fmt.Errorf("failed to cast Provider for %s", id)
}
return *prov, nil
log.Debugf(context.Background(), "Casting successful for: %s", provider)
return pp, nil
}
// pluginPath constructs the path to the plugin shared object file.
func pluginPath(root, id string) string {
return filepath.Join(root, id+".so")
}
// Signer retrieves the signing plugin instance.
func (m *Manager) Signer(ctx context.Context) (definition.Signer, func() error, error) {
if m.sp == nil {
return nil, nil, fmt.Errorf("signing plugin provider not loaded")
}
signer, close, err := m.sp.New(ctx, m.cfg.Signer.Config)
// Publisher returns a Publisher instance based on the provided configuration.
// It reuses the loaded provider and registers a cleanup function.
func (m *Manager) Publisher(ctx context.Context, cfg *Config) (definition.Publisher, error) {
pp, err := provider[definition.PublisherProvider](m.plugins, cfg.ID)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize signer: %w", err)
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
}
return signer, close, nil
}
// Verifier retrieves the verification plugin instance.
func (m *Manager) Verifier(ctx context.Context) (definition.Verifier, func() error, error) {
if m.vp == nil {
return nil, nil, fmt.Errorf("Verifier plugin provider not loaded")
}
Verifier, close, err := m.vp.New(ctx, m.cfg.Verifier.Config)
p, closer, err := pp.New(ctx, cfg.Config)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize Verifier: %w", err)
return nil, err
}
return Verifier, close, nil
m.addCloser(closer)
return p, nil
}
// Decrypter retrieves the decryption plugin instance.
func (m *Manager) Decrypter(ctx context.Context) (definition.Decrypter, func() error, error) {
if m.dp == nil {
return nil, nil, fmt.Errorf("decrypter plugin provider not loaded")
// addCloser appends a cleanup function to the Manager's closers list.
func (m *Manager) addCloser(closer func()) {
if closer != nil {
m.closers = append(m.closers, closer)
}
}
decrypter, close, err := m.dp.New(ctx, m.cfg.Decrypter.Config)
// SchemaValidator returns a SchemaValidator instance based on the provided configuration.
// It registers a cleanup function for resource management.
func (m *Manager) SchemaValidator(ctx context.Context, cfg *Config) (definition.SchemaValidator, error) {
vp, err := provider[definition.SchemaValidatorProvider](m.plugins, cfg.ID)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize Decrypter: %w", err)
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
}
return decrypter, close, nil
}
// Encrypter retrieves the encryption plugin instance.
func (m *Manager) Encrypter(ctx context.Context) (definition.Encrypter, func() error, error) {
if m.ep == nil {
return nil, nil, fmt.Errorf("encryption plugin provider not loaded")
}
encrypter, close, err := m.ep.New(ctx, m.cfg.Encrypter.Config)
v, closer, err := vp.New(ctx, cfg.Config)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize encrypter: %w", err)
return nil, err
}
return encrypter, close, nil
if closer != nil {
m.addCloser(func() {
if err := closer(); err != nil {
panic(err)
}
})
}
return v, nil
}
// Publisher retrieves the publisher plugin instance.
func (m *Manager) Publisher(ctx context.Context) (definition.Publisher, error) {
if m.pb == nil {
return nil, fmt.Errorf("publisher plugin provider not loaded")
}
publisher, err := m.pb.New(ctx, m.cfg.Publisher.Config)
// Router returns a Router instance based on the provided configuration.
// It registers a cleanup function for resource management.
func (m *Manager) Router(ctx context.Context, cfg *Config) (definition.Router, error) {
rp, err := provider[definition.RouterProvider](m.plugins, cfg.ID)
if err != nil {
return nil, fmt.Errorf("failed to initialize publisher: %w", err)
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
}
return publisher, nil
router, closer, err := rp.New(ctx, cfg.Config)
if err != nil {
return nil, err
}
if closer != nil {
m.addCloser(func() {
if err := closer(); err != nil {
panic(err)
}
})
}
return router, nil
}
// Middleware returns an HTTP middleware function based on the provided configuration.
func (m *Manager) Middleware(ctx context.Context, cfg *Config) (func(http.Handler) http.Handler, error) {
mwp, err := provider[definition.MiddlewareProvider](m.plugins, cfg.ID)
if err != nil {
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
}
return mwp.New(ctx, cfg.Config)
}
// Step returns a Step instance based on the provided configuration.
func (m *Manager) Step(ctx context.Context, cfg *Config) (definition.Step, error) {
sp, err := provider[definition.StepProvider](m.plugins, cfg.ID)
if err != nil {
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
}
step, closer, error := sp.New(ctx, cfg.Config)
if closer != nil {
m.closers = append(m.closers, closer)
}
return step, error
}
// Cache returns a Cache instance based on the provided configuration.
// It registers a cleanup function for resource management.
func (m *Manager) Cache(ctx context.Context, cfg *Config) (definition.Cache, error) {
cp, err := provider[definition.CacheProvider](m.plugins, cfg.ID)
if err != nil {
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
}
c, close, err := cp.New(ctx, cfg.Config)
if err != nil {
return nil, err
}
m.addCloser(func() {
if err := close(); err != nil {
panic(err)
}
})
return c, nil
}
// Signer returns a Signer instance based on the provided configuration.
// It registers a cleanup function for resource management.
func (m *Manager) Signer(ctx context.Context, cfg *Config) (definition.Signer, error) {
sp, err := provider[definition.SignerProvider](m.plugins, cfg.ID)
if err != nil {
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
}
s, closer, err := sp.New(ctx, cfg.Config)
if err != nil {
return nil, err
}
if closer != nil {
m.addCloser(func() {
if err := closer(); err != nil {
panic(err)
}
})
}
return s, nil
}
// Encryptor returns an Encrypter instance based on the provided configuration.
// It registers a cleanup function for resource management.
func (m *Manager) Encryptor(ctx context.Context, cfg *Config) (definition.Encrypter, error) {
ep, err := provider[definition.EncrypterProvider](m.plugins, cfg.ID)
if err != nil {
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
}
encrypter, closer, err := ep.New(ctx, cfg.Config)
if err != nil {
return nil, err
}
if closer != nil {
m.addCloser(func() {
if err := closer(); err != nil {
panic(err)
}
})
}
return encrypter, nil
}
// Decryptor returns a Decrypter instance based on the provided configuration.
// It registers a cleanup function for resource management.
func (m *Manager) Decryptor(ctx context.Context, cfg *Config) (definition.Decrypter, error) {
dp, err := provider[definition.DecrypterProvider](m.plugins, cfg.ID)
if err != nil {
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
}
decrypter, closer, err := dp.New(ctx, cfg.Config)
if err != nil {
return nil, err
}
if closer != nil {
m.addCloser(func() {
if err := closer(); err != nil {
panic(err)
}
})
}
return decrypter, nil
}
// SignValidator returns a SignValidator instance based on the provided configuration.
// It registers a cleanup function for resource management.
func (m *Manager) SignValidator(ctx context.Context, cfg *Config) (definition.SignValidator, error) {
svp, err := provider[definition.SignValidatorProvider](m.plugins, cfg.ID)
if err != nil {
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
}
v, closer, err := svp.New(ctx, cfg.Config)
if err != nil {
return nil, err
}
if closer != nil {
m.addCloser(func() {
if err := closer(); err != nil {
panic(err)
}
})
}
return v, nil
}
// KeyManager returns a KeyManager instance based on the provided configuration.
// It reuses the loaded provider.
func (m *Manager) KeyManager(ctx context.Context, cache definition.Cache, rClient definition.RegistryLookup, cfg *Config) (definition.KeyManager, error) {
kmp, err := provider[definition.KeyManagerProvider](m.plugins, cfg.ID)
if err != nil {
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
}
km, close, err := kmp.New(ctx, cache, rClient, cfg.Config)
if err != nil {
return nil, err
}
m.addCloser(func() {
if err := close(); err != nil {
panic(err)
}
})
return km, nil
}
// Validator implements handler.PluginManager.
func (m *Manager) Validator(ctx context.Context, cfg *Config) (definition.SchemaValidator, error) {
panic("unimplemented")
}
// Unzip extracts a ZIP file to the specified destination
func unzip(src, dest string) error {
r, err := zip.OpenReader(src)
if err != nil {
return err
}
defer r.Close()
// Ensure the destination directory exists
if err := os.MkdirAll(dest, 0755); err != nil {
return err
}
for _, f := range r.File {
fpath := filepath.Join(dest, f.Name)
// Ensure directory exists
log.Debugf(context.Background(), "Pain : fpath: %s,filepath.Dir(fpath): %s", fpath, filepath.Dir(fpath))
if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
return err
}
// Open the file inside the zip
srcFile, err := f.Open()
if err != nil {
return err
}
defer srcFile.Close()
// Create the destination file
dstFile, err := os.Create(fpath)
if err != nil {
return err
}
defer dstFile.Close()
// Copy file contents
if _, err := io.Copy(dstFile, srcFile); err != nil {
return err
}
}
return nil
}

View File

@@ -7,40 +7,10 @@ import (
"fmt"
"net/http"
"strings"
"github.com/beckn/beckn-onix/pkg/model"
)
type Error struct {
Code string `json:"code,omitempty"`
Message string `json:"message,omitempty"`
Paths string `json:"paths,omitempty"`
}
// SchemaValidationErr represents a collection of schema validation failures.
type SchemaValidationErr struct {
Errors []Error
}
// Error implements the error interface for SchemaValidationErr.
func (e *SchemaValidationErr) Error() string {
var errorMessages []string
for _, err := range e.Errors {
errorMessages = append(errorMessages, fmt.Sprintf("%s: %s", err.Paths, err.Message))
}
return strings.Join(errorMessages, "; ")
}
type Message struct {
Ack struct {
Status string `json:"status,omitempty"`
} `json:"ack,omitempty"`
Error *Error `json:"error,omitempty"`
}
// SendAck sends an acknowledgment response (ACK) to the client.
func SendAck(w http.ResponseWriter) {
resp := &model.Response{
Message: model.Message{
@@ -61,7 +31,8 @@ func SendAck(w http.ResponseWriter) {
}
}
func nack(w http.ResponseWriter, err *model.Error, status int, ctx context.Context) {
// nack sends a negative acknowledgment (NACK) response with an error message.
func nack(ctx context.Context, w http.ResponseWriter, err *model.Error, status int) {
resp := &model.Response{
Message: model.Message{
Ack: model.Ack{
@@ -82,6 +53,7 @@ func nack(w http.ResponseWriter, err *model.Error, status int, ctx context.Conte
}
}
// internalServerError generates an internal server error response.
func internalServerError(ctx context.Context) *model.Error {
return &model.Error{
Code: http.StatusText(http.StatusInternalServerError),
@@ -89,6 +61,7 @@ func internalServerError(ctx context.Context) *model.Error {
}
}
// SendNack processes different types of errors and sends an appropriate NACK response.
func SendNack(ctx context.Context, w http.ResponseWriter, err error) {
var schemaErr *model.SchemaValidationErr
var signErr *model.SignValidationErr
@@ -97,19 +70,19 @@ func SendNack(ctx context.Context, w http.ResponseWriter, err error) {
switch {
case errors.As(err, &schemaErr):
nack(w, schemaErr.BecknError(), http.StatusBadRequest, ctx)
nack(ctx, w, schemaErr.BecknError(), http.StatusBadRequest)
return
case errors.As(err, &signErr):
nack(w, signErr.BecknError(), http.StatusUnauthorized, ctx)
nack(ctx, w, signErr.BecknError(), http.StatusUnauthorized)
return
case errors.As(err, &badReqErr):
nack(w, badReqErr.BecknError(), http.StatusBadRequest, ctx)
nack(ctx, w, badReqErr.BecknError(), http.StatusBadRequest)
return
case errors.As(err, &notFoundErr):
nack(w, notFoundErr.BecknError(), http.StatusNotFound, ctx)
nack(ctx, w, notFoundErr.BecknError(), http.StatusNotFound)
return
default:
nack(w, internalServerError(ctx), http.StatusInternalServerError, ctx)
nack(ctx, w, internalServerError(ctx), http.StatusInternalServerError)
return
}
}

View File

@@ -126,21 +126,6 @@ func TestSendNack(t *testing.T) {
}
}
func TestSchemaValidationErr_Error(t *testing.T) {
// Create sample validation errors
validationErrors := []Error{
{Paths: "name", Message: "Name is required"},
{Paths: "email", Message: "Invalid email format"},
}
err := SchemaValidationErr{Errors: validationErrors}
expected := "name: Name is required; email: Invalid email format"
if err.Error() != expected {
t.Errorf("err.Error() = %s, want %s",
err.Error(), expected)
}
}
func compareJSON(expected, actual map[string]interface{}) bool {
expectedBytes, _ := json.Marshal(expected)
actualBytes, _ := json.Marshal(actual)
@@ -234,7 +219,7 @@ func TestNack_1(t *testing.T) {
return
}
nack(w, tt.err, tt.status, ctx)
nack(ctx, w, tt.err, tt.status)
if !tt.useBadWrite {
recorder, ok := w.(*httptest.ResponseRecorder)
if !ok {