diff --git a/CONFIG.md b/CONFIG.md index 737bc6e..d5d1a4f 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -508,7 +508,7 @@ schemaValidator: #### 5. Schema2Validator Plugin -**Purpose**: Validate requests against OpenAPI 3.x specifications with action-based matching. +**Purpose**: Validate requests against OpenAPI 3.x specifications. Supports core protocol validation and optional extended validation for domain-specific objects with `@context` references. ```yaml schemaValidator: @@ -517,6 +517,11 @@ schemaValidator: type: url location: https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/draft/api-specs/beckn-protocol-api.yaml cacheTTL: "3600" + extendedSchema_enabled: "true" + extendedSchema_cacheTTL: "86400" + extendedSchema_maxCacheSize: "100" + extendedSchema_downloadTimeout: "30" + extendedSchema_allowedDomains: "beckn.org,example.com" ``` **Or for local files:** @@ -528,12 +533,18 @@ schemaValidator: type: file location: ./validation-scripts/l2-config/mobility_1.1.0_openapi_3.1.yaml cacheTTL: "3600" + extendedSchema_enabled: "false" ``` **Parameters**: - `type`: Source type - `"url"` for remote specs, `"file"` for local files - `location`: URL or file path to OpenAPI 3.1 specification - `cacheTTL`: Cache TTL in seconds before reloading spec (default: `"3600"`) +- `extendedSchema_enabled`: Enable extended schema validation for `@context` objects (default: `"false"`) +- `extendedSchema_cacheTTL`: Domain schema cache TTL in seconds (default: `"86400"`) +- `extendedSchema_maxCacheSize`: Max cached schemas (default: `"100"`) +- `extendedSchema_downloadTimeout`: Schema download timeout in seconds (default: `"30"`) +- `extendedSchema_allowedDomains`: Comma-separated domain whitelist (empty = all allowed) --- diff --git a/pkg/plugin/implementation/schemav2validator/README.md b/pkg/plugin/implementation/schemav2validator/README.md index b76ad99..e34f56a 100644 --- a/pkg/plugin/implementation/schemav2validator/README.md +++ 
b/pkg/plugin/implementation/schemav2validator/README.md @@ -10,6 +10,7 @@ Validates Beckn protocol requests against OpenAPI 3.1 specifications using kin-o - TTL-based caching with automatic refresh - Generic path matching (no hardcoded paths) - Direct schema validation without router overhead +- Extended schema validation for domain-specific objects with `@context` references ## Configuration @@ -20,6 +21,11 @@ schemaValidator: type: url location: https://example.com/openapi-spec.yaml cacheTTL: "3600" + extendedSchema_enabled: "true" + extendedSchema_cacheTTL: "86400" + extendedSchema_maxCacheSize: "100" + extendedSchema_downloadTimeout: "30" + extendedSchema_allowedDomains: "beckn.org,example.com" ``` ### Configuration Parameters @@ -29,24 +35,55 @@ schemaValidator: | `type` | string | Yes | - | Type of spec source: "url" or "file" ("dir" reserved for future) | | `location` | string | Yes | - | URL or file path to OpenAPI 3.1 spec | | `cacheTTL` | string | No | "3600" | Cache TTL in seconds before reloading spec | +| `extendedSchema_enabled` | string | No | "false" | Enable extended schema validation for `@context` objects | +| `extendedSchema_cacheTTL` | string | No | "86400" | Domain schema cache TTL in seconds | +| `extendedSchema_maxCacheSize` | string | No | "100" | Maximum number of cached domain schemas | +| `extendedSchema_downloadTimeout` | string | No | "30" | Timeout for downloading domain schemas | +| `extendedSchema_allowedDomains` | string | No | "" | Comma-separated domain whitelist (empty = all allowed) | ## How It Works -1. **Load Spec**: Loads OpenAPI spec from configured URL at startup -2. **Extract Action**: Extracts `action` from request `context.action` field -3. **Find Schema**: Searches all paths and HTTP methods in spec for schema with matching action: - - Checks `properties.context.action.enum` for the action value - - Also checks `properties.context.allOf[].properties.action.enum` - - Stops at first match -4. 
**Validate**: Validates request body against matched schema using `Schema.VisitJSON()` with: +### Initialization (Load Time) + +**Core Protocol Validation Setup**: +1. **Load OpenAPI Spec**: Loads main spec from `location` (URL or file) with external `$ref` resolution +2. **Build Action Index**: Creates action→schema map for O(1) lookup by scanning all paths/methods +3. **Validate Spec**: Validates OpenAPI spec structure (warnings logged, non-fatal) +4. **Cache Spec**: Stores loaded spec with `loadedAt` timestamp + +**Extended Schema Setup** (if `extendedSchema_enabled: "true"`): +5. **Initialize Schema Cache**: Creates LRU cache with `maxCacheSize` (default: 100) +6. **Start Background Refresh**: Launches goroutine with two tickers: + - Core spec refresh every `cacheTTL` seconds (default: 3600) + - Extended schema cleanup every `extendedSchema_cacheTTL` seconds (default: 86400) + +### Request Validation (Runtime) + +**Core Protocol Validation** (always runs): +1. **Parse Request**: Unmarshal JSON and extract `context.action` +2. **Lookup Schema**: O(1) lookup in action index (built at load time) +3. **Validate**: Call `schema.Value.VisitJSON()` with: - Required fields validation - Data type validation (string, number, boolean, object, array) - Format validation (email, uri, date-time, uuid, etc.) - Constraint validation (min/max, pattern, enum, const) - Nested object and array validation -5. **Return Errors**: Returns validation errors in ONIX format +4. **Return Errors**: If validation fails, format and return errors + +**Extended Schema Validation** (if `extendedSchema_enabled: "true"` AND core validation passed): +5. **Scan for @context**: Recursively traverse `message` field for objects with `@context` and `@type` +6. **Filter Core Schemas**: Skip objects with `/schema/core/` in `@context` URL +7. 
**Validate Each Domain Object**: + - Check domain whitelist (if `allowedDomains` configured) + - Transform `@context` URL: `context.jsonld` → `attributes.yaml` + - Load schema from URL/file (check cache first, download if miss) + - Find schema by `@type` (direct match or `x-jsonld.@type` fallback) + - Strip `@context` and `@type` metadata from object + - Validate remaining data against domain schema + - Prefix error paths with object location (e.g., `message.order.field`) +8. **Return Errors**: Returns first validation error (fail-fast) ## Action-Based Matching @@ -120,7 +157,33 @@ schemaValidator: cacheTTL: "3600" ``` +### With Extended Schema Validation +```yaml +schemaValidator: + id: schemav2validator + config: + type: url + location: https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/draft/api-specs/beckn-protocol-api.yaml + cacheTTL: "3600" + extendedSchema_enabled: "true" + extendedSchema_cacheTTL: "86400" + extendedSchema_maxCacheSize: "100" + extendedSchema_downloadTimeout: "30" + extendedSchema_allowedDomains: "raw.githubusercontent.com,schemas.beckn.org" +``` + +**At Load Time**: +- Creates LRU cache for domain schemas (max 100 entries) +- Starts background goroutine for cache cleanup every 24 hours + +**At Runtime** (after core validation passes): +- Scans `message` field for objects with `@context` and `@type` +- Skips core Beckn schemas (containing `/schema/core/`) +- Downloads domain schemas from `@context` URLs (cached for 24 hours) +- Validates domain-specific data against schemas +- Returns errors with full JSON paths (e.g., `message.order.chargingRate`) +- Fail-fast: returns on first validation error ## Dependencies diff --git a/pkg/plugin/implementation/schemav2validator/cmd/plugin.go b/pkg/plugin/implementation/schemav2validator/cmd/plugin.go index 9d147fe..f93712f 100644 --- a/pkg/plugin/implementation/schemav2validator/cmd/plugin.go +++ b/pkg/plugin/implementation/schemav2validator/cmd/plugin.go @@ -4,6 +4,7 @@ 
import ( "context" "errors" "strconv" + "strings" "github.com/beckn-one/beckn-onix/pkg/plugin/definition" "github.com/beckn-one/beckn-onix/pkg/plugin/implementation/schemav2validator" @@ -40,6 +41,34 @@ func (vp schemav2ValidatorProvider) New(ctx context.Context, config map[string]s } } + // NEW: Parse extendedSchema_enabled + if enableStr, ok := config["extendedSchema_enabled"]; ok { + cfg.EnableExtendedSchema = enableStr == "true" + } + + // NEW: Parse Extended Schema config (if enabled) + if cfg.EnableExtendedSchema { + if v, ok := config["extendedSchema_cacheTTL"]; ok { + if ttl, err := strconv.Atoi(v); err == nil && ttl > 0 { + cfg.ExtendedSchemaConfig.CacheTTL = ttl + } + } + if v, ok := config["extendedSchema_maxCacheSize"]; ok { + if size, err := strconv.Atoi(v); err == nil && size > 0 { + cfg.ExtendedSchemaConfig.MaxCacheSize = size + } + } + if v, ok := config["extendedSchema_downloadTimeout"]; ok { + if timeout, err := strconv.Atoi(v); err == nil && timeout > 0 { + cfg.ExtendedSchemaConfig.DownloadTimeout = timeout + } + } + if v, ok := config["extendedSchema_allowedDomains"]; ok && v != "" { + cfg.ExtendedSchemaConfig.AllowedDomains = strings.Split(v, ",") + } + + } + return schemav2validator.New(ctx, cfg) } diff --git a/pkg/plugin/implementation/schemav2validator/extended_schema.go b/pkg/plugin/implementation/schemav2validator/extended_schema.go new file mode 100644 index 0000000..92c5ed0 --- /dev/null +++ b/pkg/plugin/implementation/schemav2validator/extended_schema.go @@ -0,0 +1,406 @@ +package schemav2validator + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "net/url" + "strings" + "sync" + "time" + + "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/beckn-one/beckn-onix/pkg/model" + "github.com/getkin/kin-openapi/openapi3" +) + +// ExtendedSchemaConfig holds configuration for referenced schema validation. 
type ExtendedSchemaConfig struct {
	// CacheTTL is the lifetime of a cached domain schema in seconds.
	// validateExtendedSchemas falls back to 86400 (24h) when it is <= 0.
	CacheTTL int // seconds, default 86400 (24h)
	// MaxCacheSize is the maximum number of cached domain schemas.
	MaxCacheSize int // default 100
	// DownloadTimeout bounds a remote schema download, in seconds.
	// validateExtendedSchemas falls back to 30 when it is <= 0.
	DownloadTimeout int // seconds, default 30
	// AllowedDomains restricts which @context locations may be loaded.
	AllowedDomains []string // whitelist (empty = all allowed)
}

