570 - transport wrapper initial checkin

This commit is contained in:
Mayuresh Nirhali
2025-12-08 12:23:36 +05:30
parent 4da6ce6bde
commit 20a924d43e
5 changed files with 79 additions and 40 deletions

View File

@@ -22,6 +22,7 @@ type PluginManager interface {
Cache(ctx context.Context, cfg *plugin.Config) (definition.Cache, error) Cache(ctx context.Context, cfg *plugin.Config) (definition.Cache, error)
Registry(ctx context.Context, cfg *plugin.Config) (definition.RegistryLookup, error) Registry(ctx context.Context, cfg *plugin.Config) (definition.RegistryLookup, error)
KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error) KeyManager(ctx context.Context, cache definition.Cache, rLookup definition.RegistryLookup, cfg *plugin.Config) (definition.KeyManager, error)
TransportWrapper(ctx context.Context, cfg *plugin.Config) (definition.TransportWrapper, error)
SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error)
} }
@@ -43,6 +44,7 @@ type PluginCfg struct {
Cache *plugin.Config `yaml:"cache,omitempty"` Cache *plugin.Config `yaml:"cache,omitempty"`
Registry *plugin.Config `yaml:"registry,omitempty"` Registry *plugin.Config `yaml:"registry,omitempty"`
KeyManager *plugin.Config `yaml:"keyManager,omitempty"` KeyManager *plugin.Config `yaml:"keyManager,omitempty"`
TransportWrapper *plugin.Config `yaml:"transportWrapper,omitempty"`
Middleware []plugin.Config `yaml:"middleware,omitempty"` Middleware []plugin.Config `yaml:"middleware,omitempty"`
Steps []plugin.Config Steps []plugin.Config
} }

View File

