From 4e4e43892d24ce64a6eef286eb2863e69a7685dd Mon Sep 17 00:00:00 2001 From: Mayuresh A Nirhali Date: Fri, 28 Nov 2025 18:13:54 +0530 Subject: [PATCH 01/10] Update SETUP.md - to add public cloud provider links in setup guide Fix for issue -https://github.com/Beckn-One/beckn-onix/issues/571 --- SETUP.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/SETUP.md b/SETUP.md index 5d1bda8..98e06e0 100644 --- a/SETUP.md +++ b/SETUP.md @@ -536,10 +536,19 @@ curl http://localhost:8081/bap/receiver/ ## Production Setup +### Integrating with Public cloud providers + +Public cloud providers have built integration plugins and other tools to deploy Beckn ONIX easily. They provide specialized plugins that integrate with managed services on those platforms. Network participants can choose to consume them. + +1. [Google Cloud Platform](https://github.com/GoogleCloudPlatform/dpi-accelerator-beckn-onix) +2. [Amazon Web Services](https://github.com/Beckn-One/beckn-onix/tree/main-pre-plugins/aws-cdk) + +The rest of the document focuses on how to deploy ONIX in Production with cloud agnostic plugins and tools. + ### Additional Requirements for Production 1. **HashiCorp Vault** for key management -2. **RabbitMQ** for message queuing +2. **RabbitMQ** for message queuing (needed for async flows) 3. **TLS certificates** for secure communication 4. **Load balancer** for high availability From 1227a6b6be45a030d7bd7458bae11527368d9dc9 Mon Sep 17 00:00:00 2001 From: Manendra Pal Singh Date: Tue, 2 Dec 2025 23:11:51 +0530 Subject: [PATCH 02/10] fix the test case in main_test.go and module_test.go --- cmd/adapter/main_test.go | 5 +++++ core/module/module_test.go | 7 ++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/cmd/adapter/main_test.go b/cmd/adapter/main_test.go index 1c6e25d..dff0447 100644 --- a/cmd/adapter/main_test.go +++ b/cmd/adapter/main_test.go @@ -63,6 +63,11 @@ func (m *MockPluginManager) Cache(ctx context.Context, cfg *plugin.Config) (defi return nil, nil } +// Registry returns a mock implementation of the RegistryLookup interface. +func (m *MockPluginManager) Registry(ctx context.Context, cfg *plugin.Config) (definition.RegistryLookup, error) { + return nil, nil +} + // KeyManager returns a mock implementation of the KeyManager interface. func (m *MockPluginManager) KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error) { return nil, nil diff --git a/core/module/module_test.go b/core/module/module_test.go index b62f57f..4921fd0 100644 --- a/core/module/module_test.go +++ b/core/module/module_test.go @@ -59,6 +59,11 @@ func (m *mockPluginManager) Cache(ctx context.Context, cfg *plugin.Config) (defi return nil, nil } +// Registry returns a mock registry lookup implementation. +func (m *mockPluginManager) Registry(ctx context.Context, cfg *plugin.Config) (definition.RegistryLookup, error) { + return nil, nil +} + // KeyManager returns a mock key manager implementation. func (m *mockPluginManager) KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error) { return nil, nil @@ -180,4 +185,4 @@ func TestRegisterFailure(t *testing.T) { } }) } -} \ No newline at end of file +} From 96fac33be1e3db43c2d74e2741c1726a09aaf387 Mon Sep 17 00:00:00 2001 From: Mayuresh Nirhali Date: Fri, 5 Dec 2025 16:09:02 +0530 Subject: [PATCH 03/10] 573 - New plugin - schema mapper --- CONFIG.md | 105 ++++++- go.mod | 5 +- go.sum | 8 +- install/build-plugins.sh | 1 + .../implementation/reqmapper/cmd/plugin.go | 23 ++ .../reqmapper/cmd/plugin_test.go | 84 +++++ .../implementation/reqmapper/reqmapper.go | 287 ++++++++++++++++++ .../reqmapper/reqmapper_test.go | 240 +++++++++++++++ .../reqmapper/testdata/mappings.yaml | 17 ++ 9 files changed, 765 insertions(+), 5 deletions(-) create mode 100644 pkg/plugin/implementation/reqmapper/cmd/plugin.go create mode 100644 pkg/plugin/implementation/reqmapper/cmd/plugin_test.go create mode 100644 pkg/plugin/implementation/reqmapper/reqmapper.go create mode 100644 pkg/plugin/implementation/reqmapper/reqmapper_test.go create mode 100644 pkg/plugin/implementation/reqmapper/testdata/mappings.yaml diff --git a/CONFIG.md b/CONFIG.md index d543176..737bc6e 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -624,6 +624,109 @@ middleware: --- +#### 11. Reqmapper Plugin + +**Purpose**: Transform Beckn payloads between protocol versions or shapes using JSONata before the request continues through the handler. Mount it inside the `middleware` list wherever translation is required. + +```yaml +middleware: + - id: reqmapper + config: + role: bap # Use `bpp` when running inside a BPP handler + mappingsFile: ./config/mappings.yaml +``` + +**Parameters**: +- `role`: Required. Determines which JSONata expression is evaluated (`bapMappings` or `bppMappings`) for the current action. +- `mappingsFile`: Required. Absolute or relative path to a YAML file that contains the JSONata expressions for every action. + +**Mapping file structure**: +```yaml +mappings: + : + bapMappings: | + # JSONata expression applied when `role: bap` + bppMappings: | + # JSONata expression applied when `role: bpp` +``` +Each action entry is optional—if no mapping exists for the current action, the original request body is passed through unchanged. JSONata expressions receive the entire Beckn request as input (`$`) and must return the full payload that should replace it. + +**Sample mapping file**: +```yaml +mappings: + search: + bapMappings: | + { + "context": { + "action": "discover", + "version": "2.0.0", + "domain": "beckn.one:retail", + "bap_id": $.context.bap_id, + "bap_uri": $.context.bap_uri, + "transaction_id": $.context.transaction_id, + "message_id": $.context.message_id, + "timestamp": $.context.timestamp + }, + "message": { + "filters": $.message.intent.category ? { + "type": "jsonpath", + "expression": "$[?(@.category.code == '" & $.message.intent.category.descriptor.code & "')]" + } : null + } + } + bppMappings: | + { + "context": { + "action": "search", + "version": "1.1.0", + "domain": "retail", + "bap_id": $.context.bap_id, + "bap_uri": $.context.bap_uri, + "transaction_id": $.context.transaction_id, + "message_id": $.context.message_id, + "timestamp": $.context.timestamp + }, + "message": { + "intent": { + "category": $.message.filters ? { + "descriptor": { + "code": $substringAfter($substringBefore($.message.filters.expression, "'"), "== '") + } + } : null + } + } + } + on_search: + bapMappings: | + { + "context": $.context, + "message": { + "catalog": { + "descriptor": $.message.catalogs[0]."beckn:descriptor" ? { + "name": $.message.catalogs[0]."beckn:descriptor"."schema:name" + } : null + } + } + } + bppMappings: | + { + "context": $.context, + "message": { + "catalogs": [{ + "@type": "beckn:Catalog", + "beckn:items": $.message.catalog.providers[].items[]. + { + "@type": "beckn:Item", + "beckn:id": id + } + }] + } + } +``` +The sample illustrates how a single mapping file can convert `search` requests and `on_search` responses between Beckn 1.1.0 (BAP) and Beckn 2.0.0 (BPP) payload shapes. You can define as many action entries as needed, and the plugin will compile and cache the JSONata expressions on startup. + +--- + ## Routing Configuration ### Routing Rules File Structure @@ -1066,4 +1169,4 @@ modules: httpClientConfig: maxIdleConns: 1000 maxIdleConnsPerHost: 200 - idleConnTimeout: 300 \ No newline at end of file + idleConnTimeout: 300 diff --git a/go.mod b/go.mod index 5de656d..d85ebc5 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( require github.com/zenazn/pkcs7pad v0.0.0-20170308005700-253a5b1f0e03 -require golang.org/x/text v0.23.0 // indirect +require golang.org/x/text v0.26.0 // indirect require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect @@ -50,7 +50,7 @@ require ( github.com/ryanuber/go-glob v1.0.0 // indirect github.com/woodsbury/decimal128 v1.3.0 // indirect golang.org/x/net v0.38.0 // indirect - golang.org/x/sys v0.31.0 // indirect + golang.org/x/sys v0.38.0 // indirect golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect ) @@ -59,6 +59,7 @@ require ( github.com/google/uuid v1.6.0 github.com/hashicorp/go-retryablehttp v0.7.7 github.com/hashicorp/vault/api v1.16.0 + github.com/jsonata-go/jsonata v0.0.0-20250709164031-599f35f32e5f github.com/rabbitmq/amqp091-go v1.10.0 github.com/redis/go-redis/v9 v9.8.0 github.com/rs/zerolog v1.34.0 diff --git a/go.sum b/go.sum index 0250688..4bc3411 100644 --- a/go.sum +++ b/go.sum @@ -62,6 +62,8 @@ github.com/hashicorp/vault/api v1.16.0 h1:nbEYGJiAPGzT9U4oWgaaB0g+Rj8E59QuHKyA5L github.com/hashicorp/vault/api v1.16.0/go.mod h1:KhuUhzOD8lDSk29AtzNjgAu2kxRA9jL9NAbkFlqvkBA= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jsonata-go/jsonata v0.0.0-20250709164031-599f35f32e5f h1:JnGon8QHtmjFPq0NcSu8OTEnQDDEgFME7gtj/NkjCUo= +github.com/jsonata-go/jsonata v0.0.0-20250709164031-599f35f32e5f/go.mod h1:rYUEOEiieWXHNCE/eDXV/o5s7jZ2VyUzQKbqVns9pik= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -134,8 +136,10 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= -golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= +golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= +golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/install/build-plugins.sh b/install/build-plugins.sh index 1799f00..c1ed911 100755 --- a/install/build-plugins.sh +++ b/install/build-plugins.sh @@ -16,6 +16,7 @@ plugins=( "registry" "dediregistry" "reqpreprocessor" + "reqmapper" "router" "schemavalidator" "schemav2validator" diff --git a/pkg/plugin/implementation/reqmapper/cmd/plugin.go b/pkg/plugin/implementation/reqmapper/cmd/plugin.go new file mode 100644 index 0000000..26e98e5 --- /dev/null +++ b/pkg/plugin/implementation/reqmapper/cmd/plugin.go @@ -0,0 +1,23 @@ +package main + +import ( + "context" + "net/http" + + "github.com/beckn-one/beckn-onix/pkg/plugin/implementation/reqmapper" +) + +type provider struct{} + +func (p provider) New(ctx context.Context, c map[string]string) (func(http.Handler) http.Handler, error) { + config := &reqmapper.Config{} + if role, ok := c["role"]; ok { + config.Role = role + } + if mappingsFile, ok := c["mappingsFile"]; ok { + config.MappingsFile = mappingsFile + } + return reqmapper.NewReqMapper(config) +} + +var Provider = provider{} diff --git a/pkg/plugin/implementation/reqmapper/cmd/plugin_test.go b/pkg/plugin/implementation/reqmapper/cmd/plugin_test.go new file mode 100644 index 0000000..d6d1bba --- /dev/null +++ b/pkg/plugin/implementation/reqmapper/cmd/plugin_test.go @@ -0,0 +1,84 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" +) + +func TestProviderNewSuccess(t *testing.T) { + p := provider{} + middleware, err := p.New(context.Background(), map[string]string{"role": "bap"}) + if err != nil { + t.Fatalf("provider.New returned unexpected error: %v", err) + } + if middleware == nil { + t.Fatalf("provider.New returned nil middleware") + } + + payload := map[string]interface{}{ + "context": map[string]interface{}{ + "action": "search", + "domain": "retail", + "version": "1.1.0", + "bap_id": "bap.example", + "bap_uri": "https://bap.example/api", + "transaction_id": "txn-1", + "message_id": "msg-1", + "timestamp": "2023-01-01T10:00:00Z", + }, + "message": map[string]interface{}{ + "intent": map[string]interface{}{ + "fulfillment": map[string]interface{}{ + "start": map[string]interface{}{ + "location": map[string]interface{}{"gps": "0,0"}, + }, + "end": map[string]interface{}{ + "location": map[string]interface{}{"gps": "1,1"}, + }, + }, + }, + }, + } + + body, err := json.Marshal(payload) + if err != nil { + t.Fatalf("failed to marshal payload: %v", err) + } + + called := false + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + w.WriteHeader(http.StatusNoContent) + }) + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + middleware(next).ServeHTTP(rec, req) + + if !called { + t.Fatalf("expected downstream handler to be invoked") + } + if rec.Code != http.StatusNoContent { + t.Fatalf("unexpected response code: got %d want %d", rec.Code, http.StatusNoContent) + } +} + +func TestProviderNewMissingRole(t *testing.T) { + p := provider{} + if _, err := p.New(context.Background(), map[string]string{}); err == nil { + t.Fatalf("expected error when role is missing") + } +} + +func TestProviderNewInvalidRole(t *testing.T) { + p := provider{} + _, err := p.New(context.Background(), map[string]string{"role": "invalid"}) + if err == nil { + t.Fatalf("expected error for invalid role") + } +} diff --git a/pkg/plugin/implementation/reqmapper/reqmapper.go b/pkg/plugin/implementation/reqmapper/reqmapper.go new file mode 100644 index 0000000..fe38c9f --- /dev/null +++ b/pkg/plugin/implementation/reqmapper/reqmapper.go @@ -0,0 +1,287 @@ +package reqmapper + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "os" + "sync" + + "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/jsonata-go/jsonata" + "gopkg.in/yaml.v3" +) + +// Config represents the configuration for the request mapper middleware. +type Config struct { + Role string `yaml:"role"` // "bap" or "bpp" + MappingsFile string `yaml:"mappingsFile"` // required path to mappings YAML +} + +// MappingEngine handles JSONata-based transformations +type MappingEngine struct { + config *Config + jsonataInstance jsonata.JSONataInstance + bapMaps map[string]jsonata.Expression + bppMaps map[string]jsonata.Expression + mappings map[string]builtinMapping + mappingSource string + mutex sync.RWMutex + initialized bool +} + +type builtinMapping struct { + BAP string `yaml:"bapMappings"` + BPP string `yaml:"bppMappings"` +} + +type mappingFile struct { + Mappings map[string]builtinMapping `yaml:"mappings"` +} + +var ( + engineInstance *MappingEngine + engineOnce sync.Once +) + +// NewReqMapper returns a middleware that maps requests using JSONata expressions +func NewReqMapper(cfg *Config) (func(http.Handler) http.Handler, error) { + if err := validateConfig(cfg); err != nil { + return nil, err + } + + // Initialize the mapping engine + engine, err := initMappingEngine(cfg) + if err != nil { + return nil, fmt.Errorf("failed to initialize mapping engine: %w", err) + } + + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Failed to read request body", http.StatusBadRequest) + return + } + + var req map[string]interface{} + ctx := r.Context() + + if err := json.Unmarshal(body, &req); err != nil { + http.Error(w, "Failed to decode request body", http.StatusBadRequest) + return + } + + reqContext, ok := req["context"].(map[string]interface{}) + if !ok { + http.Error(w, "context field not found or invalid", http.StatusBadRequest) + return + } + + action, ok := reqContext["action"].(string) + if !ok { + http.Error(w, "action field not found or invalid", http.StatusBadRequest) + return + } + + // Apply transformation + mappedBody, err := engine.Transform(ctx, action, req, cfg.Role) + if err != nil { + log.Errorf(ctx, err, "Transformation failed for action %s", action) + // Fall back to original body on error + mappedBody = body + } + + r.Body = io.NopCloser(bytes.NewBuffer(mappedBody)) + r.ContentLength = int64(len(mappedBody)) + r = r.WithContext(ctx) + next.ServeHTTP(w, r) + }) + }, nil +} + +// initMappingEngine initializes or returns existing mapping engine +func initMappingEngine(cfg *Config) (*MappingEngine, error) { + var initErr error + + engineOnce.Do(func() { + engineInstance = &MappingEngine{ + config: cfg, + bapMaps: make(map[string]jsonata.Expression), + bppMaps: make(map[string]jsonata.Expression), + } + + instance, err := jsonata.OpenLatest() + if err != nil { + initErr = fmt.Errorf("failed to initialize jsonata: %w", err) + return + } + engineInstance.jsonataInstance = instance + + if err := engineInstance.loadBuiltinMappings(); err != nil { + initErr = err + return + } + + engineInstance.initialized = true + }) + + if initErr != nil { + return nil, initErr + } + + if !engineInstance.initialized { + return nil, errors.New("mapping engine failed to initialize") + } + + return engineInstance, nil +} + +func (e *MappingEngine) loadMappingsFromConfig() (map[string]builtinMapping, string, error) { + if e.config == nil || e.config.MappingsFile == "" { + return nil, "", errors.New("mappingsFile must be provided in config") + } + + data, err := os.ReadFile(e.config.MappingsFile) + if err != nil { + return nil, "", fmt.Errorf("failed to read mappings file %s: %w", e.config.MappingsFile, err) + } + source := e.config.MappingsFile + + var parsed mappingFile + if err := yaml.Unmarshal(data, &parsed); err != nil { + return nil, "", fmt.Errorf("failed to parse mappings from %s: %w", source, err) + } + + if len(parsed.Mappings) == 0 { + return nil, "", fmt.Errorf("no mappings found in %s", source) + } + + return parsed.Mappings, source, nil +} + +// loadBuiltinMappings compiles JSONata expressions for every action/direction pair from the configured mappings file. +func (e *MappingEngine) loadBuiltinMappings() error { + mappings, source, err := e.loadMappingsFromConfig() + if err != nil { + return err + } + + e.bapMaps = make(map[string]jsonata.Expression, len(mappings)) + e.bppMaps = make(map[string]jsonata.Expression, len(mappings)) + + for action, mapping := range mappings { + bapExpr, err := e.jsonataInstance.Compile(mapping.BAP, false) + if err != nil { + return fmt.Errorf("failed to compile BAP mapping for action %s: %w", action, err) + } + bppExpr, err := e.jsonataInstance.Compile(mapping.BPP, false) + if err != nil { + return fmt.Errorf("failed to compile BPP mapping for action %s: %w", action, err) + } + + e.bapMaps[action] = bapExpr + e.bppMaps[action] = bppExpr + } + + e.mappings = mappings + e.mappingSource = source + + log.Infof( + context.Background(), + "Loaded %d BAP mappings and %d BPP mappings from %s", + len(e.bapMaps), + len(e.bppMaps), + source, + ) + + return nil +} + +// Transform applies the appropriate mapping based on role and action +func (e *MappingEngine) Transform(ctx context.Context, action string, req map[string]interface{}, role string) ([]byte, error) { + e.mutex.RLock() + defer e.mutex.RUnlock() + + var expr jsonata.Expression + var found bool + + // Select appropriate mapping based on role + switch role { + case "bap": + expr, found = e.bapMaps[action] + case "bpp": + expr, found = e.bppMaps[action] + default: + return json.Marshal(req) + } + + // If no mapping found, return original request + if !found || expr == nil { + log.Debugf(ctx, "No mapping found for action: %s, role: %s", action, role) + return json.Marshal(req) + } + + // Marshal request for JSONata evaluation + input, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("failed to marshal request for mapping: %w", err) + } + + // Apply JSONata transformation + result, err := expr.Evaluate(input, nil) + if err != nil { + return nil, fmt.Errorf("JSONata evaluation failed: %w", err) + } + + log.Debugf(ctx, "Successfully transformed %s request using %s mapping, %s", action, role, result) + return result, nil +} + +// ReloadMappings reloads all mapping files (useful for hot-reload scenarios) +func (e *MappingEngine) ReloadMappings() error { + e.mutex.Lock() + defer e.mutex.Unlock() + + return e.loadBuiltinMappings() +} + +// GetMappingInfo returns information about loaded mappings +func (e *MappingEngine) GetMappingInfo() map[string]interface{} { + e.mutex.RLock() + defer e.mutex.RUnlock() + + bapActions := make([]string, 0, len(e.bapMaps)) + for action := range e.bapMaps { + bapActions = append(bapActions, action) + } + + bppActions := make([]string, 0, len(e.bppMaps)) + for action := range e.bppMaps { + bppActions = append(bppActions, action) + } + + return map[string]interface{}{ + "bap_mappings": bapActions, + "bpp_mappings": bppActions, + "mappings_source": e.mappingSource, + "action_count": len(e.mappings), + } +} + +func validateConfig(cfg *Config) error { + if cfg == nil { + return errors.New("config cannot be nil") + } + if cfg.Role != "bap" && cfg.Role != "bpp" { + return errors.New("role must be either 'bap' or 'bpp'") + } + if cfg.MappingsFile == "" { + return errors.New("mappingsFile is required") + } + return nil +} diff --git a/pkg/plugin/implementation/reqmapper/reqmapper_test.go b/pkg/plugin/implementation/reqmapper/reqmapper_test.go new file mode 100644 index 0000000..ecba12e --- /dev/null +++ b/pkg/plugin/implementation/reqmapper/reqmapper_test.go @@ -0,0 +1,240 @@ +package reqmapper + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "reflect" + "sync" + "testing" +) + +func resetEngineState(t *testing.T) { + t.Helper() + engineInstance = nil + engineOnce = sync.Once{} +} + +func testMappingsFile(t *testing.T) string { + t.Helper() + path := filepath.Join("testdata", "mappings.yaml") + if _, err := os.Stat(path); err != nil { + t.Fatalf("test mappings file missing: %v", err) + } + return path +} + +func initTestEngine(t *testing.T) *MappingEngine { + t.Helper() + resetEngineState(t) + engine, err := initMappingEngine(&Config{ + Role: "bap", + MappingsFile: testMappingsFile(t), + }) + if err != nil { + t.Fatalf("failed to init mapping engine: %v", err) + } + return engine +} + +func TestNewReqMapper_InvalidConfig(t *testing.T) { + t.Run("nil config", func(t *testing.T) { + if _, err := NewReqMapper(nil); err == nil { + t.Fatalf("expected error for nil config") + } + }) + + t.Run("invalid role", func(t *testing.T) { + if _, err := NewReqMapper(&Config{Role: "invalid"}); err == nil { + t.Fatalf("expected error for invalid role") + } + }) +} + +func TestNewReqMapper_MiddlewareTransformsRequest(t *testing.T) { + resetEngineState(t) + mw, err := NewReqMapper(&Config{ + Role: "bap", + MappingsFile: testMappingsFile(t), + }) + if err != nil { + t.Fatalf("NewReqMapper returned error: %v", err) + } + + startLocation := map[string]interface{}{ + "gps": "12.9716,77.5946", + "city": "Bengaluru", + } + endLocation := map[string]interface{}{ + "gps": "13.0827,80.2707", + "city": "Chennai", + } + + requestPayload := map[string]interface{}{ + "context": map[string]interface{}{ + "domain": "retail", + "action": "search", + "version": "1.1.0", + "bap_id": "bap.example", + "bap_uri": "https://bap.example/api", + "transaction_id": "txn-1", + "message_id": "msg-1", + "timestamp": "2023-01-01T10:00:00Z", + }, + "message": map[string]interface{}{ + "intent": map[string]interface{}{ + "item": map[string]interface{}{ + "id": "item-1", + }, + "provider": map[string]interface{}{ + "id": "provider-1", + }, + "fulfillment": map[string]interface{}{ + "start": map[string]interface{}{ + "location": startLocation, + }, + "end": map[string]interface{}{ + "location": endLocation, + }, + }, + }, + }, + } + + body, err := json.Marshal(requestPayload) + if err != nil { + t.Fatalf("failed to marshal request payload: %v", err) + } + + var captured map[string]interface{} + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + data, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("failed to read request in handler: %v", err) + } + if err := json.Unmarshal(data, &captured); err != nil { + t.Fatalf("failed to unmarshal transformed payload: %v", err) + } + w.WriteHeader(http.StatusOK) + }) + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + mw(next).ServeHTTP(rec, req) + + if captured == nil { + t.Fatalf("middleware did not forward request to next handler") + } + + message, ok := captured["message"].(map[string]interface{}) + if !ok { + t.Fatalf("expected message field in transformed payload") + } + + filters, ok := message["filters"].(map[string]interface{}) + if !ok { + t.Fatalf("expected filters in transformed payload") + } + + if pickup := filters["pickup"]; !reflect.DeepEqual(pickup, startLocation) { + t.Fatalf("pickup location mismatch\ngot: %#v\nwant: %#v", pickup, startLocation) + } + if drop := filters["drop"]; !reflect.DeepEqual(drop, endLocation) { + t.Fatalf("drop location mismatch\ngot: %#v\nwant: %#v", drop, endLocation) + } +} + +func TestMappingEngine_TransformFallbackForUnknownAction(t *testing.T) { + engine := initTestEngine(t) + req := map[string]interface{}{ + "context": map[string]interface{}{ + "action": "unknown_action", + }, + "message": map[string]interface{}{}, + } + + expected, err := json.Marshal(req) + if err != nil { + t.Fatalf("failed to marshal expected payload: %v", err) + } + + result, err := engine.Transform(context.Background(), "unknown_action", req, "bap") + if err != nil { + t.Fatalf("Transform returned error: %v", err) + } + if !bytes.Equal(result, expected) { + t.Fatalf("expected Transform to return original payload") + } +} + +func TestMappingEngine_TransformFallbackForUnknownRole(t *testing.T) { + engine := initTestEngine(t) + req := map[string]interface{}{ + "context": map[string]interface{}{ + "action": "search", + }, + "message": map[string]interface{}{}, + } + + expected, err := json.Marshal(req) + if err != nil { + t.Fatalf("failed to marshal expected payload: %v", err) + } + + result, err := engine.Transform(context.Background(), "search", req, "unknown-role") + if err != nil { + t.Fatalf("Transform returned error: %v", err) + } + + if !bytes.Equal(result, expected) { + t.Fatalf("expected Transform to return original payload when role is unknown") + } +} + +func TestMappingEngine_ReloadMappings(t *testing.T) { + engine := initTestEngine(t) + + engine.mutex.RLock() + originalBAP := len(engine.bapMaps) + originalBPP := len(engine.bppMaps) + engine.mutex.RUnlock() + + if originalBAP == 0 || originalBPP == 0 { + t.Fatalf("expected test mappings to be loaded") + } + + engine.mutex.Lock() + for action := range engine.bapMaps { + delete(engine.bapMaps, action) + break + } + engine.mutex.Unlock() + + engine.mutex.RLock() + if len(engine.bapMaps) == originalBAP { + engine.mutex.RUnlock() + t.Fatalf("expected BAP map to be altered before reload") + } + engine.mutex.RUnlock() + + if err := engine.ReloadMappings(); err != nil { + t.Fatalf("ReloadMappings returned error: %v", err) + } + + engine.mutex.RLock() + defer engine.mutex.RUnlock() + + if len(engine.bapMaps) != originalBAP { + t.Fatalf("expected BAP mappings to be reloaded, got %d want %d", len(engine.bapMaps), originalBAP) + } + if len(engine.bppMaps) != originalBPP { + t.Fatalf("expected BPP mappings to be reloaded, got %d want %d", len(engine.bppMaps), originalBPP) + } +} diff --git a/pkg/plugin/implementation/reqmapper/testdata/mappings.yaml b/pkg/plugin/implementation/reqmapper/testdata/mappings.yaml new file mode 100644 index 0000000..d34c405 --- /dev/null +++ b/pkg/plugin/implementation/reqmapper/testdata/mappings.yaml @@ -0,0 +1,17 @@ +mappings: + search: + bapMappings: | + { + "context": $.context, + "message": { + "filters": { + "pickup": $.message.intent.fulfillment.start.location, + "drop": $.message.intent.fulfillment.end.location + } + } + } + bppMappings: | + { + "context": $.context, + "message": $.message + } From 4da6ce6bde2ac33ba2bd63c6721b031b96d9aa80 Mon Sep 17 00:00:00 2001 From: Mayuresh A Nirhali Date: Fri, 5 Dec 2025 16:20:41 +0530 Subject: [PATCH 04/10] Update README.md --- README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 79f7330..cbe0684 100644 --- a/README.md +++ b/README.md @@ -129,8 +129,9 @@ The **Beckn Protocol** is an open protocol that enables location-aware, local co - **Encrypter**: AES encryption for sensitive data protection - **Decrypter**: AES decryption for encrypted data processing - **ReqPreprocessor**: Request preprocessing (UUID generation, headers) -- **Registry**: Standard Beckn registry lookup for participant information -- **DeDiRegistry**: DeDi registry to lookup public keys for NP. +- **ReqMapper**: Middleware to transform payload either between Beckn versions or against other platforms. +- **Registry**: Standard Beckn registry or Beckn One DeDi registry lookup for participant information + ## Quick Start ### Prerequisites From 4885e75decd989b14163a47e14673bc5d2f3829f Mon Sep 17 00:00:00 2001 From: Nirmal N R Date: Mon, 8 Dec 2025 11:52:09 +0530 Subject: [PATCH 05/10] docs: updated URL in example config files to beckn one registry URL. --- config/local-beckn-one-bap.yaml | 8 ++++---- config/local-beckn-one-bpp.yaml | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/config/local-beckn-one-bap.yaml b/config/local-beckn-one-bap.yaml index fb63f66..897d4b3 100644 --- a/config/local-beckn-one-bap.yaml +++ b/config/local-beckn-one-bap.yaml @@ -31,8 +31,8 @@ modules: registry: id: dediregistry config: - url: http://34.14.173.68:8080/dedi - registryName: subscribers.beckn.one + url: https://api.testnet.beckn.one/registry/dedi # This is the testnet URL. The production URL is https://api.beckn.one/registry/dedi + registryName: subscribers.beckn.one # This is the wildcard string used to lookup across registries in Beckn One. Do not change this. # This is the wildcard string used to lookup across registries in Beckn One. Do not change this. timeout: 10 retry_max: 3 retry_wait_min: 100ms @@ -83,8 +83,8 @@ modules: registry: id: dediregistry config: - url: http://34.14.173.68:8080/dedi - registryName: subscribers.beckn.one + url: https://api.testnet.beckn.one/registry/dedi # This is the testnet URL. The production URL is https://api.beckn.one/registry/dedi + registryName: subscribers.beckn.one # This is the wildcard string used to lookup across registries in Beckn One. Do not change this. timeout: 10 retry_max: 3 retry_wait_min: 100ms diff --git a/config/local-beckn-one-bpp.yaml b/config/local-beckn-one-bpp.yaml index 77a57aa..af527f3 100644 --- a/config/local-beckn-one-bpp.yaml +++ b/config/local-beckn-one-bpp.yaml @@ -31,8 +31,8 @@ modules: registry: id: dediregistry config: - url: http://34.14.173.68:8080/dedi - registryName: subscribers.beckn.one + url: https://api.testnet.beckn.one/registry/dedi # This is the testnet URL. The production URL is https://api.beckn.one/registry/dedi + registryName: subscribers.beckn.one # This is the wildcard string used to lookup across registries in Beckn One. Do not change this. timeout: 10 retry_max: 3 retry_wait_min: 100ms @@ -78,8 +78,8 @@ modules: registry: id: dediregistry config: - url: http://34.14.173.68:8080/dedi - registryName: subscribers.beckn.one + url: https://api.testnet.beckn.one/registry/dedi # This is the testnet URL. The production URL is https://api.beckn.one/registry/dedi + registryName: subscribers.beckn.one # This is the wildcard string used to lookup across registries in Beckn One. Do not change this. timeout: 10 retry_max: 3 retry_wait_min: 100ms From 1f8d9c12d2b54614ec0e9d8093a581c08ef14765 Mon Sep 17 00:00:00 2001 From: Nirmal N R Date: Mon, 8 Dec 2025 11:55:59 +0530 Subject: [PATCH 06/10] docs: update URL in example config files to beckn one registry URL --- config/local-beckn-one-bap.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/local-beckn-one-bap.yaml b/config/local-beckn-one-bap.yaml index 897d4b3..f9fcf43 100644 --- a/config/local-beckn-one-bap.yaml +++ b/config/local-beckn-one-bap.yaml @@ -32,7 +32,7 @@ modules: id: dediregistry config: url: https://api.testnet.beckn.one/registry/dedi # This is the testnet URL. The production URL is https://api.beckn.one/registry/dedi - registryName: subscribers.beckn.one # This is the wildcard string used to lookup across registries in Beckn One. Do not change this. # This is the wildcard string used to lookup across registries in Beckn One. Do not change this. + registryName: subscribers.beckn.one # This is the wildcard string used to lookup across registries in Beckn One. Do not change this. timeout: 10 retry_max: 3 retry_wait_min: 100ms From 20a924d43e722e2c23feccf7d69192dddf1b0dc3 Mon Sep 17 00:00:00 2001 From: Mayuresh Nirhali Date: Mon, 8 Dec 2025 12:23:36 +0530 Subject: [PATCH 07/10] 570 - transport wrapper initial checkin --- core/module/handler/config.go | 22 +++++++------- core/module/handler/stdHandler.go | 40 ++++++++++++++++---------- core/module/handler/stdHandler_test.go | 30 +++++++++---------- core/module/module_test.go | 5 ++++ pkg/plugin/manager.go | 22 ++++++++++++++ 5 files changed, 79 insertions(+), 40 deletions(-) diff --git a/core/module/handler/config.go b/core/module/handler/config.go index 7891a4d..96b2263 100644 --- a/core/module/handler/config.go +++ b/core/module/handler/config.go @@ -22,6 +22,7 @@ type PluginManager interface { Cache(ctx context.Context, cfg *plugin.Config) (definition.Cache, error) Registry(ctx context.Context, cfg *plugin.Config) (definition.RegistryLookup, error) KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error) + TransportWrapper(ctx context.Context, cfg *plugin.Config) (definition.TransportWrapper, error) SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) } @@ -35,16 +36,17 @@ const ( // PluginCfg holds the configuration for various plugins. type PluginCfg struct { - SchemaValidator *plugin.Config `yaml:"schemaValidator,omitempty"` - SignValidator *plugin.Config `yaml:"signValidator,omitempty"` - Publisher *plugin.Config `yaml:"publisher,omitempty"` - Signer *plugin.Config `yaml:"signer,omitempty"` - Router *plugin.Config `yaml:"router,omitempty"` - Cache *plugin.Config `yaml:"cache,omitempty"` - Registry *plugin.Config `yaml:"registry,omitempty"` - KeyManager *plugin.Config `yaml:"keyManager,omitempty"` - Middleware []plugin.Config `yaml:"middleware,omitempty"` - Steps []plugin.Config + SchemaValidator *plugin.Config `yaml:"schemaValidator,omitempty"` + SignValidator *plugin.Config `yaml:"signValidator,omitempty"` + Publisher *plugin.Config `yaml:"publisher,omitempty"` + Signer *plugin.Config `yaml:"signer,omitempty"` + Router *plugin.Config `yaml:"router,omitempty"` + Cache *plugin.Config `yaml:"cache,omitempty"` + Registry *plugin.Config `yaml:"registry,omitempty"` + KeyManager *plugin.Config `yaml:"keyManager,omitempty"` + TransportWrapper *plugin.Config `yaml:"transportWrapper,omitempty"` + Middleware []plugin.Config `yaml:"middleware,omitempty"` + Steps []plugin.Config } // HttpClientConfig defines the configuration for the HTTP transport layer. diff --git a/core/module/handler/stdHandler.go b/core/module/handler/stdHandler.go index 026b28f..43714ba 100644 --- a/core/module/handler/stdHandler.go +++ b/core/module/handler/stdHandler.go @@ -17,22 +17,23 @@ import ( // stdHandler orchestrates the execution of defined processing steps. type stdHandler struct { - signer definition.Signer - steps []definition.Step - signValidator definition.SignValidator - cache definition.Cache - registry definition.RegistryLookup - km definition.KeyManager - schemaValidator definition.SchemaValidator - router definition.Router - publisher definition.Publisher - SubscriberID string - role model.Role - httpClient *http.Client + signer definition.Signer + steps []definition.Step + signValidator definition.SignValidator + cache definition.Cache + registry definition.RegistryLookup + km definition.KeyManager + schemaValidator definition.SchemaValidator + router definition.Router + publisher definition.Publisher + transportWrapper definition.TransportWrapper + SubscriberID string + role model.Role + httpClient *http.Client } // newHTTPClient creates a new HTTP client with a custom transport configuration. -func newHTTPClient(cfg *HttpClientConfig) *http.Client { +func newHTTPClient(cfg *HttpClientConfig, wrapper definition.TransportWrapper) *http.Client { // Clone the default transport to inherit its sensible defaults. transport := http.DefaultTransport.(*http.Transport).Clone() @@ -50,7 +51,12 @@ func newHTTPClient(cfg *HttpClientConfig) *http.Client { if cfg.ResponseHeaderTimeout > 0 { transport.ResponseHeaderTimeout = cfg.ResponseHeaderTimeout } - return &http.Client{Transport: transport} + var finalTransport http.RoundTripper = transport + if wrapper != nil { + log.Debugf(context.Background(), "Applying custom transport wrapper") + finalTransport = wrapper.Wrap(transport) + } + return &http.Client{Transport: finalTransport} } // NewStdHandler initializes a new processor with plugins and steps. @@ -59,12 +65,13 @@ func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Ha steps: []definition.Step{}, SubscriberID: cfg.SubscriberID, role: cfg.Role, - httpClient: newHTTPClient(&cfg.HttpClientConfig), } // Initialize plugins. if err := h.initPlugins(ctx, mgr, &cfg.Plugins); err != nil { return nil, fmt.Errorf("failed to initialize plugins: %w", err) } + // Initialize HTTP client after plugins so transport wrapper can be applied. + h.httpClient = newHTTPClient(&cfg.HttpClientConfig, h.transportWrapper) // Initialize steps. if err := h.initSteps(ctx, mgr, cfg); err != nil { return nil, fmt.Errorf("failed to initialize steps: %w", err) @@ -244,6 +251,9 @@ func (h *stdHandler) initPlugins(ctx context.Context, mgr PluginManager, cfg *Pl if h.signer, err = loadPlugin(ctx, "Signer", cfg.Signer, mgr.Signer); err != nil { return err } + if h.transportWrapper, err = loadPlugin(ctx, "TransportWrapper", cfg.TransportWrapper, mgr.TransportWrapper); err != nil { + return err + } log.Debugf(ctx, "All required plugins successfully loaded for stdHandler") return nil diff --git a/core/module/handler/stdHandler_test.go b/core/module/handler/stdHandler_test.go index bf65840..c5b65a6 100644 --- a/core/module/handler/stdHandler_test.go +++ b/core/module/handler/stdHandler_test.go @@ -55,8 +55,8 @@ func TestNewHTTPClient(t *testing.T) { { name: "partial configuration", config: HttpClientConfig{ - MaxIdleConns: 500, - IdleConnTimeout: 180 * time.Second, + MaxIdleConns: 500, + IdleConnTimeout: 180 * time.Second, }, expected: struct { maxIdleConns int @@ -74,8 +74,8 @@ func TestNewHTTPClient(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - client := newHTTPClient(&tt.config) - + client := newHTTPClient(&tt.config, nil) + if client == nil { t.Fatal("newHTTPClient returned nil") } @@ -107,15 +107,15 @@ func TestNewHTTPClient(t *testing.T) { func TestHttpClientConfigDefaults(t *testing.T) { // Test that zero config values don't override defaults config := &HttpClientConfig{} - client := newHTTPClient(config) - + client := newHTTPClient(config, nil) + transport := client.Transport.(*http.Transport) - + // Verify defaults are preserved when config values are zero if transport.MaxIdleConns == 0 { t.Error("MaxIdleConns should not be zero when using defaults") } - + // MaxIdleConnsPerHost default is 0 (unlimited), which is correct if transport.MaxIdleConns != 100 { t.Errorf("Expected default MaxIdleConns=100, got %d", transport.MaxIdleConns) @@ -130,24 +130,24 @@ func TestHttpClientConfigPerformanceValues(t *testing.T) { IdleConnTimeout: 300 * time.Second, ResponseHeaderTimeout: 5 * time.Second, } - - client := newHTTPClient(config) + + client := newHTTPClient(config, nil) transport := client.Transport.(*http.Transport) - + // Verify performance-optimized values if transport.MaxIdleConns != 1000 { t.Errorf("Expected MaxIdleConns=1000, got %d", transport.MaxIdleConns) } - + if transport.MaxIdleConnsPerHost != 200 { t.Errorf("Expected MaxIdleConnsPerHost=200, got %d", transport.MaxIdleConnsPerHost) } - + if transport.IdleConnTimeout != 300*time.Second { t.Errorf("Expected IdleConnTimeout=300s, got %v", transport.IdleConnTimeout) } - + if transport.ResponseHeaderTimeout != 5*time.Second { t.Errorf("Expected ResponseHeaderTimeout=5s, got %v", transport.ResponseHeaderTimeout) } -} \ No newline at end of file +} diff --git a/core/module/module_test.go b/core/module/module_test.go index 4921fd0..0c810ae 100644 --- a/core/module/module_test.go +++ b/core/module/module_test.go @@ -69,6 +69,11 @@ func (m *mockPluginManager) KeyManager(ctx context.Context, cache definition.Cac return nil, nil } +// TransportWrapper returns a mock transport wrapper implementation. +func (m *mockPluginManager) TransportWrapper(ctx context.Context, cfg *plugin.Config) (definition.TransportWrapper, error) { + return nil, nil +} + // SchemaValidator returns a mock schema validator implementation. func (m *mockPluginManager) SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) { return nil, nil diff --git a/pkg/plugin/manager.go b/pkg/plugin/manager.go index fd5afe5..a06ade2 100644 --- a/pkg/plugin/manager.go +++ b/pkg/plugin/manager.go @@ -196,6 +196,28 @@ func (m *Manager) Middleware(ctx context.Context, cfg *Config) (func(http.Handle return mwp.New(ctx, cfg.Config) } +// TransportWrapper returns a TransportWrapper instance based on the provided configuration. +func (m *Manager) TransportWrapper(ctx context.Context, cfg *Config) (definition.TransportWrapper, error) { + twp, err := provider[definition.TransportWrapperProvider](m.plugins, cfg.ID) + if err != nil { + return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) + } + + config := make(map[string]any, len(cfg.Config)) + for k, v := range cfg.Config { + config[k] = v + } + + wrapper, closer, err := twp.New(ctx, config) + if err != nil { + return nil, err + } + if closer != nil { + m.closers = append(m.closers, closer) + } + return wrapper, nil +} + // Step returns a Step instance based on the provided configuration. func (m *Manager) Step(ctx context.Context, cfg *Config) (definition.Step, error) { sp, err := provider[definition.StepProvider](m.plugins, cfg.ID) From 6fffe7411d48b96cd4ea5fbd1cc3d461b5fe7e15 Mon Sep 17 00:00:00 2001 From: Mayuresh Nirhali Date: Mon, 8 Dec 2025 14:33:19 +0530 Subject: [PATCH 08/10] added a test case with no nil transport wrapper --- core/module/handler/stdHandler_test.go | 42 ++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/core/module/handler/stdHandler_test.go b/core/module/handler/stdHandler_test.go index c5b65a6..b7215ec 100644 --- a/core/module/handler/stdHandler_test.go +++ b/core/module/handler/stdHandler_test.go @@ -151,3 +151,45 @@ func TestHttpClientConfigPerformanceValues(t *testing.T) { t.Errorf("Expected ResponseHeaderTimeout=5s, got %v", transport.ResponseHeaderTimeout) } } + +func TestNewHTTPClientWithTransportWrapper(t *testing.T) { + wrappedTransport := &mockRoundTripper{} + wrapper := &mockTransportWrapper{ + returnTransport: wrappedTransport, + } + + client := newHTTPClient(&HttpClientConfig{}, wrapper) + + if !wrapper.wrapCalled { + t.Fatal("expected transport wrapper to be invoked") + } + + if wrapper.wrappedTransport == nil { + t.Fatal("expected base transport to be passed to wrapper") + } + + if client.Transport != wrappedTransport { + t.Errorf("expected client transport to use wrapper transport") + } +} + +type mockTransportWrapper struct { + wrapCalled bool + wrappedTransport http.RoundTripper + returnTransport http.RoundTripper +} + +func (m *mockTransportWrapper) Wrap(base http.RoundTripper) http.RoundTripper { + m.wrapCalled = true + m.wrappedTransport = base + if m.returnTransport != nil { + return m.returnTransport + } + return base +} + +type mockRoundTripper struct{} + +func (m *mockRoundTripper) RoundTrip(_ *http.Request) (*http.Response, error) { + return nil, nil +} From 57f6eae070d117af68003e4c50510c9aac3f1f29 Mon Sep 17 00:00:00 2001 From: ameersohel45 Date: Mon, 8 Dec 2025 15:36:07 +0530 Subject: [PATCH 09/10] Issue 569 - fix : Remove payload byte array from error logs --- core/module/handler/stdHandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/module/handler/stdHandler.go b/core/module/handler/stdHandler.go index 026b28f..532f028 100644 --- a/core/module/handler/stdHandler.go +++ b/core/module/handler/stdHandler.go @@ -85,7 +85,7 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Execute processing steps. for _, step := range h.steps { if err := step.Run(ctx); err != nil { - log.Errorf(ctx, err, "%T.run(%v):%v", step, ctx, err) + log.Errorf(ctx, err, "%T.run():%v", step, err) response.SendNack(ctx, w, err) return } From 33cd3dc31f2235ab08ae20d4acd013abbc8b10c5 Mon Sep 17 00:00:00 2001 From: Mayuresh Nirhali Date: Fri, 12 Dec 2025 13:06:11 +0530 Subject: [PATCH 10/10] adding missing file for transport wrapper --- pkg/plugin/definition/transport.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 pkg/plugin/definition/transport.go diff --git a/pkg/plugin/definition/transport.go b/pkg/plugin/definition/transport.go new file mode 100644 index 0000000..370c65e --- /dev/null +++ b/pkg/plugin/definition/transport.go @@ -0,0 +1,18 @@ +package definition + +import ( + "context" + "net/http" +) + +// TransportWrapper is a plugin that wraps an http.RoundTripper, +// allowing modification of outbound requests (like adding auth). +type TransportWrapper interface { + // Wrap takes a base transport and returns a new transport that wraps it. + Wrap(base http.RoundTripper) http.RoundTripper +} + +// TransportWrapperProvider defines the factory for a TransportWrapper. +type TransportWrapperProvider interface { + New(ctx context.Context, config map[string]any) (TransportWrapper, func(), error) +}