// referencedObject represents a domain-specific object with @context.
// It is produced by findReferencedObjects for every JSON object that carries
// string "@context" and "@type" members and is not a core schema.
type referencedObject struct {
	Path    string                 // location of the object in the request, e.g. "message.order.items[0]"
	Context string                 // value of the object's "@context" member
	Type    string                 // value of the object's "@type" member
	Data    map[string]interface{} // the object itself, still including "@context" and "@type"
}

// schemaCache caches loaded domain schemas with LRU eviction.
// All access to schemas goes through mu.
type schemaCache struct {
	mu      sync.RWMutex
	schemas map[string]*cachedDomainSchema // keyed by the hashURL digest of the schema path
	maxSize int                            // entry count at which set starts evicting
}

// cachedDomainSchema holds a cached domain schema with metadata.
type cachedDomainSchema struct {
	doc          *openapi3.T // parsed schema document
	loadedAt     time.Time   // when the entry was stored
	expiresAt    time.Time   // entries are treated as missing after this instant
	lastAccessed time.Time   // refreshed by get; drives LRU eviction in set
	accessCount  int64       // number of stores/hits recorded for the entry
}

// isCoreSchema checks if @context URL is a core Beckn schema.
// A URL counts as core when it contains the path segment "/schema/core/".
func isCoreSchema(contextURL string) bool {
	return strings.Contains(contextURL, "/schema/core/")
}