@@ -26,13 +26,14 @@ type stdHandler struct {
schemaValidator definition.SchemaValidator schemaValidator definition.SchemaValidator
router definition.Router router definition.Router
publisher definition.Publisher publisher definition.Publisher
transportWrapper definition.TransportWrapper
SubscriberID string SubscriberID string
role model.Role role model.Role
httpClient *http.Client httpClient *http.Client
} }
// newHTTPClient creates a new HTTP client with a custom transport configuration. // newHTTPClient creates a new HTTP client with a custom transport configuration.
func newHTTPClient(cfg *HttpClientConfig) *http.Client { func newHTTPClient(cfg *HttpClientConfig, wrapper definition.TransportWrapper) *http.Client {
// Clone the default transport to inherit its sensible defaults. // Clone the default transport to inherit its sensible defaults.
transport := http.DefaultTransport.(*http.Transport).Clone() transport := http.DefaultTransport.(*http.Transport).Clone()
@@ -50,7 +51,12 @@ func newHTTPClient(cfg *HttpClientConfig) *http.Client {
if cfg.ResponseHeaderTimeout > 0 { if cfg.ResponseHeaderTimeout > 0 {
transport.ResponseHeaderTimeout = cfg.ResponseHeaderTimeout transport.ResponseHeaderTimeout = cfg.ResponseHeaderTimeout
} }
return &http.Client{Transport: transport} var finalTransport http.RoundTripper = transport
if wrapper != nil {
log.Debugf(context.Background(), "Applying custom transport wrapper")
finalTransport = wrapper.Wrap(transport)
}
return &http.Client{Transport: finalTransport}
} }
// NewStdHandler initializes a new processor with plugins and steps. // NewStdHandler initializes a new processor with plugins and steps.
@@ -59,12 +65,13 @@ func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Ha
steps: []definition.Step{}, steps: []definition.Step{},
SubscriberID: cfg.SubscriberID, SubscriberID: cfg.SubscriberID,
role: cfg.Role, role: cfg.Role,
httpClient: newHTTPClient(&cfg.HttpClientConfig),
} }
// Initialize plugins. // Initialize plugins.
if err := h.initPlugins(ctx, mgr, &cfg.Plugins); err != nil { if err := h.initPlugins(ctx, mgr, &cfg.Plugins); err != nil {
return nil, fmt.Errorf("failed to initialize plugins: %w", err) return nil, fmt.Errorf("failed to initialize plugins: %w", err)
} }
// Initialize HTTP client after plugins so transport wrapper can be applied.
h.httpClient = newHTTPClient(&cfg.HttpClientConfig, h.transportWrapper)
// Initialize steps. // Initialize steps.
if err := h.initSteps(ctx, mgr, cfg); err != nil { if err := h.initSteps(ctx, mgr, cfg); err != nil {
return nil, fmt.Errorf("failed to initialize steps: %w", err) return nil, fmt.Errorf("failed to initialize steps: %w", err)
@@ -244,6 +251,9 @@ func (h *stdHandler) initPlugins(ctx context.Context, mgr PluginManager, cfg *Pl
if h.signer, err = loadPlugin(ctx, "Signer", cfg.Signer, mgr.Signer); err != nil { if h.signer, err = loadPlugin(ctx, "Signer", cfg.Signer, mgr.Signer); err != nil {
return err return err
} }
if h.transportWrapper, err = loadPlugin(ctx, "TransportWrapper", cfg.TransportWrapper, mgr.TransportWrapper); err != nil {
return err
}
log.Debugf(ctx, "All required plugins successfully loaded for stdHandler") log.Debugf(ctx, "All required plugins successfully loaded for stdHandler")
return nil return nil

View File

@@ -74,7 +74,7 @@ func TestNewHTTPClient(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
client := newHTTPClient(&tt.config) client := newHTTPClient(&tt.config, nil)
if client == nil { if client == nil {
t.Fatal("newHTTPClient returned nil") t.Fatal("newHTTPClient returned nil")
@@ -107,7 +107,7 @@ func TestNewHTTPClient(t *testing.T) {
func TestHttpClientConfigDefaults(t *testing.T) { func TestHttpClientConfigDefaults(t *testing.T) {
// Test that zero config values don't override defaults // Test that zero config values don't override defaults
config := &HttpClientConfig{} config := &HttpClientConfig{}
client := newHTTPClient(config) client := newHTTPClient(config, nil)
transport := client.Transport.(*http.Transport) transport := client.Transport.(*http.Transport)
@@ -131,7 +131,7 @@ func TestHttpClientConfigPerformanceValues(t *testing.T) {
ResponseHeaderTimeout: 5 * time.Second, ResponseHeaderTimeout: 5 * time.Second,
} }
client := newHTTPClient(config) client := newHTTPClient(config, nil)
transport := client.Transport.(*http.Transport) transport := client.Transport.(*http.Transport)
// Verify performance-optimized values // Verify performance-optimized values

View File

@@ -69,6 +69,11 @@ func (m *mockPluginManager) KeyManager(ctx context.Context, cache definition.Cac
return nil, nil return nil, nil
} }
// TransportWrapper returns a mock transport wrapper implementation.
func (m *mockPluginManager) TransportWrapper(ctx context.Context, cfg *plugin.Config) (definition.TransportWrapper, error) {
return nil, nil
}
// SchemaValidator returns a mock schema validator implementation. // SchemaValidator returns a mock schema validator implementation.
func (m *mockPluginManager) SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) { func (m *mockPluginManager) SchemaValidator(ctx context.Context, cfg *plugin.Config) (definition.SchemaValidator, error) {
return nil, nil return nil, nil

View File

@@ -196,6 +196,28 @@ func (m *Manager) Middleware(ctx context.Context, cfg *Config) (func(http.Handle
return mwp.New(ctx, cfg.Config) return mwp.New(ctx, cfg.Config)
} }
// TransportWrapper returns a TransportWrapper instance based on the provided configuration.
func (m *Manager) TransportWrapper(ctx context.Context, cfg *Config) (definition.TransportWrapper, error) {
twp, err := provider[definition.TransportWrapperProvider](m.plugins, cfg.ID)
if err != nil {
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
}
config := make(map[string]any, len(cfg.Config))
for k, v := range cfg.Config {
config[k] = v
}
wrapper, closer, err := twp.New(ctx, config)
if err != nil {
return nil, err
}
if closer != nil {
m.closers = append(m.closers, closer)
}
return wrapper, nil
}
// Step returns a Step instance based on the provided configuration. // Step returns a Step instance based on the provided configuration.
func (m *Manager) Step(ctx context.Context, cfg *Config) (definition.Step, error) { func (m *Manager) Step(ctx context.Context, cfg *Config) (definition.Step, error) {
sp, err := provider[definition.StepProvider](m.plugins, cfg.ID) sp, err := provider[definition.StepProvider](m.plugins, cfg.ID)