diff --git a/config/local-simple.yaml b/config/local-simple.yaml index 466ce78..72ed267 100644 --- a/config/local-simple.yaml +++ b/config/local-simple.yaml @@ -22,8 +22,14 @@ modules: handler: type: std role: bap - registryUrl: http://registry:3030/subscribers plugins: + registry: + id: registry + config: + url: http://registry:3030/subscribers + retry_max: "3" + retry_wait_min: "100ms" + retry_wait_max: "500ms" keyManager: id: simplekeymanager config: @@ -61,8 +67,14 @@ modules: handler: type: std role: bap - registryUrl: http://registry:3030/subscribers plugins: + registry: + id: registry + config: + url: http://registry:3030/subscribers + retry_max: "3" + retry_wait_min: "100ms" + retry_wait_max: "500ms" keyManager: id: simplekeymanager config: @@ -96,8 +108,14 @@ modules: handler: type: std role: bpp - registryUrl: http://registry:3030/subscribers plugins: + registry: + id: registry + config: + url: http://registry:3030/subscribers + retry_max: "3" + retry_wait_min: "100ms" + retry_wait_max: "500ms" keyManager: id: simplekeymanager config: @@ -129,8 +147,14 @@ modules: handler: type: std role: bpp - registryUrl: http://registry:3030/subscribers plugins: + registry: + id: registry + config: + url: http://registry:3030/subscribers + retry_max: "3" + retry_wait_min: "100ms" + retry_wait_max: "500ms" keyManager: id: simplekeymanager config: diff --git a/config/onix-bap/adapter.yaml b/config/onix-bap/adapter.yaml index dda19fc..aee66c7 100644 --- a/config/onix-bap/adapter.yaml +++ b/config/onix-bap/adapter.yaml @@ -23,8 +23,14 @@ modules: handler: type: std role: bap - registryUrl: http://localhost:8080/reg plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: "3" + retry_wait_min: "100ms" + retry_wait_max: "500ms" keyManager: id: secretskeymanager config: @@ -61,9 +67,15 @@ modules: path: /bap/caller/ handler: type: std - registryUrl: http://localhost:8080/reg role: bap plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: "3" + retry_wait_min: "100ms" + retry_wait_max: "500ms" keyManager: id: secretskeymanager config: diff --git a/config/onix-bpp/adapter.yaml b/config/onix-bpp/adapter.yaml index aa3d242..a1be1d0 100644 --- a/config/onix-bpp/adapter.yaml +++ b/config/onix-bpp/adapter.yaml @@ -24,8 +24,14 @@ modules: type: std role: bpp subscriberId: bpp1 - registryUrl: http://localhost:8080/reg plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: "3" + retry_wait_min: "100ms" + retry_wait_max: "500ms" keyManager: id: secretskeymanager config: @@ -63,8 +69,14 @@ modules: handler: type: std role: bpp - registryUrl: http://localhost:8080/reg plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: "3" + retry_wait_min: "100ms" + retry_wait_max: "500ms" keyManager: id: secretskeymanager config: diff --git a/config/onix/adapter.yaml b/config/onix/adapter.yaml index e3a785b..2e931ab 100644 --- a/config/onix/adapter.yaml +++ b/config/onix/adapter.yaml @@ -23,8 +23,14 @@ modules: handler: type: std role: bap - registryUrl: http://localhost:8080/reg plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: "3" + retry_wait_min: "100ms" + retry_wait_max: "500ms" keyManager: id: secretskeymanager config: @@ -61,9 +67,15 @@ modules: path: /bap/caller/ handler: type: std - registryUrl: http://localhost:8080/reg role: bap plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: "3" + retry_wait_min: "100ms" + retry_wait_max: "500ms" keyManager: id: secretskeymanager config: @@ -102,8 +114,14 @@ modules: type: std role: bpp subscriberId: bpp1 - registryUrl: http://localhost:8080/reg plugins: + registry: + id: registry + config: + url: http://localhost:8080/reg + retry_max: "3" + retry_wait_min: "100ms" + retry_wait_max: "500ms" keyManager: id: secretskeymanager config: diff --git a/pkg/plugin/implementation/registry/cmd/plugin_test.go b/pkg/plugin/implementation/registry/cmd/plugin_test.go new file mode 100644 index 0000000..d5c638c --- /dev/null +++ b/pkg/plugin/implementation/registry/cmd/plugin_test.go @@ -0,0 +1,266 @@ +package main + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/beckn-one/beckn-onix/pkg/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRegistryProvider_New(t *testing.T) { + tests := []struct { + name string + config map[string]string + expectError bool + errorMsg string + }{ + { + name: "valid config with all parameters", + config: map[string]string{ + "url": "http://localhost:8080", + "retry_max": "3", + "retry_wait_min": "100ms", + "retry_wait_max": "500ms", + }, + expectError: false, + }, + { + name: "minimal valid config", + config: map[string]string{ + "url": "http://localhost:8080", + }, + expectError: false, + }, + { + name: "missing URL", + config: map[string]string{}, + expectError: true, + errorMsg: "registry URL cannot be empty", + }, + { + name: "invalid retry_max", + config: map[string]string{ + "url": "http://localhost:8080", + "retry_max": "invalid", + }, + expectError: false, // Invalid values are ignored, not errors + }, + { + name: "invalid retry_wait_min", + config: map[string]string{ + "url": "http://localhost:8080", + "retry_wait_min": "invalid", + }, + expectError: false, // Invalid values are ignored, not errors + }, + { + name: "invalid retry_wait_max", + config: map[string]string{ + "url": "http://localhost:8080", + "retry_wait_max": "invalid", + }, + expectError: false, // Invalid values are ignored, not errors + }, + { + name: "empty URL", + config: map[string]string{ + "url": "", + }, + expectError: true, + errorMsg: "registry URL cannot be empty", + }, + } + + provider := registryProvider{} + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + registry, closer, err := provider.New(ctx, tt.config) + + if tt.expectError { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errorMsg) + assert.Nil(t, registry) + assert.Nil(t, closer) + } else { + require.NoError(t, err) + assert.NotNil(t, registry) + assert.NotNil(t, closer) + + // Test that closer works + err = closer() + assert.NoError(t, err) + } + }) + } +} + +func TestRegistryProvider_NilContext(t *testing.T) { + provider := registryProvider{} + config := map[string]string{ + "url": "http://localhost:8080", + } + + registry, closer, err := provider.New(context.TODO(), config) + require.Error(t, err) + assert.Contains(t, err.Error(), "context cannot be nil") + assert.Nil(t, registry) + assert.Nil(t, closer) +} + +func TestRegistryProvider_IntegrationTest(t *testing.T) { + // Create a test server + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/subscribe": + w.WriteHeader(http.StatusOK) + w.Write([]byte("{}")) + case "/lookup": + w.WriteHeader(http.StatusOK) + w.Write([]byte("[]")) + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + provider := registryProvider{} + config := map[string]string{ + "url": server.URL, + "retry_max": "2", + "retry_wait_min": "10ms", + "retry_wait_max": "20ms", + } + + ctx := context.Background() + registry, closer, err := provider.New(ctx, config) + require.NoError(t, err) + require.NotNil(t, registry) + require.NotNil(t, closer) + defer closer() + + // Test Subscribe + subscription := &model.Subscription{ + Subscriber: model.Subscriber{ + SubscriberID: "test-subscriber", + URL: "https://example.com", + Type: "BAP", + Domain: "mobility", + }, + KeyID: "test-key", + SigningPublicKey: "test-signing-key", + EncrPublicKey: "test-encryption-key", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(24 * time.Hour), + Status: "SUBSCRIBED", + } + + err = registry.Subscribe(ctx, subscription) + require.NoError(t, err) + + // Test Lookup + results, err := registry.Lookup(ctx, subscription) + require.NoError(t, err) + assert.NotNil(t, results) + assert.Len(t, results, 0) // Empty array response from test server +} + +func TestRegistryProvider_ConfigurationParsing(t *testing.T) { + tests := []struct { + name string + config map[string]string + expectedConfig map[string]interface{} + }{ + { + name: "all parameters set", + config: map[string]string{ + "url": "http://localhost:8080", + "retry_max": "5", + "retry_wait_min": "200ms", + "retry_wait_max": "1s", + }, + expectedConfig: map[string]interface{}{ + "url": "http://localhost:8080", + "retry_max": 5, + "retry_wait_min": 200 * time.Millisecond, + "retry_wait_max": 1 * time.Second, + }, + }, + { + name: "only required parameters", + config: map[string]string{ + "url": "https://registry.example.com", + }, + expectedConfig: map[string]interface{}{ + "url": "https://registry.example.com", + }, + }, + { + name: "invalid numeric values ignored", + config: map[string]string{ + "url": "http://localhost:8080", + "retry_max": "not-a-number", + }, + expectedConfig: map[string]interface{}{ + "url": "http://localhost:8080", + }, + }, + { + name: "invalid duration values ignored", + config: map[string]string{ + "url": "http://localhost:8080", + "retry_wait_min": "not-a-duration", + "retry_wait_max": "also-not-a-duration", + }, + expectedConfig: map[string]interface{}{ + "url": "http://localhost:8080", + }, + }, + } + + // Create a test server that just returns OK + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("{}")) + })) + defer server.Close() + + provider := registryProvider{} + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Override URL with test server URL for testing + testConfig := make(map[string]string) + for k, v := range tt.config { + testConfig[k] = v + } + testConfig["url"] = server.URL + + ctx := context.Background() + registry, closer, err := provider.New(ctx, testConfig) + require.NoError(t, err) + require.NotNil(t, registry) + require.NotNil(t, closer) + defer closer() + + // The registry should work regardless of invalid config values + subscription := &model.Subscription{ + KeyID: "test-key", + SigningPublicKey: "test-signing-key", + EncrPublicKey: "test-encryption-key", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(24 * time.Hour), + Status: "SUBSCRIBED", + } + + err = registry.Subscribe(ctx, subscription) + assert.NoError(t, err) + }) + } +} diff --git a/pkg/plugin/implementation/registry/registry_test.go b/pkg/plugin/implementation/registry/registry_test.go new file mode 100644 index 0000000..18f9f96 --- /dev/null +++ b/pkg/plugin/implementation/registry/registry_test.go @@ -0,0 +1,498 @@ +package registry + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/beckn-one/beckn-onix/pkg/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestNew tests the New function for creating RegistryClient instances +func TestNew(t *testing.T) { + tests := []struct { + name string + config *Config + expectError bool + errorMsg string + }{ + { + name: "valid config", + config: &Config{ + URL: "http://localhost:8080", + RetryMax: 3, + RetryWaitMin: time.Millisecond * 100, + RetryWaitMax: time.Millisecond * 500, + }, + expectError: false, + }, + { + name: "nil config", + config: nil, + expectError: true, + errorMsg: "registry config cannot be nil", + }, + { + name: "empty URL", + config: &Config{ + URL: "", + }, + expectError: true, + errorMsg: "registry URL cannot be empty", + }, + { + name: "minimal valid config", + config: &Config{ + URL: "http://example.com", + }, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + client, closer, err := New(ctx, tt.config) + + if tt.expectError { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errorMsg) + assert.Nil(t, client) + assert.Nil(t, closer) + } else { + require.NoError(t, err) + assert.NotNil(t, client) + assert.NotNil(t, closer) + + // Test that closer works without error + err = closer() + assert.NoError(t, err) + + // Verify config is set correctly + assert.Equal(t, tt.config.URL, client.config.URL) + } + }) + } +} + +// TestSubscribeSuccess verifies that the Subscribe function succeeds when the server responds with HTTP 200. +func TestSubscribeSuccess(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify request method and headers + assert.Equal(t, "POST", r.Method) + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + assert.Contains(t, r.URL.Path, "/subscribe") + + w.WriteHeader(http.StatusOK) + if _, err := w.Write([]byte("{}")); err != nil { + t.Errorf("failed to write response: %v", err) + } + })) + defer server.Close() + + config := &Config{ + URL: server.URL, + RetryMax: 3, + RetryWaitMin: time.Millisecond * 100, + RetryWaitMax: time.Millisecond * 500, + } + + ctx := context.Background() + client, closer, err := New(ctx, config) + require.NoError(t, err) + defer closer() + + subscription := &model.Subscription{ + KeyID: "test-key", + SigningPublicKey: "test-signing-key", + EncrPublicKey: "test-encryption-key", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(24 * time.Hour), + Status: "SUBSCRIBED", + } + + err = client.Subscribe(context.Background(), subscription) + require.NoError(t, err) +} + +// TestSubscribeFailure tests different failure scenarios for Subscribe. +func TestSubscribeFailure(t *testing.T) { + tests := []struct { + name string + responseCode int + responseBody string + expectError bool + errorContains string + setupServer func() *httptest.Server + config *Config + }{ + { + name: "Internal Server Error", + responseCode: http.StatusInternalServerError, + responseBody: "Internal Server Error", + expectError: true, + errorContains: "subscribe request failed with status", + }, + { + name: "Bad Request", + responseCode: http.StatusBadRequest, + responseBody: "Bad Request", + expectError: true, + errorContains: "subscribe request failed with status", + }, + { + name: "Not Found", + responseCode: http.StatusNotFound, + responseBody: "Not Found", + expectError: true, + errorContains: "subscribe request failed with status", + }, + { + name: "Connection Refused", + setupServer: func() *httptest.Server { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + server.Close() // Close immediately to simulate connection refused + return server + }, + expectError: true, + errorContains: "failed to send subscribe request with retry", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var server *httptest.Server + + if tt.setupServer != nil { + server = tt.setupServer() + } else { + server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tt.responseCode) + if _, err := w.Write([]byte(tt.responseBody)); err != nil { + t.Errorf("failed to write response: %v", err) + } + })) + defer server.Close() + } + + config := &Config{ + URL: server.URL, + RetryMax: 1, + RetryWaitMin: 1 * time.Millisecond, + RetryWaitMax: 2 * time.Millisecond, + } + + ctx := context.Background() + client, closer, err := New(ctx, config) + require.NoError(t, err) + defer closer() + + subscription := &model.Subscription{ + KeyID: "test-key", + SigningPublicKey: "test-signing-key", + EncrPublicKey: "test-encryption-key", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(24 * time.Hour), + Status: "SUBSCRIBED", + } + + err = client.Subscribe(context.Background(), subscription) + if tt.expectError { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errorContains) + } else { + require.NoError(t, err) + } + }) + } +} + +// TestLookupSuccess tests successful lookup scenarios. +func TestLookupSuccess(t *testing.T) { + expectedResponse := []model.Subscription{ + { + Subscriber: model.Subscriber{ + SubscriberID: "123", + URL: "https://example.com", + Type: "BAP", + Domain: "mobility", + }, + KeyID: "test-key", + SigningPublicKey: "test-signing-key", + EncrPublicKey: "test-encryption-key", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(24 * time.Hour), + Status: "SUBSCRIBED", + }, + { + Subscriber: model.Subscriber{ + SubscriberID: "456", + URL: "https://example2.com", + Type: "BPP", + Domain: "retail", + }, + KeyID: "test-key-2", + SigningPublicKey: "test-signing-key-2", + EncrPublicKey: "test-encryption-key-2", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(48 * time.Hour), + Status: "SUBSCRIBED", + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify request method and headers + assert.Equal(t, "POST", r.Method) + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + assert.Contains(t, r.URL.Path, "/lookup") + + w.WriteHeader(http.StatusOK) + bodyBytes, _ := json.Marshal(expectedResponse) + if _, err := w.Write(bodyBytes); err != nil { + t.Errorf("failed to write response: %v", err) + } + })) + defer server.Close() + + config := &Config{ + URL: server.URL, + RetryMax: 1, + RetryWaitMin: 1 * time.Millisecond, + RetryWaitMax: 2 * time.Millisecond, + } + + ctx := context.Background() + client, closer, err := New(ctx, config) + require.NoError(t, err) + defer closer() + + subscription := &model.Subscription{ + Subscriber: model.Subscriber{ + SubscriberID: "123", + }, + KeyID: "test-key", + SigningPublicKey: "test-signing-key", + EncrPublicKey: "test-encryption-key", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(24 * time.Hour), + Status: "SUBSCRIBED", + } + + result, err := client.Lookup(ctx, subscription) + require.NoError(t, err) + require.NotEmpty(t, result) + assert.Len(t, result, 2) + assert.Equal(t, expectedResponse[0].Subscriber.SubscriberID, result[0].Subscriber.SubscriberID) + assert.Equal(t, expectedResponse[1].Subscriber.SubscriberID, result[1].Subscriber.SubscriberID) +} + +// TestLookupFailure tests failure scenarios for the Lookup function. +func TestLookupFailure(t *testing.T) { + tests := []struct { + name string + responseBody interface{} + responseCode int + setupServer func() *httptest.Server + expectError bool + errorContains string + }{ + { + name: "Non-200 status code", + responseBody: "Internal Server Error", + responseCode: http.StatusInternalServerError, + expectError: true, + errorContains: "lookup request failed with status", + }, + { + name: "Invalid JSON response", + responseBody: "Invalid JSON", + responseCode: http.StatusOK, + expectError: true, + errorContains: "failed to unmarshal response body", + }, + { + name: "Connection error", + setupServer: func() *httptest.Server { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + server.Close() // Close immediately to simulate connection error + return server + }, + expectError: true, + errorContains: "failed to send lookup request with retry", + }, + { + name: "Empty response body with 200 status", + responseBody: "", + responseCode: http.StatusOK, + expectError: true, + errorContains: "failed to unmarshal response body", + }, + { + name: "Valid empty array response", + responseBody: []model.Subscription{}, + responseCode: http.StatusOK, + expectError: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var server *httptest.Server + + if tc.setupServer != nil { + server = tc.setupServer() + } else { + server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if tc.responseCode != 0 { + w.WriteHeader(tc.responseCode) + } + if tc.responseBody != nil { + if str, ok := tc.responseBody.(string); ok { + if _, err := w.Write([]byte(str)); err != nil { + t.Errorf("failed to write response: %v", err) + } + } else { + bodyBytes, _ := json.Marshal(tc.responseBody) + if _, err := w.Write(bodyBytes); err != nil { + t.Errorf("failed to write response: %v", err) + } + } + } + })) + defer server.Close() + } + + config := &Config{ + URL: server.URL, + RetryMax: 0, + RetryWaitMin: 1 * time.Millisecond, + RetryWaitMax: 2 * time.Millisecond, + } + + ctx := context.Background() + client, closer, err := New(ctx, config) + require.NoError(t, err) + defer closer() + + subscription := &model.Subscription{ + Subscriber: model.Subscriber{}, + KeyID: "test-key", + SigningPublicKey: "test-signing-key", + EncrPublicKey: "test-encryption-key", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(24 * time.Hour), + Status: "SUBSCRIBED", + } + + result, err := client.Lookup(ctx, subscription) + if tc.expectError { + require.Error(t, err) + if tc.errorContains != "" { + assert.Contains(t, err.Error(), tc.errorContains) + } + assert.Empty(t, result) + } else { + require.NoError(t, err) + assert.NotNil(t, result) + } + }) + } +} + +// TestContextCancellation tests that operations respect context cancellation +func TestContextCancellation(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Simulate a slow server + time.Sleep(100 * time.Millisecond) + w.WriteHeader(http.StatusOK) + w.Write([]byte("{}")) + })) + defer server.Close() + + config := &Config{ + URL: server.URL, + RetryMax: 0, + RetryWaitMin: 1 * time.Millisecond, + RetryWaitMax: 2 * time.Millisecond, + } + + ctx := context.Background() + client, closer, err := New(ctx, config) + require.NoError(t, err) + defer closer() + + subscription := &model.Subscription{ + KeyID: "test-key", + SigningPublicKey: "test-signing-key", + EncrPublicKey: "test-encryption-key", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(24 * time.Hour), + Status: "SUBSCRIBED", + } + + t.Run("Subscribe with cancelled context", func(t *testing.T) { + cancelledCtx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + err := client.Subscribe(cancelledCtx, subscription) + require.Error(t, err) + assert.Contains(t, err.Error(), "context canceled") + }) + + t.Run("Lookup with cancelled context", func(t *testing.T) { + cancelledCtx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + result, err := client.Lookup(cancelledCtx, subscription) + require.Error(t, err) + assert.Contains(t, err.Error(), "context canceled") + assert.Empty(t, result) + }) +} + +// TestRetryConfiguration tests that retry configuration is properly applied +func TestRetryConfiguration(t *testing.T) { + attempts := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + attempts++ + if attempts < 3 { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("Server Error")) + } else { + w.WriteHeader(http.StatusOK) + w.Write([]byte("{}")) + } + })) + defer server.Close() + + config := &Config{ + URL: server.URL, + RetryMax: 3, + RetryWaitMin: 1 * time.Millisecond, + RetryWaitMax: 2 * time.Millisecond, + } + + ctx := context.Background() + client, closer, err := New(ctx, config) + require.NoError(t, err) + defer closer() + + subscription := &model.Subscription{ + KeyID: "test-key", + SigningPublicKey: "test-signing-key", + EncrPublicKey: "test-encryption-key", + ValidFrom: time.Now(), + ValidUntil: time.Now().Add(24 * time.Hour), + Status: "SUBSCRIBED", + } + + // This should succeed after retries + err = client.Subscribe(context.Background(), subscription) + require.NoError(t, err) + assert.GreaterOrEqual(t, attempts, 3) +}