diff --git a/core/module/handler/config.go b/core/module/handler/config.go index 7891a4d..96b2263 100644 --- a/core/module/handler/config.go +++ b/core/module/handler/config.go @@ -22,6 +22,7 @@ type PluginManager interface { Cache(ctx context.Context, cfg *plugin.Config) (definition.Cache, error) Registry(ctx context.Context, cfg *plugin.Config) (definition.RegistryLookup, error) KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error) + TransportWrapper(ctx context.Context, cfg *plugin.Config) (definition.TransportWrapper, error) SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) } @@ -35,16 +36,17 @@ const ( // PluginCfg holds the configuration for various plugins. type PluginCfg struct { - SchemaValidator *plugin.Config `yaml:"schemaValidator,omitempty"` - SignValidator *plugin.Config `yaml:"signValidator,omitempty"` - Publisher *plugin.Config `yaml:"publisher,omitempty"` - Signer *plugin.Config `yaml:"signer,omitempty"` - Router *plugin.Config `yaml:"router,omitempty"` - Cache *plugin.Config `yaml:"cache,omitempty"` - Registry *plugin.Config `yaml:"registry,omitempty"` - KeyManager *plugin.Config `yaml:"keyManager,omitempty"` - Middleware []plugin.Config `yaml:"middleware,omitempty"` - Steps []plugin.Config + SchemaValidator *plugin.Config `yaml:"schemaValidator,omitempty"` + SignValidator *plugin.Config `yaml:"signValidator,omitempty"` + Publisher *plugin.Config `yaml:"publisher,omitempty"` + Signer *plugin.Config `yaml:"signer,omitempty"` + Router *plugin.Config `yaml:"router,omitempty"` + Cache *plugin.Config `yaml:"cache,omitempty"` + Registry *plugin.Config `yaml:"registry,omitempty"` + KeyManager *plugin.Config `yaml:"keyManager,omitempty"` + TransportWrapper *plugin.Config `yaml:"transportWrapper,omitempty"` + Middleware []plugin.Config `yaml:"middleware,omitempty"` + Steps []plugin.Config } // HttpClientConfig defines the configuration for the HTTP transport layer. diff --git a/core/module/handler/stdHandler.go b/core/module/handler/stdHandler.go index 026b28f..43714ba 100644 --- a/core/module/handler/stdHandler.go +++ b/core/module/handler/stdHandler.go @@ -17,22 +17,23 @@ import ( // stdHandler orchestrates the execution of defined processing steps. type stdHandler struct { - signer definition.Signer - steps []definition.Step - signValidator definition.SignValidator - cache definition.Cache - registry definition.RegistryLookup - km definition.KeyManager - schemaValidator definition.SchemaValidator - router definition.Router - publisher definition.Publisher - SubscriberID string - role model.Role - httpClient *http.Client + signer definition.Signer + steps []definition.Step + signValidator definition.SignValidator + cache definition.Cache + registry definition.RegistryLookup + km definition.KeyManager + schemaValidator definition.SchemaValidator + router definition.Router + publisher definition.Publisher + transportWrapper definition.TransportWrapper + SubscriberID string + role model.Role + httpClient *http.Client } // newHTTPClient creates a new HTTP client with a custom transport configuration. -func newHTTPClient(cfg *HttpClientConfig) *http.Client { +func newHTTPClient(cfg *HttpClientConfig, wrapper definition.TransportWrapper) *http.Client { // Clone the default transport to inherit its sensible defaults. transport := http.DefaultTransport.(*http.Transport).Clone() @@ -50,7 +51,12 @@ func newHTTPClient(cfg *HttpClientConfig) *http.Client { if cfg.ResponseHeaderTimeout > 0 { transport.ResponseHeaderTimeout = cfg.ResponseHeaderTimeout } - return &http.Client{Transport: transport} + var finalTransport http.RoundTripper = transport + if wrapper != nil { + log.Debugf(context.Background(), "Applying custom transport wrapper") + finalTransport = wrapper.Wrap(transport) + } + return &http.Client{Transport: finalTransport} } // NewStdHandler initializes a new processor with plugins and steps. @@ -59,12 +65,13 @@ func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Ha steps: []definition.Step{}, SubscriberID: cfg.SubscriberID, role: cfg.Role, - httpClient: newHTTPClient(&cfg.HttpClientConfig), } // Initialize plugins. if err := h.initPlugins(ctx, mgr, &cfg.Plugins); err != nil { return nil, fmt.Errorf("failed to initialize plugins: %w", err) } + // Initialize HTTP client after plugins so transport wrapper can be applied. + h.httpClient = newHTTPClient(&cfg.HttpClientConfig, h.transportWrapper) // Initialize steps. if err := h.initSteps(ctx, mgr, cfg); err != nil { return nil, fmt.Errorf("failed to initialize steps: %w", err) @@ -244,6 +251,9 @@ func (h *stdHandler) initPlugins(ctx context.Context, mgr PluginManager, cfg *Pl if h.signer, err = loadPlugin(ctx, "Signer", cfg.Signer, mgr.Signer); err != nil { return err } + if h.transportWrapper, err = loadPlugin(ctx, "TransportWrapper", cfg.TransportWrapper, mgr.TransportWrapper); err != nil { + return err + } log.Debugf(ctx, "All required plugins successfully loaded for stdHandler") return nil diff --git a/core/module/handler/stdHandler_test.go b/core/module/handler/stdHandler_test.go index bf65840..c5b65a6 100644 --- a/core/module/handler/stdHandler_test.go +++ b/core/module/handler/stdHandler_test.go @@ -55,8 +55,8 @@ func TestNewHTTPClient(t *testing.T) { { name: "partial configuration", config: HttpClientConfig{ - MaxIdleConns: 500, - IdleConnTimeout: 180 * time.Second, + MaxIdleConns: 500, + IdleConnTimeout: 180 * time.Second, }, expected: struct { maxIdleConns int @@ -74,8 +74,8 @@ func TestNewHTTPClient(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - client := newHTTPClient(&tt.config) - + client := newHTTPClient(&tt.config, nil) + if client == nil { t.Fatal("newHTTPClient returned nil") } @@ -107,15 +107,15 @@ func TestNewHTTPClient(t *testing.T) { func TestHttpClientConfigDefaults(t *testing.T) { // Test that zero config values don't override defaults config := &HttpClientConfig{} - client := newHTTPClient(config) - + client := newHTTPClient(config, nil) + transport := client.Transport.(*http.Transport) - + // Verify defaults are preserved when config values are zero if transport.MaxIdleConns == 0 { t.Error("MaxIdleConns should not be zero when using defaults") } - + // MaxIdleConnsPerHost default is 0 (unlimited), which is correct if transport.MaxIdleConns != 100 { t.Errorf("Expected default MaxIdleConns=100, got %d", transport.MaxIdleConns) @@ -130,24 +130,24 @@ func TestHttpClientConfigPerformanceValues(t *testing.T) { IdleConnTimeout: 300 * time.Second, ResponseHeaderTimeout: 5 * time.Second, } - - client := newHTTPClient(config) + + client := newHTTPClient(config, nil) transport := client.Transport.(*http.Transport) - + // Verify performance-optimized values if transport.MaxIdleConns != 1000 { t.Errorf("Expected MaxIdleConns=1000, got %d", transport.MaxIdleConns) } - + if transport.MaxIdleConnsPerHost != 200 { t.Errorf("Expected MaxIdleConnsPerHost=200, got %d", transport.MaxIdleConnsPerHost) } - + if transport.IdleConnTimeout != 300*time.Second { t.Errorf("Expected IdleConnTimeout=300s, got %v", transport.IdleConnTimeout) } - + if transport.ResponseHeaderTimeout != 5*time.Second { t.Errorf("Expected ResponseHeaderTimeout=5s, got %v", transport.ResponseHeaderTimeout) } -} \ No newline at end of file +} diff --git a/core/module/module_test.go b/core/module/module_test.go index 4921fd0..0c810ae 100644 --- a/core/module/module_test.go +++ b/core/module/module_test.go @@ -69,6 +69,11 @@ func (m *mockPluginManager) KeyManager(ctx context.Context, cache definition.Cac return nil, nil } +// TransportWrapper returns a mock transport wrapper implementation. +func (m *mockPluginManager) TransportWrapper(ctx context.Context, cfg *plugin.Config) (definition.TransportWrapper, error) { + return nil, nil +} + // SchemaValidator returns a mock schema validator implementation. func (m *mockPluginManager) SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) { return nil, nil diff --git a/pkg/plugin/manager.go b/pkg/plugin/manager.go index fd5afe5..a06ade2 100644 --- a/pkg/plugin/manager.go +++ b/pkg/plugin/manager.go @@ -196,6 +196,28 @@ func (m *Manager) Middleware(ctx context.Context, cfg *Config) (func(http.Handle return mwp.New(ctx, cfg.Config) } +// TransportWrapper returns a TransportWrapper instance based on the provided configuration. +func (m *Manager) TransportWrapper(ctx context.Context, cfg *Config) (definition.TransportWrapper, error) { + twp, err := provider[definition.TransportWrapperProvider](m.plugins, cfg.ID) + if err != nil { + return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err) + } + + config := make(map[string]any, len(cfg.Config)) + for k, v := range cfg.Config { + config[k] = v + } + + wrapper, closer, err := twp.New(ctx, config) + if err != nil { + return nil, err + } + if closer != nil { + m.closers = append(m.closers, closer) + } + return wrapper, nil +} + // Step returns a Step instance based on the provided configuration. func (m *Manager) Step(ctx context.Context, cfg *Config) (definition.Step, error) { sp, err := provider[definition.StepProvider](m.plugins, cfg.ID)