// validateExtendedSchemas validates all objects with @context against their schemas.
+func (v *schemav2Validator) validateExtendedSchemas(ctx context.Context, body interface{}) error { + // Extract "message" object - scan inside message + bodyMap, ok := body.(map[string]interface{}) + if !ok { + return fmt.Errorf("body is not a valid JSON object") + } + + message, hasMessage := bodyMap["message"] + if !hasMessage { + return fmt.Errorf("missing 'message' field in request body") + } + + // Find domain-specific objects with @context (skip core schemas) + objects := findReferencedObjects(message, "message") + + if len(objects) == 0 { + log.Debugf(ctx, "No domain-specific objects with @context found, skipping Extended Schema validation") + return nil + } + + log.Debugf(ctx, "Found %d domain-specific objects with @context for Extended Schema validation", len(objects)) + + // Get config with defaults + ttl := 86400 * time.Second // 24 hours default + timeout := 30 * time.Second + var allowedDomains []string + + refConfig := v.config.ExtendedSchemaConfig + if refConfig.CacheTTL > 0 { + ttl = time.Duration(refConfig.CacheTTL) * time.Second + } + if refConfig.DownloadTimeout > 0 { + timeout = time.Duration(refConfig.DownloadTimeout) * time.Second + } + allowedDomains = refConfig.AllowedDomains + + log.Debugf(ctx, "Extended Schema config: ttl=%v, timeout=%v, allowedDomains=%v", + ttl, timeout, allowedDomains) + + // Validate each object + for _, obj := range objects { + log.Debugf(ctx, "Validating object at path: %s, @context: %s, @type: %s", + obj.Path, obj.Context, obj.Type) + + if err := v.schemaCache.validateReferencedObject(ctx, obj, ttl, timeout, allowedDomains); err != nil { + // Extract and prefix error paths + var schemaErrors []model.Error + v.extractSchemaErrors(err, &schemaErrors) + + // Prefix all paths with object path + for i := range schemaErrors { + if schemaErrors[i].Paths != "" { + schemaErrors[i].Paths = obj.Path + "." 
+ schemaErrors[i].Paths + } else { + schemaErrors[i].Paths = obj.Path + } + } + + return &model.SchemaValidationErr{Errors: schemaErrors} + } + } + + return nil +} + +// newSchemaCache creates a new schema cache. +func newSchemaCache(maxSize int) *schemaCache { + return &schemaCache{ + schemas: make(map[string]*cachedDomainSchema), + maxSize: maxSize, + } +} + +// hashURL creates a SHA256 hash of the URL for use as cache key. +func hashURL(urlStr string) string { + hash := sha256.Sum256([]byte(urlStr)) + return hex.EncodeToString(hash[:]) +} + +// isValidSchemaPath validates if the schema path is safe to load. +func isValidSchemaPath(schemaPath string) bool { + u, err := url.Parse(schemaPath) + if err != nil { + // Could be a simple file path + return schemaPath != "" + } + // Support: http://, https://, file://, or no scheme (local path) + return u.Scheme == "http" || u.Scheme == "https" || + u.Scheme == "file" || u.Scheme == "" +} + +// get retrieves a cached schema and updates access tracking. +func (c *schemaCache) get(urlHash string) (*openapi3.T, bool) { + c.mu.Lock() + defer c.mu.Unlock() + + cached, exists := c.schemas[urlHash] + if !exists || time.Now().After(cached.expiresAt) { + return nil, false + } + + // Update access tracking + cached.lastAccessed = time.Now() + cached.accessCount++ + + return cached.doc, true +} + +// set stores a schema in the cache with TTL and LRU eviction. 
+func (c *schemaCache) set(urlHash string, doc *openapi3.T, ttl time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + + // LRU eviction if cache is full + if len(c.schemas) >= c.maxSize { + var oldest string + var oldestTime time.Time + for k, v := range c.schemas { + if oldest == "" || v.lastAccessed.Before(oldestTime) { + oldest, oldestTime = k, v.lastAccessed + } + } + if oldest != "" { + delete(c.schemas, oldest) + } + } + + c.schemas[urlHash] = &cachedDomainSchema{ + doc: doc, + loadedAt: time.Now(), + expiresAt: time.Now().Add(ttl), + lastAccessed: time.Now(), + accessCount: 1, + } +} + +// cleanupExpired removes expired schemas from cache. +func (c *schemaCache) cleanupExpired() int { + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + expired := make([]string, 0) + + for urlHash, cached := range c.schemas { + if now.After(cached.expiresAt) { + expired = append(expired, urlHash) + } + } + + for _, urlHash := range expired { + delete(c.schemas, urlHash) + } + + return len(expired) +} + +// loadSchemaFromPath loads a schema from URL or local file with timeout and caching. 
+func (c *schemaCache) loadSchemaFromPath(ctx context.Context, schemaPath string, ttl, timeout time.Duration) (*openapi3.T, error) { + urlHash := hashURL(schemaPath) + + // Check cache first + if doc, found := c.get(urlHash); found { + log.Debugf(ctx, "Schema cache hit for: %s", schemaPath) + return doc, nil + } + + log.Debugf(ctx, "Schema cache miss, loading from: %s", schemaPath) + + // Validate path format + if !isValidSchemaPath(schemaPath) { + return nil, fmt.Errorf("invalid schema path: %s", schemaPath) + } + + loader := openapi3.NewLoader() + loader.IsExternalRefsAllowed = true + + var doc *openapi3.T + var err error + + u, parseErr := url.Parse(schemaPath) + if parseErr == nil && (u.Scheme == "http" || u.Scheme == "https") { + // Load from URL with timeout + loadCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + loader.Context = loadCtx + doc, err = loader.LoadFromURI(u) + } else { + // Load from local file (file:// or path) + filePath := schemaPath + if u != nil && u.Scheme == "file" { + filePath = u.Path + } + doc, err = loader.LoadFromFile(filePath) + } + + if err != nil { + log.Errorf(ctx, err, "Failed to load schema from: %s", schemaPath) + return nil, fmt.Errorf("failed to load schema from %s: %w", schemaPath, err) + } + + // Validate loaded schema (non-blocking, just log warnings) + if err := doc.Validate(ctx); err != nil { + log.Debugf(ctx, "Schema validation warnings for %s: %v", schemaPath, err) + } + + c.set(urlHash, doc, ttl) + log.Debugf(ctx, "Loaded and cached schema from: %s", schemaPath) + + return doc, nil +} + +// findReferencedObjects recursively finds domain-specific objects with @context . 
+func findReferencedObjects(data interface{}, path string) []referencedObject { + var results []referencedObject + + switch v := data.(type) { + case map[string]interface{}: + // Check for @context and @type + if contextVal, hasContext := v["@context"].(string); hasContext { + if typeVal, hasType := v["@type"].(string); hasType { + // Skip core schemas during traversal + if !isCoreSchema(contextVal) { + results = append(results, referencedObject{ + Path: path, + Context: contextVal, + Type: typeVal, + Data: v, + }) + } + } + } + + // Recurse into nested objects + for key, val := range v { + newPath := key + if path != "" { + newPath = path + "." + key + } + results = append(results, findReferencedObjects(val, newPath)...) + } + + case []interface{}: + // Recurse into arrays + for i, item := range v { + newPath := fmt.Sprintf("%s[%d]", path, i) + results = append(results, findReferencedObjects(item, newPath)...) + } + } + + return results +} + +// transformContextToSchemaURL transforms @context URL to schema URL. +func transformContextToSchemaURL(contextURL string) string { + // transformation: context.jsonld -> attributes.yaml + return strings.Replace(contextURL, "context.jsonld", "attributes.yaml", 1) +} + +// findSchemaByType finds a schema in the document by @type value. 
+func findSchemaByType(ctx context.Context, doc *openapi3.T, typeName string) (*openapi3.SchemaRef, error) { + if doc.Components == nil || doc.Components.Schemas == nil { + return nil, fmt.Errorf("no schemas found in document") + } + + // Try direct match by schema name + if schema, exists := doc.Components.Schemas[typeName]; exists { + log.Debugf(ctx, "Found schema by direct match: %s", typeName) + return schema, nil + } + + // Fallback: Try x-jsonld.@type match + for name, schema := range doc.Components.Schemas { + if schema.Value == nil { + continue + } + if xJsonld, ok := schema.Value.Extensions["x-jsonld"].(map[string]interface{}); ok { + if atType, ok := xJsonld["@type"].(string); ok && atType == typeName { + log.Debugf(ctx, "Found schema by x-jsonld.@type match: %s (mapped to %s)", typeName, name) + return schema, nil + } + } + } + + return nil, fmt.Errorf("no schema found for @type: %s", typeName) +} + +// isAllowedDomain checks if the URL domain is in the whitelist. +func isAllowedDomain(schemaURL string, allowedDomains []string) bool { + if len(allowedDomains) == 0 { + return true // No whitelist = all allowed + } + for _, domain := range allowedDomains { + if strings.Contains(schemaURL, domain) { + return true + } + } + return false +} + +// validateReferencedObject validates a single object with @context. 
+func (c *schemaCache) validateReferencedObject( + ctx context.Context, + obj referencedObject, + ttl, timeout time.Duration, + allowedDomains []string, +) error { + // Domain whitelist check + if !isAllowedDomain(obj.Context, allowedDomains) { + log.Warnf(ctx, "Domain not in whitelist: %s", obj.Context) + return fmt.Errorf("domain not allowed: %s", obj.Context) + } + + // Transform @context to schema path (URL or file) + schemaPath := transformContextToSchemaURL(obj.Context) + log.Debugf(ctx, "Transformed %s -> %s", obj.Context, schemaPath) + + // Load schema with timeout (supports URL or local file) + doc, err := c.loadSchemaFromPath(ctx, schemaPath, ttl, timeout) + if err != nil { + return fmt.Errorf("at %s: %w", obj.Path, err) + } + + // Find schema by @type + schema, err := findSchemaByType(ctx, doc, obj.Type) + if err != nil { + log.Errorf(ctx, err, "Schema not found for @type: %s at path: %s", obj.Type, obj.Path) + return fmt.Errorf("at %s: %w", obj.Path, err) + } + + // Strip JSON-LD metadata before validation + domainData := make(map[string]interface{}, len(obj.Data)-2) + for k, v := range obj.Data { + if k != "@context" && k != "@type" { + domainData[k] = v + } + } + + // Validate domain-specific data against schema + opts := []openapi3.SchemaValidationOption{ + openapi3.VisitAsRequest(), + openapi3.EnableFormatValidation(), + } + if err := schema.Value.VisitJSON(domainData, opts...); err != nil { + log.Debugf(ctx, "Validation failed for @type: %s at path: %s: %v", obj.Type, obj.Path, err) + return err + } + + log.Debugf(ctx, "Validation passed for @type: %s at path: %s", obj.Type, obj.Path) + return nil +} diff --git a/pkg/plugin/implementation/schemav2validator/extended_schema_test.go b/pkg/plugin/implementation/schemav2validator/extended_schema_test.go new file mode 100644 index 0000000..a4bf0cf --- /dev/null +++ b/pkg/plugin/implementation/schemav2validator/extended_schema_test.go @@ -0,0 +1,709 @@ +package schemav2validator + +import ( + "context" + 
"os" + "testing" + "time" + + "github.com/getkin/kin-openapi/openapi3" + "github.com/stretchr/testify/assert" +) + +func TestIsCoreSchema(t *testing.T) { + tests := []struct { + name string + contextURL string + want bool + }{ + { + name: "core schema URL", + contextURL: "https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/draft/schema/core/v2/context.jsonld", + want: true, + }, + { + name: "domain schema URL", + contextURL: "https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/draft/schema/EvChargingOffer/v1/context.jsonld", + want: false, + }, + { + name: "empty URL", + contextURL: "", + want: false, + }, + { + name: "URL without schema/core", + contextURL: "https://example.com/some/path/context.jsonld", + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := isCoreSchema(tt.contextURL) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestFindReferencedObjects(t *testing.T) { + tests := []struct { + name string + data interface{} + path string + want int // number of objects found + }{ + { + name: "single domain object", + data: map[string]interface{}{ + "@context": "https://example.com/schema/DomainType/v1/context.jsonld", + "@type": "DomainType", + "field": "value", + }, + path: "message", + want: 1, + }, + { + name: "core schema object - should be skipped", + data: map[string]interface{}{ + "@context": "https://example.com/schema/core/v2/context.jsonld", + "@type": "beckn:Order", + "field": "value", + }, + path: "message", + want: 0, + }, + { + name: "nested domain objects", + data: map[string]interface{}{ + "order": map[string]interface{}{ + "@context": "https://example.com/schema/core/v2/context.jsonld", + "@type": "beckn:Order", + "orderAttributes": map[string]interface{}{ + "@context": "https://example.com/schema/ChargingSession/v1/context.jsonld", + "@type": "ChargingSession", + "field": "value", + }, + }, + }, + path: "message", + want: 1, // Only domain 
object, core skipped + }, + { + name: "array with domain objects", + data: map[string]interface{}{ + "items": []interface{}{ + map[string]interface{}{ + "@context": "https://example.com/schema/DomainType/v1/context.jsonld", + "@type": "DomainType", + }, + map[string]interface{}{ + "@context": "https://example.com/schema/AnotherType/v1/context.jsonld", + "@type": "AnotherType", + }, + }, + }, + path: "message", + want: 2, + }, + { + name: "object without @context", + data: map[string]interface{}{ + "field": "value", + }, + path: "message", + want: 0, + }, + { + name: "object with @context but no @type", + data: map[string]interface{}{ + "@context": "https://example.com/schema/DomainType/v1/context.jsonld", + "field": "value", + }, + path: "message", + want: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := findReferencedObjects(tt.data, tt.path) + assert.Equal(t, tt.want, len(got)) + }) + } +} + +func TestTransformContextToSchemaURL(t *testing.T) { + tests := []struct { + name string + contextURL string + want string + }{ + { + name: "standard transformation", + contextURL: "https://example.com/schema/EvChargingOffer/v1/context.jsonld", + want: "https://example.com/schema/EvChargingOffer/v1/attributes.yaml", + }, + { + name: "already attributes.yaml", + contextURL: "https://example.com/schema/EvChargingOffer/v1/attributes.yaml", + want: "https://example.com/schema/EvChargingOffer/v1/attributes.yaml", + }, + { + name: "no context.jsonld in URL", + contextURL: "https://example.com/schema/EvChargingOffer/v1/", + want: "https://example.com/schema/EvChargingOffer/v1/", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := transformContextToSchemaURL(tt.contextURL) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestHashURL(t *testing.T) { + tests := []struct { + name string + url string + }{ + { + name: "consistent hashing", + url: "https://example.com/schema.yaml", + }, + { + name: "empty 
string", + url: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hash1 := hashURL(tt.url) + hash2 := hashURL(tt.url) + + // Same URL should produce same hash + assert.Equal(t, hash1, hash2) + + // Hash should be 64 characters (SHA256 hex) + assert.Equal(t, 64, len(hash1)) + }) + } +} + +func TestIsValidSchemaPath(t *testing.T) { + tests := []struct { + name string + schemaPath string + want bool + }{ + { + name: "http URL", + schemaPath: "http://example.com/schema.yaml", + want: true, + }, + { + name: "https URL", + schemaPath: "https://example.com/schema.yaml", + want: true, + }, + { + name: "file URL", + schemaPath: "file:///path/to/schema.yaml", + want: true, + }, + { + name: "local path", + schemaPath: "/path/to/schema.yaml", + want: true, + }, + { + name: "relative path", + schemaPath: "./schema.yaml", + want: true, + }, + { + name: "empty path", + schemaPath: "", + want: true, // url.Parse("") succeeds, returns empty scheme + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := isValidSchemaPath(tt.schemaPath) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestNewSchemaCache(t *testing.T) { + tests := []struct { + name string + maxSize int + }{ + { + name: "default size", + maxSize: 100, + }, + { + name: "custom size", + maxSize: 50, + }, + { + name: "zero size", + maxSize: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cache := newSchemaCache(tt.maxSize) + + assert.NotNil(t, cache) + assert.Equal(t, tt.maxSize, cache.maxSize) + assert.NotNil(t, cache.schemas) + assert.Equal(t, 0, len(cache.schemas)) + }) + } +} + +func TestSchemaCache_GetSet(t *testing.T) { + cache := newSchemaCache(10) + + // Create a simple schema doc + doc := &openapi3.T{ + OpenAPI: "3.1.0", + } + + urlHash := hashURL("https://example.com/schema.yaml") + ttl := 1 * time.Hour + + // Test Set + cache.set(urlHash, doc, ttl) + + // Test Get - should find it + retrieved, found := 
cache.get(urlHash) + assert.True(t, found) + assert.Equal(t, doc, retrieved) + + // Test Get - non-existent key + _, found = cache.get("non-existent-hash") + assert.False(t, found) +} + +func TestSchemaCache_LRUEviction(t *testing.T) { + cache := newSchemaCache(2) // Small cache for testing + + doc1 := &openapi3.T{OpenAPI: "3.1.0"} + doc2 := &openapi3.T{OpenAPI: "3.1.1"} + doc3 := &openapi3.T{OpenAPI: "3.1.2"} + + ttl := 1 * time.Hour + + // Add first two items + cache.set("hash1", doc1, ttl) + cache.set("hash2", doc2, ttl) + + // Access first item to make it more recent + cache.get("hash1") + + // Add third item - should evict hash2 (least recently used) + cache.set("hash3", doc3, ttl) + + // Verify hash1 and hash3 exist, hash2 was evicted + _, found1 := cache.get("hash1") + _, found2 := cache.get("hash2") + _, found3 := cache.get("hash3") + + assert.True(t, found1, "hash1 should exist (recently accessed)") + assert.False(t, found2, "hash2 should be evicted (LRU)") + assert.True(t, found3, "hash3 should exist (just added)") +} + +func TestSchemaCache_TTLExpiry(t *testing.T) { + cache := newSchemaCache(10) + + doc := &openapi3.T{OpenAPI: "3.1.0"} + urlHash := "test-hash" + + // Set with very short TTL + cache.set(urlHash, doc, 1*time.Millisecond) + + // Should be found immediately + _, found := cache.get(urlHash) + assert.True(t, found) + + // Wait for expiry + time.Sleep(10 * time.Millisecond) + + // Should not be found after expiry + _, found = cache.get(urlHash) + assert.False(t, found) +} + +func TestSchemaCache_CleanupExpired(t *testing.T) { + cache := newSchemaCache(10) + + doc := &openapi3.T{OpenAPI: "3.1.0"} + + // Add items with short TTL + cache.set("hash1", doc, 1*time.Millisecond) + cache.set("hash2", doc, 1*time.Millisecond) + cache.set("hash3", doc, 1*time.Hour) // This one won't expire + + // Wait for expiry + time.Sleep(10 * time.Millisecond) + + // Cleanup expired + count := cache.cleanupExpired() + + // Should have cleaned up 2 expired items + 
assert.Equal(t, 2, count) + + // Verify only hash3 remains + cache.mu.RLock() + assert.Equal(t, 1, len(cache.schemas)) + _, exists := cache.schemas["hash3"] + assert.True(t, exists) + cache.mu.RUnlock() +} + +func TestIsAllowedDomain(t *testing.T) { + tests := []struct { + name string + schemaURL string + allowedDomains []string + want bool + }{ + { + name: "empty whitelist - all allowed", + schemaURL: "https://example.com/schema.yaml", + allowedDomains: []string{}, + want: true, + }, + { + name: "nil whitelist - all allowed", + schemaURL: "https://example.com/schema.yaml", + allowedDomains: nil, + want: true, + }, + { + name: "domain in whitelist", + schemaURL: "https://raw.githubusercontent.com/beckn/schema.yaml", + allowedDomains: []string{"raw.githubusercontent.com", "schemas.beckn.org"}, + want: true, + }, + { + name: "domain not in whitelist", + schemaURL: "https://malicious.com/schema.yaml", + allowedDomains: []string{"raw.githubusercontent.com", "schemas.beckn.org"}, + want: false, + }, + { + name: "partial domain match", + schemaURL: "https://raw.githubusercontent.com/beckn/schema.yaml", + allowedDomains: []string{"githubusercontent.com"}, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := isAllowedDomain(tt.schemaURL, tt.allowedDomains) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestFindReferencedObjects_PathBuilding(t *testing.T) { + data := map[string]interface{}{ + "order": map[string]interface{}{ + "beckn:orderItems": []interface{}{ + map[string]interface{}{ + "beckn:acceptedOffer": map[string]interface{}{ + "beckn:offerAttributes": map[string]interface{}{ + "@context": "https://example.com/schema/ChargingOffer/v1/context.jsonld", + "@type": "ChargingOffer", + }, + }, + }, + }, + }, + } + + objects := findReferencedObjects(data, "message") + + assert.Equal(t, 1, len(objects)) + assert.Equal(t, "message.order.beckn:orderItems[0].beckn:acceptedOffer.beckn:offerAttributes", objects[0].Path) + 
assert.Equal(t, "ChargingOffer", objects[0].Type) +} + +// Integration tests for the 4 remaining functions + +func TestLoadSchemaFromPath_LocalFile(t *testing.T) { + cache := newSchemaCache(10) + ctx := context.Background() + + tmpFile, err := os.CreateTemp("", "test-schema-*.yaml") + assert.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + schemaContent := `openapi: 3.1.0 +info: + title: Test Schema + version: 1.0.0 +components: + schemas: + TestType: + type: object + properties: + field1: + type: string` + + _, err = tmpFile.Write([]byte(schemaContent)) + assert.NoError(t, err) + tmpFile.Close() + + doc, err := cache.loadSchemaFromPath(ctx, tmpFile.Name(), 1*time.Hour, 30*time.Second) + assert.NoError(t, err) + assert.NotNil(t, doc) + assert.Equal(t, "3.1.0", doc.OpenAPI) +} + +func TestLoadSchemaFromPath_CacheHit(t *testing.T) { + cache := newSchemaCache(10) + ctx := context.Background() + + tmpFile, err := os.CreateTemp("", "test-schema-*.yaml") + assert.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + schemaContent := `openapi: 3.1.0 +info: + title: Test Schema + version: 1.0.0` + + tmpFile.Write([]byte(schemaContent)) + tmpFile.Close() + + doc1, err := cache.loadSchemaFromPath(ctx, tmpFile.Name(), 1*time.Hour, 30*time.Second) + assert.NoError(t, err) + + doc2, err := cache.loadSchemaFromPath(ctx, tmpFile.Name(), 1*time.Hour, 30*time.Second) + assert.NoError(t, err) + + assert.Equal(t, doc1, doc2) +} + +func TestLoadSchemaFromPath_InvalidPath(t *testing.T) { + cache := newSchemaCache(10) + ctx := context.Background() + + _, err := cache.loadSchemaFromPath(ctx, "/nonexistent/schema.yaml", 1*time.Hour, 30*time.Second) + assert.Error(t, err) +} + +func TestFindSchemaByType_DirectMatch(t *testing.T) { + cache := newSchemaCache(10) + ctx := context.Background() + + tmpFile, err := os.CreateTemp("", "test-schema-*.yaml") + assert.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + schemaContent := `openapi: 3.1.0 +info: + title: Test Schema + version: 
1.0.0 +components: + schemas: + TestType: + type: object + properties: + field1: + type: string` + + tmpFile.Write([]byte(schemaContent)) + tmpFile.Close() + + doc, err := cache.loadSchemaFromPath(ctx, tmpFile.Name(), 1*time.Hour, 30*time.Second) + assert.NoError(t, err) + + schema, err := findSchemaByType(ctx, doc, "TestType") + assert.NoError(t, err) + assert.NotNil(t, schema) +} + +func TestFindSchemaByType_NotFound(t *testing.T) { + cache := newSchemaCache(10) + ctx := context.Background() + + tmpFile, err := os.CreateTemp("", "test-schema-*.yaml") + assert.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + schemaContent := `openapi: 3.1.0 +info: + title: Test Schema + version: 1.0.0 +components: + schemas: + TestType: + type: object` + + tmpFile.Write([]byte(schemaContent)) + tmpFile.Close() + + doc, err := cache.loadSchemaFromPath(ctx, tmpFile.Name(), 1*time.Hour, 30*time.Second) + assert.NoError(t, err) + + _, err = findSchemaByType(ctx, doc, "NonExistentType") + assert.Error(t, err) + assert.Contains(t, err.Error(), "no schema found") +} + +func TestValidateReferencedObject_Valid(t *testing.T) { + cache := newSchemaCache(10) + ctx := context.Background() + + tmpFile, err := os.CreateTemp("", "test-schema-*.yaml") + assert.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + schemaContent := `openapi: 3.1.0 +info: + title: Test Schema + version: 1.0.0 +components: + schemas: + TestType: + type: object + additionalProperties: false + x-jsonld: + "@context": ./context.jsonld + "@type": TestType + properties: + field1: + type: string + required: + - field1` + + tmpFile.Write([]byte(schemaContent)) + tmpFile.Close() + + obj := referencedObject{ + Path: "message.test", + Context: tmpFile.Name(), + Type: "TestType", + Data: map[string]interface{}{ + "@context": tmpFile.Name(), + "@type": "TestType", + "field1": "value1", + }, + } + + err = cache.validateReferencedObject(ctx, obj, 1*time.Hour, 30*time.Second, nil) + assert.NoError(t, err) +} + +func 
TestValidateReferencedObject_Invalid(t *testing.T) { + cache := newSchemaCache(10) + ctx := context.Background() + + tmpFile, err := os.CreateTemp("", "test-schema-*.yaml") + assert.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + schemaContent := `openapi: 3.1.0 +info: + title: Test Schema + version: 1.0.0 +components: + schemas: + TestType: + type: object + additionalProperties: false + x-jsonld: + "@context": ./context.jsonld + "@type": TestType + properties: + field1: + type: string + required: + - field1` + + tmpFile.Write([]byte(schemaContent)) + tmpFile.Close() + + obj := referencedObject{ + Path: "message.test", + Context: tmpFile.Name(), + Type: "TestType", + Data: map[string]interface{}{ + "@context": tmpFile.Name(), + "@type": "TestType", + }, + } + + err = cache.validateReferencedObject(ctx, obj, 1*time.Hour, 30*time.Second, nil) + assert.Error(t, err) +} + +func TestValidateReferencedObject_DomainNotAllowed(t *testing.T) { + cache := newSchemaCache(10) + ctx := context.Background() + + obj := referencedObject{ + Path: "message.test", + Context: "https://malicious.com/schema.yaml", + Type: "TestType", + Data: map[string]interface{}{}, + } + + allowedDomains := []string{"trusted.com"} + + err := cache.validateReferencedObject(ctx, obj, 1*time.Hour, 30*time.Second, allowedDomains) + assert.Error(t, err) + assert.Contains(t, err.Error(), "domain not allowed") +} + +func TestValidateExtendedSchemas_NoObjects(t *testing.T) { + v := &schemav2Validator{ + config: &Config{ + EnableExtendedSchema: true, + ExtendedSchemaConfig: ExtendedSchemaConfig{}, + }, + schemaCache: newSchemaCache(10), + } + + ctx := context.Background() + body := map[string]interface{}{ + "message": map[string]interface{}{ + "field": "value", + }, + } + + err := v.validateExtendedSchemas(ctx, body) + assert.NoError(t, err) +} + +func TestValidateExtendedSchemas_MissingMessage(t *testing.T) { + v := &schemav2Validator{ + config: &Config{ + EnableExtendedSchema: true, + }, + schemaCache: 
newSchemaCache(10), + } + + ctx := context.Background() + body := map[string]interface{}{ + "context": map[string]interface{}{}, + } + + err := v.validateExtendedSchemas(ctx, body) + assert.Error(t, err) + assert.Contains(t, err.Error(), "missing 'message' field") +} diff --git a/pkg/plugin/implementation/schemav2validator/schemav2validator.go b/pkg/plugin/implementation/schemav2validator/schemav2validator.go index cc6c717..f7050b7 100644 --- a/pkg/plugin/implementation/schemav2validator/schemav2validator.go +++ b/pkg/plugin/implementation/schemav2validator/schemav2validator.go @@ -24,9 +24,10 @@ type payload struct { // schemav2Validator implements the SchemaValidator interface. type schemav2Validator struct { - config *Config - spec *cachedSpec - specMutex sync.RWMutex + config *Config + spec *cachedSpec + specMutex sync.RWMutex + schemaCache *schemaCache // cache for extended schemas } // cachedSpec holds a cached OpenAPI spec. @@ -41,6 +42,10 @@ type Config struct { Type string // "url", "file", or "dir" Location string // URL, file path, or directory path CacheTTL int + + // Extended Schema configuration + EnableExtendedSchema bool + ExtendedSchemaConfig ExtendedSchemaConfig } // New creates a new Schemav2Validator instance. 
@@ -66,6 +71,16 @@ func New(ctx context.Context, config *Config) (*schemav2Validator, func() error, config: config, } + // Initialize extended schema cache if enabled + if config.EnableExtendedSchema { + maxSize := 100 + if config.ExtendedSchemaConfig.MaxCacheSize > 0 { + maxSize = config.ExtendedSchemaConfig.MaxCacheSize + } + v.schemaCache = newSchemaCache(maxSize) + log.Infof(ctx, "Initialized extended schema cache with max size: %d", maxSize) + } + if err := v.initialise(ctx); err != nil { return nil, nil, fmt.Errorf("failed to initialise schemav2Validator: %v", err) } @@ -119,6 +134,19 @@ func (v *schemav2Validator) Validate(ctx context.Context, reqURL *url.URL, data return v.formatValidationError(err) } + log.Debugf(ctx, "base schema validation passed for action: %s", action) + + // Extended Schema validation (if enabled) + if v.config.EnableExtendedSchema && v.schemaCache != nil { + log.Debugf(ctx, "Starting Extended Schema validation for action: %s", action) + if err := v.validateExtendedSchemas(ctx, jsonData); err != nil { + // Extended Schema failure - return error + log.Debugf(ctx, "Extended Schema validation failed for action %s: %v", action, err) + return err + } + log.Debugf(ctx, "Extended Schema validation passed for action: %s", action) + } + return nil } @@ -181,15 +209,36 @@ func (v *schemav2Validator) loadSpec(ctx context.Context) error { // refreshLoop periodically reloads expired specs based on TTL. 
func (v *schemav2Validator) refreshLoop(ctx context.Context) { - ticker := time.NewTicker(time.Duration(v.config.CacheTTL) * time.Second) - defer ticker.Stop() + coreTicker := time.NewTicker(time.Duration(v.config.CacheTTL) * time.Second) + defer coreTicker.Stop() + + // Ticker for extended schema cleanup + var refTicker *time.Ticker + var refTickerCh <-chan time.Time // Default nil, blocks forever + + if v.config.EnableExtendedSchema { + ttl := v.config.ExtendedSchemaConfig.CacheTTL + if ttl <= 0 { + ttl = 86400 // Default 24 hours + } + refTicker = time.NewTicker(time.Duration(ttl) * time.Second) + defer refTicker.Stop() + refTickerCh = refTicker.C + } for { select { case <-ctx.Done(): return - case <-ticker.C: + case <-coreTicker.C: v.reloadExpiredSpec(ctx) + case <-refTickerCh: + if v.schemaCache != nil { + count := v.schemaCache.cleanupExpired() + if count > 0 { + log.Debugf(ctx, "Cleaned up %d expired extended schemas", count) + } + } } } }