Merge pull request #578 from Beckn-One/570-transportwrapper

570 - transport wrapper initial checkin
This commit is contained in:
Mayuresh A Nirhali
2025-12-08 16:48:12 +05:30
committed by GitHub
5 changed files with 121 additions and 40 deletions

View File

@@ -22,6 +22,7 @@ type PluginManager interface {
Cache(ctx context.Context, cfg *plugin.Config) (definition.Cache, error)
Registry(ctx context.Context, cfg *plugin.Config) (definition.RegistryLookup, error)
KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error)
TransportWrapper(ctx context.Context, cfg *plugin.Config) (definition.TransportWrapper, error)
SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error)
}
@@ -35,16 +36,17 @@ const (
// PluginCfg holds the configuration for various plugins.
type PluginCfg struct {
SchemaValidator *plugin.Config `yaml:"schemaValidator,omitempty"`
SignValidator *plugin.Config `yaml:"signValidator,omitempty"`
Publisher *plugin.Config `yaml:"publisher,omitempty"`
Signer *plugin.Config `yaml:"signer,omitempty"`
Router *plugin.Config `yaml:"router,omitempty"`
Cache *plugin.Config `yaml:"cache,omitempty"`
Registry *plugin.Config `yaml:"registry,omitempty"`
KeyManager *plugin.Config `yaml:"keyManager,omitempty"`
Middleware []plugin.Config `yaml:"middleware,omitempty"`
Steps []plugin.Config
SchemaValidator *plugin.Config `yaml:"schemaValidator,omitempty"`
SignValidator *plugin.Config `yaml:"signValidator,omitempty"`
Publisher *plugin.Config `yaml:"publisher,omitempty"`
Signer *plugin.Config `yaml:"signer,omitempty"`
Router *plugin.Config `yaml:"router,omitempty"`
Cache *plugin.Config `yaml:"cache,omitempty"`
Registry *plugin.Config `yaml:"registry,omitempty"`
KeyManager *plugin.Config `yaml:"keyManager,omitempty"`
TransportWrapper *plugin.Config `yaml:"transportWrapper,omitempty"`
Middleware []plugin.Config `yaml:"middleware,omitempty"`
Steps []plugin.Config
}
// HttpClientConfig defines the configuration for the HTTP transport layer.

View File

@@ -17,22 +17,23 @@ import (
// stdHandler orchestrates the execution of defined processing steps.
type stdHandler struct {
signer definition.Signer
steps []definition.Step
signValidator definition.SignValidator
cache definition.Cache
registry definition.RegistryLookup
km definition.KeyManager
schemaValidator definition.SchemaValidator
router definition.Router
publisher definition.Publisher
SubscriberID string
role model.Role
httpClient *http.Client
signer definition.Signer
steps []definition.Step
signValidator definition.SignValidator
cache definition.Cache
registry definition.RegistryLookup
km definition.KeyManager
schemaValidator definition.SchemaValidator
router definition.Router
publisher definition.Publisher
transportWrapper definition.TransportWrapper
SubscriberID string
role model.Role
httpClient *http.Client
}
// newHTTPClient creates a new HTTP client with a custom transport configuration.
func newHTTPClient(cfg *HttpClientConfig) *http.Client {
func newHTTPClient(cfg *HttpClientConfig, wrapper definition.TransportWrapper) *http.Client {
// Clone the default transport to inherit its sensible defaults.
transport := http.DefaultTransport.(*http.Transport).Clone()
@@ -50,7 +51,12 @@ func newHTTPClient(cfg *HttpClientConfig) *http.Client {
if cfg.ResponseHeaderTimeout > 0 {
transport.ResponseHeaderTimeout = cfg.ResponseHeaderTimeout
}
return &http.Client{Transport: transport}
var finalTransport http.RoundTripper = transport
if wrapper != nil {
log.Debugf(context.Background(), "Applying custom transport wrapper")
finalTransport = wrapper.Wrap(transport)
}
return &http.Client{Transport: finalTransport}
}
// NewStdHandler initializes a new processor with plugins and steps.
@@ -59,12 +65,13 @@ func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Ha
steps: []definition.Step{},
SubscriberID: cfg.SubscriberID,
role: cfg.Role,
httpClient: newHTTPClient(&cfg.HttpClientConfig),
}
// Initialize plugins.
if err := h.initPlugins(ctx, mgr, &cfg.Plugins); err != nil {
return nil, fmt.Errorf("failed to initialize plugins: %w", err)
}
// Initialize HTTP client after plugins so transport wrapper can be applied.
h.httpClient = newHTTPClient(&cfg.HttpClientConfig, h.transportWrapper)
// Initialize steps.
if err := h.initSteps(ctx, mgr, cfg); err != nil {
return nil, fmt.Errorf("failed to initialize steps: %w", err)
@@ -244,6 +251,9 @@ func (h *stdHandler) initPlugins(ctx context.Context, mgr PluginManager, cfg *Pl
if h.signer, err = loadPlugin(ctx, "Signer", cfg.Signer, mgr.Signer); err != nil {
return err
}
if h.transportWrapper, err = loadPlugin(ctx, "TransportWrapper", cfg.TransportWrapper, mgr.TransportWrapper); err != nil {
return err
}
log.Debugf(ctx, "All required plugins successfully loaded for stdHandler")
return nil

View File

@@ -55,8 +55,8 @@ func TestNewHTTPClient(t *testing.T) {
{
name: "partial configuration",
config: HttpClientConfig{
MaxIdleConns: 500,
IdleConnTimeout: 180 * time.Second,
MaxIdleConns: 500,
IdleConnTimeout: 180 * time.Second,
},
expected: struct {
maxIdleConns int
@@ -74,8 +74,8 @@ func TestNewHTTPClient(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client := newHTTPClient(&tt.config)
client := newHTTPClient(&tt.config, nil)
if client == nil {
t.Fatal("newHTTPClient returned nil")
}
@@ -107,15 +107,15 @@ func TestNewHTTPClient(t *testing.T) {
func TestHttpClientConfigDefaults(t *testing.T) {
// Test that zero config values don't override defaults
config := &HttpClientConfig{}
client := newHTTPClient(config)
client := newHTTPClient(config, nil)
transport := client.Transport.(*http.Transport)
// Verify defaults are preserved when config values are zero
if transport.MaxIdleConns == 0 {
t.Error("MaxIdleConns should not be zero when using defaults")
}
// MaxIdleConnsPerHost default is 0 (unlimited), which is correct
if transport.MaxIdleConns != 100 {
t.Errorf("Expected default MaxIdleConns=100, got %d", transport.MaxIdleConns)
@@ -130,24 +130,66 @@ func TestHttpClientConfigPerformanceValues(t *testing.T) {
IdleConnTimeout: 300 * time.Second,
ResponseHeaderTimeout: 5 * time.Second,
}
client := newHTTPClient(config)
client := newHTTPClient(config, nil)
transport := client.Transport.(*http.Transport)
// Verify performance-optimized values
if transport.MaxIdleConns != 1000 {
t.Errorf("Expected MaxIdleConns=1000, got %d", transport.MaxIdleConns)
}
if transport.MaxIdleConnsPerHost != 200 {
t.Errorf("Expected MaxIdleConnsPerHost=200, got %d", transport.MaxIdleConnsPerHost)
}
if transport.IdleConnTimeout != 300*time.Second {
t.Errorf("Expected IdleConnTimeout=300s, got %v", transport.IdleConnTimeout)
}
if transport.ResponseHeaderTimeout != 5*time.Second {
t.Errorf("Expected ResponseHeaderTimeout=5s, got %v", transport.ResponseHeaderTimeout)
}
}
}
func TestNewHTTPClientWithTransportWrapper(t *testing.T) {
wrappedTransport := &mockRoundTripper{}
wrapper := &mockTransportWrapper{
returnTransport: wrappedTransport,
}
client := newHTTPClient(&HttpClientConfig{}, wrapper)
if !wrapper.wrapCalled {
t.Fatal("expected transport wrapper to be invoked")
}
if wrapper.wrappedTransport == nil {
t.Fatal("expected base transport to be passed to wrapper")
}
if client.Transport != wrappedTransport {
t.Errorf("expected client transport to use wrapper transport")
}
}
type mockTransportWrapper struct {
wrapCalled bool
wrappedTransport http.RoundTripper
returnTransport http.RoundTripper
}
func (m *mockTransportWrapper) Wrap(base http.RoundTripper) http.RoundTripper {
m.wrapCalled = true
m.wrappedTransport = base
if m.returnTransport != nil {
return m.returnTransport
}
return base
}
type mockRoundTripper struct{}
func (m *mockRoundTripper) RoundTrip(_ *http.Request) (*http.Response, error) {
return nil, nil
}

View File

@@ -69,6 +69,11 @@ func (m *mockPluginManager) KeyManager(ctx context.Context, cache definition.Cac
return nil, nil
}
// TransportWrapper returns a mock transport wrapper implementation.
func (m *mockPluginManager) TransportWrapper(ctx context.Context, cfg *plugin.Config) (definition.TransportWrapper, error) {
return nil, nil
}
// SchemaValidator returns a mock schema validator implementation.
func (m *mockPluginManager) SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) {
return nil, nil