Issue - 523 feat: Add configurable HTTP connection pooling for the adapter service

This commit is contained in:
ameersohel45
2025-09-25 16:37:13 +05:30
parent ff6b907dee
commit fc65e29590
3 changed files with 79 additions and 11 deletions

View File

@@ -28,6 +28,29 @@ type stdHandler struct {
publisher definition.Publisher
SubscriberID string
role model.Role
httpClient *http.Client
}
// newHTTPClient creates a new HTTP client with a custom transport configuration.
func newHTTPClient(cfg *HttpClientConfig) *http.Client {
// Clone the default transport to inherit its sensible defaults.
transport := http.DefaultTransport.(*http.Transport).Clone()
// Only override the defaults if a value is explicitly provided in the config.
// A zero value in the config means we stick with the default values.
if cfg.MaxIdleConns > 0 {
transport.MaxIdleConns = cfg.MaxIdleConns
}
if cfg.MaxIdleConnsPerHost > 0 {
transport.MaxIdleConnsPerHost = cfg.MaxIdleConnsPerHost
}
if cfg.IdleConnTimeout > 0 {
transport.IdleConnTimeout = cfg.IdleConnTimeout
}
if cfg.ResponseHeaderTimeout > 0 {
transport.ResponseHeaderTimeout = cfg.ResponseHeaderTimeout
}
return &http.Client{Transport: transport}
}
// NewStdHandler initializes a new processor with plugins and steps.
@@ -36,6 +59,7 @@ func NewStdHandler(ctx context.Context, mgr PluginManager, cfg *Config) (http.Ha
steps: []definition.Step{},
SubscriberID: cfg.SubscriberID,
role: cfg.Role,
httpClient: newHTTPClient(&cfg.HttpClientConfig),
}
// Initialize plugins.
if err := h.initPlugins(ctx, mgr, &cfg.Plugins); err != nil {
@@ -74,7 +98,7 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
// Handle routing based on the defined route type.
route(ctx, r, w, h.publisher)
route(ctx, r, w, h.publisher, h.httpClient)
}
// stepCtx creates a new StepContext for processing an HTTP request.
@@ -107,12 +131,12 @@ func (h *stdHandler) subID(ctx context.Context) string {
var proxyFunc = proxy
// route handles request forwarding or message publishing based on the routing type.
func route(ctx *model.StepContext, r *http.Request, w http.ResponseWriter, pb definition.Publisher) {
func route(ctx *model.StepContext, r *http.Request, w http.ResponseWriter, pb definition.Publisher, httpClient *http.Client) {
log.Debugf(ctx, "Routing to ctx.Route to %#v", ctx.Route)
switch ctx.Route.TargetType {
case "url":
log.Infof(ctx.Context, "Forwarding request to URL: %s", ctx.Route.URL)
proxyFunc(ctx, r, w)
proxyFunc(ctx, r, w, httpClient)
return
case "publisher":
if pb == nil {
@@ -136,7 +160,7 @@ func route(ctx *model.StepContext, r *http.Request, w http.ResponseWriter, pb de
}
response.SendAck(w)
}
func proxy(ctx *model.StepContext, r *http.Request, w http.ResponseWriter) {
func proxy(ctx *model.StepContext, r *http.Request, w http.ResponseWriter, httpClient *http.Client) {
target := ctx.Route.URL
r.Header.Set("X-Forwarded-Host", r.Host)
@@ -147,7 +171,10 @@ func proxy(ctx *model.StepContext, r *http.Request, w http.ResponseWriter) {
log.Request(req.Context(), req, ctx.Body)
}
proxy := &httputil.ReverseProxy{Director: director}
proxy := &httputil.ReverseProxy{
Director: director,
Transport: httpClient.Transport,
}
proxy.ServeHTTP(w, r)
}