Update on the review comments
This commit is contained in:
@@ -21,22 +21,22 @@ type Config struct {
|
||||
RetryWaitMax time.Duration
|
||||
}
|
||||
|
||||
// RegisteryClient encapsulates the logic for calling the subscribe and lookup endpoints.
|
||||
type RegisteryClient struct {
|
||||
Config *Config
|
||||
Client *retryablehttp.Client
|
||||
// registryClient encapsulates the logic for calling the subscribe and lookup endpoints.
|
||||
type registryClient struct {
|
||||
config *Config
|
||||
client *retryablehttp.Client
|
||||
}
|
||||
|
||||
// NewRegisteryClient creates a new instance of Client.
|
||||
func NewRegisteryClient(config *Config) *RegisteryClient {
|
||||
func NewRegisteryClient(config *Config) *registryClient {
|
||||
retryClient := retryablehttp.NewClient()
|
||||
|
||||
return &RegisteryClient{Config: config, Client: retryClient}
|
||||
return ®istryClient{config: config, client: retryClient}
|
||||
}
|
||||
|
||||
// Subscribe calls the /subscribe endpoint with retry.
|
||||
func (c *RegisteryClient) Subscribe(ctx context.Context, subscription *model.Subscription) error {
|
||||
subscribeURL := fmt.Sprintf("%s/subscribe", c.Config.RegisteryURL)
|
||||
func (c *registryClient) Subscribe(ctx context.Context, subscription *model.Subscription) error {
|
||||
subscribeURL := fmt.Sprintf("%s/subscribe", c.config.RegisteryURL)
|
||||
|
||||
jsonData, err := json.Marshal(subscription)
|
||||
if err != nil {
|
||||
@@ -49,7 +49,7 @@ func (c *RegisteryClient) Subscribe(ctx context.Context, subscription *model.Sub
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := c.Client.Do(req)
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send request with retry: %w", err)
|
||||
}
|
||||
@@ -62,8 +62,8 @@ func (c *RegisteryClient) Subscribe(ctx context.Context, subscription *model.Sub
|
||||
}
|
||||
|
||||
// Lookup calls the /lookup endpoint with retry and returns a slice of Subscription.
|
||||
func (c *RegisteryClient) Lookup(ctx context.Context, subscription *model.Subscription) ([]model.Subscription, error) {
|
||||
lookupURL := fmt.Sprintf("%s/lookUp", c.Config.RegisteryURL)
|
||||
func (c *registryClient) Lookup(ctx context.Context, subscription *model.Subscription) ([]model.Subscription, error) {
|
||||
lookupURL := fmt.Sprintf("%s/lookUp", c.config.RegisteryURL)
|
||||
|
||||
jsonData, err := json.Marshal(subscription)
|
||||
if err != nil {
|
||||
@@ -76,7 +76,7 @@ func (c *RegisteryClient) Lookup(ctx context.Context, subscription *model.Subscr
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := c.Client.Do(req)
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to send request with retry: %w", err)
|
||||
}
|
||||
|
||||
@@ -46,42 +46,37 @@ func TestSubscribeSuccess(t *testing.T) {
|
||||
ValidUntil: time.Now().Add(24 * time.Hour),
|
||||
Status: "SUBSCRIBED",
|
||||
}
|
||||
|
||||
err := client.Subscribe(context.Background(), subscription)
|
||||
require.NoError(t, err)
|
||||
if err != nil {
|
||||
t.Fatalf("Subscribe() failed with error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSubscribeFailureWithMock tests different failure scenarios using a mock client.
|
||||
func TestSubscribeFailureWithMock(t *testing.T) {
|
||||
// TestSubscribeFailure tests different failure scenarios using a mock client.
|
||||
func TestSubscribeFailure(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
mockError error
|
||||
expectError bool
|
||||
name string
|
||||
mockError error
|
||||
}{
|
||||
{
|
||||
name: "Failed subscription - Internal Server Error",
|
||||
mockError: errors.New("internal server error"),
|
||||
expectError: true,
|
||||
name: "Failed subscription - Internal Server Error",
|
||||
mockError: errors.New("internal server error"),
|
||||
},
|
||||
{
|
||||
name: "Failed subscription - Bad Request",
|
||||
mockError: errors.New("bad request"),
|
||||
expectError: true,
|
||||
name: "Failed subscription - Bad Request",
|
||||
mockError: errors.New("bad request"),
|
||||
},
|
||||
{
|
||||
name: "Request Timeout",
|
||||
mockError: context.DeadlineExceeded,
|
||||
expectError: true,
|
||||
name: "Request Timeout",
|
||||
mockError: context.DeadlineExceeded,
|
||||
},
|
||||
{
|
||||
name: "Network Failure",
|
||||
mockError: errors.New("network failure"),
|
||||
expectError: true,
|
||||
name: "Network Failure",
|
||||
mockError: errors.New("network failure"),
|
||||
},
|
||||
{
|
||||
name: "JSON Marshalling Failure",
|
||||
mockError: errors.New("json marshalling failure"),
|
||||
expectError: true,
|
||||
name: "JSON Marshalling Failure",
|
||||
mockError: errors.New("json marshalling failure"),
|
||||
},
|
||||
}
|
||||
|
||||
@@ -103,67 +98,21 @@ func TestSubscribeFailureWithMock(t *testing.T) {
|
||||
}
|
||||
|
||||
if tt.name == "JSON Marshalling Failure" {
|
||||
invalidSubscription := &model.Subscription{}
|
||||
invalidSubscription.ValidFrom = time.Unix(0, 0) // Invalid zero timestamp
|
||||
subscription = invalidSubscription
|
||||
subscription = &model.Subscription{} // Example of an invalid object
|
||||
}
|
||||
|
||||
err := mockClient.Subscribe(context.Background(), subscription)
|
||||
if tt.expectError {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.Error(t, err) // Directly checking for an error since all cases should fail
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestLookupSuccess tests successful lookup scenarios.
|
||||
func TestLookupSuccess(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
responseBody interface{}
|
||||
responseCode int
|
||||
}{
|
||||
{
|
||||
name: "Successful lookup",
|
||||
responseBody: []model.Subscription{
|
||||
{
|
||||
Subscriber: model.Subscriber{
|
||||
SubscriberID: "123",
|
||||
},
|
||||
KeyID: "test-key",
|
||||
SigningPublicKey: "test-signing-key",
|
||||
EncrPublicKey: "test-encryption-key",
|
||||
ValidFrom: time.Now(),
|
||||
ValidUntil: time.Now().Add(24 * time.Hour),
|
||||
Status: "SUBSCRIBED",
|
||||
},
|
||||
},
|
||||
responseCode: http.StatusOK,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(tc.responseCode)
|
||||
if tc.responseBody != nil {
|
||||
bodyBytes, _ := json.Marshal(tc.responseBody)
|
||||
w.Write(bodyBytes)
|
||||
}
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
config := &Config{
|
||||
RegisteryURL: server.URL,
|
||||
RetryMax: 1,
|
||||
RetryWaitMin: 1 * time.Millisecond,
|
||||
RetryWaitMax: 2 * time.Millisecond,
|
||||
}
|
||||
rClient := NewRegisteryClient(config)
|
||||
ctx := context.Background()
|
||||
subscription := &model.Subscription{
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
response := []model.Subscription{
|
||||
{
|
||||
Subscriber: model.Subscriber{
|
||||
SubscriberID: "123",
|
||||
},
|
||||
@@ -173,14 +122,37 @@ func TestLookupSuccess(t *testing.T) {
|
||||
ValidFrom: time.Now(),
|
||||
ValidUntil: time.Now().Add(24 * time.Hour),
|
||||
Status: "SUBSCRIBED",
|
||||
}
|
||||
},
|
||||
}
|
||||
bodyBytes, _ := json.Marshal(response)
|
||||
w.Write(bodyBytes)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
result, err := rClient.Lookup(ctx, subscription)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, result)
|
||||
require.Equal(t, subscription.Subscriber.SubscriberID, result[0].Subscriber.SubscriberID)
|
||||
})
|
||||
config := &Config{
|
||||
RegisteryURL: server.URL,
|
||||
RetryMax: 1,
|
||||
RetryWaitMin: 1 * time.Millisecond,
|
||||
RetryWaitMax: 2 * time.Millisecond,
|
||||
}
|
||||
rClient := NewRegisteryClient(config)
|
||||
ctx := context.Background()
|
||||
subscription := &model.Subscription{
|
||||
Subscriber: model.Subscriber{
|
||||
SubscriberID: "123",
|
||||
},
|
||||
KeyID: "test-key",
|
||||
SigningPublicKey: "test-signing-key",
|
||||
EncrPublicKey: "test-encryption-key",
|
||||
ValidFrom: time.Now(),
|
||||
ValidUntil: time.Now().Add(24 * time.Hour),
|
||||
Status: "SUBSCRIBED",
|
||||
}
|
||||
|
||||
result, err := rClient.Lookup(ctx, subscription)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, result)
|
||||
require.Equal(t, subscription.Subscriber.SubscriberID, result[0].Subscriber.SubscriberID)
|
||||
}
|
||||
|
||||
// TestLookupFailure tests failure scenarios for the Lookup function.
|
||||
|
||||
@@ -245,7 +245,7 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf
|
||||
case "validateSchema":
|
||||
s, err = newValidateSchemaStep(h.schemaValidator)
|
||||
case "addRoute":
|
||||
s, err = newRouteStep(h.router)
|
||||
s, err = newAddRouteStep(h.router)
|
||||
case "broadcast":
|
||||
s = &broadcastStep{}
|
||||
default:
|
||||
|
||||
@@ -41,7 +41,9 @@ func (s *signStep) Run(ctx *model.StepContext) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to sign request: %w", err)
|
||||
}
|
||||
authHeader := fmt.Sprintf("Signature keyId=\"%s|%s|ed25519\",algorithm=\"ed25519\",created=\"%d\",expires=\"%d\",headers=\"(created) (expires) digest\",signature=\"%s\"", ctx.SubID, keyID, createdAt, validTill, sign)
|
||||
|
||||
authHeader := s.generateAuthHeader(ctx.SubID, keyID, createdAt, validTill, sign)
|
||||
|
||||
header := model.AuthHeaderSubscriber
|
||||
if ctx.Role == model.RoleGateway {
|
||||
header = model.AuthHeaderGateway
|
||||
@@ -50,6 +52,15 @@ func (s *signStep) Run(ctx *model.StepContext) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// generateAuthHeader constructs the authorization header for the signed request.
|
||||
// It includes key ID, algorithm, creation time, expiration time, required headers, and signature.
|
||||
func (s *signStep) generateAuthHeader(subID, keyID string, createdAt, validTill int64, signature string) string {
|
||||
return fmt.Sprintf(
|
||||
"Signature keyId=\"%s|%s|ed25519\",algorithm=\"ed25519\",created=\"%d\",expires=\"%d\",headers=\"(created) (expires) digest\",signature=\"%s\"",
|
||||
subID, keyID, createdAt, validTill, signature,
|
||||
)
|
||||
}
|
||||
|
||||
// validateSignStep represents the signature validation step.
|
||||
type validateSignStep struct {
|
||||
validator definition.SignValidator
|
||||
@@ -135,8 +146,8 @@ type addRouteStep struct {
|
||||
router definition.Router
|
||||
}
|
||||
|
||||
// newRouteStep creates and returns the addRoute step after validation.
|
||||
func newRouteStep(router definition.Router) (definition.Step, error) {
|
||||
// newAddRouteStep creates and returns the addRoute step after validation.
|
||||
func newAddRouteStep(router definition.Router) (definition.Step, error) {
|
||||
if router == nil {
|
||||
return nil, fmt.Errorf("invalid config: Router plugin not configured")
|
||||
}
|
||||
@@ -149,13 +160,11 @@ func (s *addRouteStep) Run(ctx *model.StepContext) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to determine route: %w", err)
|
||||
}
|
||||
log.Debugf(ctx, "Routing to %#v", route)
|
||||
ctx.Route = &model.Route{
|
||||
TargetType: route.TargetType,
|
||||
PublisherID: route.PublisherID,
|
||||
URL: route.URL,
|
||||
}
|
||||
log.Debugf(ctx, "ctx.Route to %#v", ctx.Route)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user