From 94943e63e6721318239539bf0e66fde8cbd0c5d0 Mon Sep 17 00:00:00 2001 From: ameersohel45 Date: Fri, 12 Dec 2025 00:32:35 +0530 Subject: [PATCH 1/9] Issue580-feat: add support for referenced schema validation and caching --- .../schemav2validator/cmd/plugin.go | 31 ++ .../schemav2validator/schemav2validator.go | 451 +++++++++++++++++- 2 files changed, 472 insertions(+), 10 deletions(-) diff --git a/pkg/plugin/implementation/schemav2validator/cmd/plugin.go b/pkg/plugin/implementation/schemav2validator/cmd/plugin.go index 9d147fe..8bdfe82 100644 --- a/pkg/plugin/implementation/schemav2validator/cmd/plugin.go +++ b/pkg/plugin/implementation/schemav2validator/cmd/plugin.go @@ -4,6 +4,7 @@ import ( "context" "errors" "strconv" + "strings" "github.com/beckn-one/beckn-onix/pkg/plugin/definition" "github.com/beckn-one/beckn-onix/pkg/plugin/implementation/schemav2validator" @@ -40,6 +41,36 @@ func (vp schemav2ValidatorProvider) New(ctx context.Context, config map[string]s } } + // NEW: Parse enableReferencedSchemas + if enableStr, ok := config["enableReferencedSchemas"]; ok { + cfg.EnableReferencedSchemas = enableStr == "true" + } + + // NEW: Parse referencedSchemaConfig (if enabled) + if cfg.EnableReferencedSchemas { + if v, ok := config["referencedSchemaConfig.cacheTTL"]; ok { + if ttl, err := strconv.Atoi(v); err == nil && ttl > 0 { + cfg.ReferencedSchemaConfig.CacheTTL = ttl + } + } + if v, ok := config["referencedSchemaConfig.maxCacheSize"]; ok { + if size, err := strconv.Atoi(v); err == nil && size > 0 { + cfg.ReferencedSchemaConfig.MaxCacheSize = size + } + } + if v, ok := config["referencedSchemaConfig.downloadTimeout"]; ok { + if timeout, err := strconv.Atoi(v); err == nil && timeout > 0 { + cfg.ReferencedSchemaConfig.DownloadTimeout = timeout + } + } + if v, ok := config["referencedSchemaConfig.allowedDomains"]; ok && v != "" { + cfg.ReferencedSchemaConfig.AllowedDomains = strings.Split(v, ",") + } + if v, ok := config["referencedSchemaConfig.urlTransform"]; ok && v != "" { + cfg.ReferencedSchemaConfig.URLTransform = v + } + } + return schemav2validator.New(ctx, cfg) } diff --git a/pkg/plugin/implementation/schemav2validator/schemav2validator.go b/pkg/plugin/implementation/schemav2validator/schemav2validator.go index cc6c717..47762cc 100644 --- a/pkg/plugin/implementation/schemav2validator/schemav2validator.go +++ b/pkg/plugin/implementation/schemav2validator/schemav2validator.go @@ -2,6 +2,8 @@ package schemav2validator import ( "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "fmt" "net/url" @@ -24,9 +26,10 @@ type payload struct { // schemav2Validator implements the SchemaValidator interface. type schemav2Validator struct { - config *Config - spec *cachedSpec - specMutex sync.RWMutex + config *Config + spec *cachedSpec + specMutex sync.RWMutex + schemaCache *schemaCache // NEW: cache for referenced schemas } // cachedSpec holds a cached OpenAPI spec. @@ -41,6 +44,43 @@ type Config struct { Type string // "url", "file", or "dir" Location string // URL, file path, or directory path CacheTTL int + + // NEW: Referenced schema configuration + EnableReferencedSchemas bool + ReferencedSchemaConfig ReferencedSchemaConfig +} + +// ReferencedSchemaConfig holds configuration for referenced schema validation. +type ReferencedSchemaConfig struct { + CacheTTL int // seconds, default 86400 (24h) + MaxCacheSize int // default 100 + DownloadTimeout int // seconds, default 30 + AllowedDomains []string // whitelist (empty = all allowed) + URLTransform string // e.g. "context.jsonld->attributes.yaml" +} + +// referencedObject represents ANY object with @context in the request. +type referencedObject struct { + Path string + Context string + Type string + Data map[string]interface{} +} + +// schemaCache caches loaded domain schemas with LRU eviction. +type schemaCache struct { + mu sync.RWMutex + schemas map[string]*cachedDomainSchema + maxSize int +} + +// cachedDomainSchema holds a cached domain schema with metadata. +type cachedDomainSchema struct { + doc *openapi3.T + loadedAt time.Time + expiresAt time.Time + lastAccessed time.Time + accessCount int64 } // New creates a new Schemav2Validator instance. @@ -66,6 +106,16 @@ func New(ctx context.Context, config *Config) (*schemav2Validator, func() error, config: config, } + // NEW: Initialize referenced schema cache if enabled + if config.EnableReferencedSchemas { + maxSize := 100 + if config.ReferencedSchemaConfig.MaxCacheSize > 0 { + maxSize = config.ReferencedSchemaConfig.MaxCacheSize + } + v.schemaCache = newSchemaCache(maxSize) + log.Infof(ctx, "Initialized referenced schema cache with max size: %d", maxSize) + } + if err := v.initialise(ctx); err != nil { return nil, nil, fmt.Errorf("failed to initialise schemav2Validator: %v", err) } @@ -119,6 +169,19 @@ func (v *schemav2Validator) Validate(ctx context.Context, reqURL *url.URL, data return v.formatValidationError(err) } + log.Debugf(ctx, "LEVEL 1 validation passed for action: %s", action) + + // NEW: LEVEL 2 - Referenced schema validation (if enabled) + if v.config.EnableReferencedSchemas && v.schemaCache != nil { + log.Debugf(ctx, "Starting LEVEL 2 validation for action: %s", action) + if err := v.validateReferencedSchemas(ctx, jsonData); err != nil { + // Level 2 failure - return error (same behavior as Level 1) + log.Debugf(ctx, "LEVEL 2 validation failed for action %s: %v", action, err) + return v.formatValidationError(err) + } + log.Debugf(ctx, "LEVEL 2 validation passed for action: %s", action) + } + return nil } @@ -181,15 +244,42 @@ func (v *schemav2Validator) loadSpec(ctx context.Context) error { // refreshLoop periodically reloads expired specs based on TTL. func (v *schemav2Validator) refreshLoop(ctx context.Context) { - ticker := time.NewTicker(time.Duration(v.config.CacheTTL) * time.Second) - defer ticker.Stop() + coreTicker := time.NewTicker(time.Duration(v.config.CacheTTL) * time.Second) + defer coreTicker.Stop() + + // NEW: Ticker for referenced schema cleanup + var refTicker *time.Ticker + if v.config.EnableReferencedSchemas { + ttl := v.config.ReferencedSchemaConfig.CacheTTL + if ttl <= 0 { + ttl = 86400 // Default 24 hours + } + refTicker = time.NewTicker(time.Duration(ttl) * time.Second) + defer refTicker.Stop() + } for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - v.reloadExpiredSpec(ctx) + if refTicker != nil { + select { + case <-ctx.Done(): + return + case <-coreTicker.C: + v.reloadExpiredSpec(ctx) + case <-refTicker.C: + if v.schemaCache != nil { + count := v.schemaCache.cleanupExpired() + if count > 0 { + log.Debugf(ctx, "Cleaned up %d expired referenced schemas", count) + } + } + } + } else { + select { + case <-ctx.Done(): + return + case <-coreTicker.C: + v.reloadExpiredSpec(ctx) + } } } } @@ -309,6 +399,68 @@ func (v *schemav2Validator) buildActionIndex(ctx context.Context, doc *openapi3. return actionSchemas } +// validateReferencedSchemas validates all objects with @context against their schemas. +func (v *schemav2Validator) validateReferencedSchemas(ctx context.Context, body interface{}) error { + // Extract "message" object - only scan inside message, not root + bodyMap, ok := body.(map[string]interface{}) + if !ok { + return fmt.Errorf("body is not a valid JSON object") + } + + message, hasMessage := bodyMap["message"] + if !hasMessage { + return fmt.Errorf("missing 'message' field in request body") + } + + // Find all objects with @context starting from message + objects := findReferencedObjects(message, "message") + + if len(objects) == 0 { + log.Debugf(ctx, "No objects with @context found in message, skipping LEVEL 2 validation") + return nil + } + + log.Debugf(ctx, "Found %d objects with @context for LEVEL 2 validation", len(objects)) + + // Get config with defaults + urlTransform := "context.jsonld->attributes.yaml" + ttl := 86400 * time.Second // 24 hours default + timeout := 30 * time.Second + var allowedDomains []string + + refConfig := v.config.ReferencedSchemaConfig + if refConfig.URLTransform != "" { + urlTransform = refConfig.URLTransform + } + if refConfig.CacheTTL > 0 { + ttl = time.Duration(refConfig.CacheTTL) * time.Second + } + if refConfig.DownloadTimeout > 0 { + timeout = time.Duration(refConfig.DownloadTimeout) * time.Second + } + allowedDomains = refConfig.AllowedDomains + + log.Debugf(ctx, "LEVEL 2 config: urlTransform=%s, ttl=%v, timeout=%v, allowedDomains=%v", + urlTransform, ttl, timeout, allowedDomains) + + // Validate each object and collect errors + var errors []string + for _, obj := range objects { + log.Debugf(ctx, "Validating object at path: %s, @context: %s, @type: %s", + obj.Path, obj.Context, obj.Type) + + if err := v.schemaCache.validateReferencedObject(ctx, obj, urlTransform, ttl, timeout, allowedDomains); err != nil { + errors = append(errors, err.Error()) + } + } + + if len(errors) > 0 { + return fmt.Errorf("validation errors:\n - %s", strings.Join(errors, "\n - ")) + } + + return nil +} + // extractActionFromSchema extracts the action value from a schema. func (v *schemav2Validator) extractActionFromSchema(schema *openapi3.Schema) string { // Check direct properties @@ -360,3 +512,282 @@ func (v *schemav2Validator) getActionValue(contextSchema *openapi3.Schema) strin return "" } + +// newSchemaCache creates a new schema cache. +func newSchemaCache(maxSize int) *schemaCache { + return &schemaCache{ + schemas: make(map[string]*cachedDomainSchema), + maxSize: maxSize, + } +} + +// hashURL creates a SHA256 hash of the URL for use as cache key. +func hashURL(urlStr string) string { + hash := sha256.Sum256([]byte(urlStr)) + return hex.EncodeToString(hash[:]) +} + +// isValidSchemaPath validates if the schema path is safe to load. +func isValidSchemaPath(schemaPath string) bool { + u, err := url.Parse(schemaPath) + if err != nil { + // Could be a simple file path + return schemaPath != "" + } + // Support: http://, https://, file://, or no scheme (local path) + return u.Scheme == "http" || u.Scheme == "https" || + u.Scheme == "file" || u.Scheme == "" +} + +// get retrieves a cached schema and updates access tracking. +func (c *schemaCache) get(urlHash string) (*openapi3.T, bool) { + c.mu.Lock() + defer c.mu.Unlock() + + cached, exists := c.schemas[urlHash] + if !exists || time.Now().After(cached.expiresAt) { + return nil, false + } + + // Update access tracking + cached.lastAccessed = time.Now() + cached.accessCount++ + + return cached.doc, true +} + +// set stores a schema in the cache with TTL and LRU eviction. +func (c *schemaCache) set(urlHash string, doc *openapi3.T, ttl time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + + // LRU eviction if cache is full + if len(c.schemas) >= c.maxSize { + var oldest string + var oldestTime time.Time + for k, v := range c.schemas { + if oldest == "" || v.lastAccessed.Before(oldestTime) { + oldest, oldestTime = k, v.lastAccessed + } + } + if oldest != "" { + delete(c.schemas, oldest) + } + } + + c.schemas[urlHash] = &cachedDomainSchema{ + doc: doc, + loadedAt: time.Now(), + expiresAt: time.Now().Add(ttl), + lastAccessed: time.Now(), + accessCount: 1, + } +} + +// cleanupExpired removes expired schemas from cache. +func (c *schemaCache) cleanupExpired() int { + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + expired := make([]string, 0) + + for urlHash, cached := range c.schemas { + if now.After(cached.expiresAt) { + expired = append(expired, urlHash) + } + } + + for _, urlHash := range expired { + delete(c.schemas, urlHash) + } + + return len(expired) +} + +// loadSchemaFromPath loads a schema from URL or local file with timeout and caching. +func (c *schemaCache) loadSchemaFromPath(ctx context.Context, schemaPath string, ttl, timeout time.Duration) (*openapi3.T, error) { + urlHash := hashURL(schemaPath) + + // Check cache first + if doc, found := c.get(urlHash); found { + log.Debugf(ctx, "Schema cache hit for: %s", schemaPath) + return doc, nil + } + + log.Debugf(ctx, "Schema cache miss, loading from: %s", schemaPath) + + // Validate path format + if !isValidSchemaPath(schemaPath) { + return nil, fmt.Errorf("invalid schema path: %s", schemaPath) + } + + loader := openapi3.NewLoader() + loader.IsExternalRefsAllowed = true + + var doc *openapi3.T + var err error + + u, parseErr := url.Parse(schemaPath) + if parseErr == nil && (u.Scheme == "http" || u.Scheme == "https") { + // Load from URL with timeout + loadCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + loader.Context = loadCtx + doc, err = loader.LoadFromURI(u) + } else { + // Load from local file (file:// or path) + filePath := schemaPath + if u != nil && u.Scheme == "file" { + filePath = u.Path + } + doc, err = loader.LoadFromFile(filePath) + } + + if err != nil { + log.Errorf(ctx, err, "Failed to load schema from: %s", schemaPath) + return nil, fmt.Errorf("failed to load schema from %s: %w", schemaPath, err) + } + + // Validate loaded schema (non-blocking, just log warnings) + if err := doc.Validate(ctx); err != nil { + log.Debugf(ctx, "Schema validation warnings for %s: %v", schemaPath, err) + } + + c.set(urlHash, doc, ttl) + log.Debugf(ctx, "Loaded and cached schema from: %s", schemaPath) + + return doc, nil +} + +// findReferencedObjects recursively finds ALL objects with @context in the data. +func findReferencedObjects(data interface{}, path string) []referencedObject { + var results []referencedObject + + switch v := data.(type) { + case map[string]interface{}: + // Check for @context and @type + if contextVal, hasContext := v["@context"].(string); hasContext { + if typeVal, hasType := v["@type"].(string); hasType { + results = append(results, referencedObject{ + Path: path, + Context: contextVal, + Type: typeVal, + Data: v, + }) + } + } + + // Recurse into nested objects + for key, val := range v { + newPath := key + if path != "" { + newPath = path + "." + key + } + results = append(results, findReferencedObjects(val, newPath)...) + } + + case []interface{}: + // Recurse into arrays + for i, item := range v { + newPath := fmt.Sprintf("%s[%d]", path, i) + results = append(results, findReferencedObjects(item, newPath)...) + } + } + + return results +} + +// transformContextToSchemaURL transforms @context URL to schema URL. +func transformContextToSchemaURL(contextURL, transform string) string { + parts := strings.Split(transform, "->") + if len(parts) != 2 { + // Default transformation + return strings.Replace(contextURL, "context.jsonld", "attributes.yaml", 1) + } + return strings.Replace(contextURL, strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]), 1) +} + +// findSchemaByType finds a schema in the document by @type value. +func findSchemaByType(doc *openapi3.T, typeName string) (*openapi3.SchemaRef, error) { + if doc.Components == nil || doc.Components.Schemas == nil { + return nil, fmt.Errorf("no schemas found in document") + } + + // Try direct match by schema name + if schema, exists := doc.Components.Schemas[typeName]; exists { + return schema, nil + } + + // Fallback: Try x-jsonld.@type match + for _, schema := range doc.Components.Schemas { + if schema.Value == nil { + continue + } + if xJsonld, ok := schema.Value.Extensions["x-jsonld"].(map[string]interface{}); ok { + if atType, ok := xJsonld["@type"].(string); ok && atType == typeName { + return schema, nil + } + } + } + + return nil, fmt.Errorf("no schema found for @type: %s", typeName) +} + +// isAllowedDomain checks if the URL domain is in the whitelist. +func isAllowedDomain(schemaURL string, allowedDomains []string) bool { + if len(allowedDomains) == 0 { + return true // No whitelist = all allowed + } + for _, domain := range allowedDomains { + if strings.Contains(schemaURL, domain) { + return true + } + } + return false +} + +// validateReferencedObject validates a single object with @context. +func (c *schemaCache) validateReferencedObject( + ctx context.Context, + obj referencedObject, + urlTransform string, + ttl, timeout time.Duration, + allowedDomains []string, +) error { + // Domain whitelist check + if !isAllowedDomain(obj.Context, allowedDomains) { + log.Warnf(ctx, "Domain not in whitelist: %s", obj.Context) + return fmt.Errorf("domain not allowed: %s", obj.Context) + } + + // Transform @context to schema path (URL or file) + schemaPath := transformContextToSchemaURL(obj.Context, urlTransform) + log.Debugf(ctx, "Transformed %s -> %s", obj.Context, schemaPath) + + // Load schema with timeout (supports URL or local file) + doc, err := c.loadSchemaFromPath(ctx, schemaPath, ttl, timeout) + if err != nil { + return fmt.Errorf("at %s: %w", obj.Path, err) + } + + // Find schema by @type + schema, err := findSchemaByType(doc, obj.Type) + if err != nil { + log.Errorf(ctx, err, "Schema not found for @type: %s at path: %s", obj.Type, obj.Path) + return fmt.Errorf("at %s: %w", obj.Path, err) + } + + // Validate object against schema (same options as Level 1) + opts := []openapi3.SchemaValidationOption{ + openapi3.VisitAsRequest(), + openapi3.EnableFormatValidation(), + } + if err := schema.Value.VisitJSON(obj.Data, opts...); err != nil { + log.Debugf(ctx, "Validation failed for @type: %s at path: %s: %v", obj.Type, obj.Path, err) + return fmt.Errorf("at %s: %w", obj.Path, err) + } + + log.Debugf(ctx, "Validation passed for @type: %s at path: %s", obj.Type, obj.Path) + return nil +} From 5739573da0abc1619de5bad710224b498575ce7e Mon Sep 17 00:00:00 2001 From: ameersohel45 Date: Fri, 12 Dec 2025 12:14:11 +0530 Subject: [PATCH 2/9] ref: remove url transform config --- .../schemav2validator/cmd/plugin.go | 18 ++++---- .../schemav2validator/schemav2validator.go | 44 ++++++++----------- 2 files changed, 26 insertions(+), 36 deletions(-) diff --git a/pkg/plugin/implementation/schemav2validator/cmd/plugin.go b/pkg/plugin/implementation/schemav2validator/cmd/plugin.go index 8bdfe82..ab2541e 100644 --- a/pkg/plugin/implementation/schemav2validator/cmd/plugin.go +++ b/pkg/plugin/implementation/schemav2validator/cmd/plugin.go @@ -41,34 +41,32 @@ func (vp schemav2ValidatorProvider) New(ctx context.Context, config map[string]s } } - // NEW: Parse enableReferencedSchemas - if enableStr, ok := config["enableReferencedSchemas"]; ok { + // NEW: Parse extendedSchema_enabled + if enableStr, ok := config["extendedSchema_enabled"]; ok { cfg.EnableReferencedSchemas = enableStr == "true" } - // NEW: Parse referencedSchemaConfig (if enabled) + // NEW: Parse Extended Schema config (if enabled) if cfg.EnableReferencedSchemas { - if v, ok := config["referencedSchemaConfig.cacheTTL"]; ok { + if v, ok := config["extendedSchema_cacheTTL"]; ok { if ttl, err := strconv.Atoi(v); err == nil && ttl > 0 { cfg.ReferencedSchemaConfig.CacheTTL = ttl } } - if v, ok := config["referencedSchemaConfig.maxCacheSize"]; ok { + if v, ok := config["extendedSchema_maxCacheSize"]; ok { if size, err := strconv.Atoi(v); err == nil && size > 0 { cfg.ReferencedSchemaConfig.MaxCacheSize = size } } - if v, ok := config["referencedSchemaConfig.downloadTimeout"]; ok { + if v, ok := config["extendedSchema_downloadTimeout"]; ok { if timeout, err := strconv.Atoi(v); err == nil && timeout > 0 { cfg.ReferencedSchemaConfig.DownloadTimeout = timeout } } - if v, ok := config["referencedSchemaConfig.allowedDomains"]; ok && v != "" { + if v, ok := config["extendedSchema_allowedDomains"]; ok && v != "" { cfg.ReferencedSchemaConfig.AllowedDomains = strings.Split(v, ",") } - if v, ok := config["referencedSchemaConfig.urlTransform"]; ok && v != "" { - cfg.ReferencedSchemaConfig.URLTransform = v - } + } return schemav2validator.New(ctx, cfg) diff --git a/pkg/plugin/implementation/schemav2validator/schemav2validator.go b/pkg/plugin/implementation/schemav2validator/schemav2validator.go index 47762cc..28fabba 100644 --- a/pkg/plugin/implementation/schemav2validator/schemav2validator.go +++ b/pkg/plugin/implementation/schemav2validator/schemav2validator.go @@ -56,7 +56,6 @@ type ReferencedSchemaConfig struct { MaxCacheSize int // default 100 DownloadTimeout int // seconds, default 30 AllowedDomains []string // whitelist (empty = all allowed) - URLTransform string // e.g. "context.jsonld->attributes.yaml" } // referencedObject represents ANY object with @context in the request. @@ -169,17 +168,17 @@ func (v *schemav2Validator) Validate(ctx context.Context, reqURL *url.URL, data return v.formatValidationError(err) } - log.Debugf(ctx, "LEVEL 1 validation passed for action: %s", action) + log.Debugf(ctx, "Core schema validation passed for action: %s", action) - // NEW: LEVEL 2 - Referenced schema validation (if enabled) + // NEW: Extended Schema validation (if enabled) if v.config.EnableReferencedSchemas && v.schemaCache != nil { - log.Debugf(ctx, "Starting LEVEL 2 validation for action: %s", action) + log.Debugf(ctx, "Starting Extended Schema validation for action: %s", action) if err := v.validateReferencedSchemas(ctx, jsonData); err != nil { - // Level 2 failure - return error (same behavior as Level 1) - log.Debugf(ctx, "LEVEL 2 validation failed for action %s: %v", action, err) + // Extended Schema failure - return error (same behavior as core schema) + log.Debugf(ctx, "Extended Schema validation failed for action %s: %v", action, err) return v.formatValidationError(err) } - log.Debugf(ctx, "LEVEL 2 validation passed for action: %s", action) + log.Debugf(ctx, "Extended Schema validation passed for action: %s", action) } return nil @@ -416,22 +415,18 @@ func (v *schemav2Validator) validateReferencedSchemas(ctx context.Context, body objects := findReferencedObjects(message, "message") if len(objects) == 0 { - log.Debugf(ctx, "No objects with @context found in message, skipping LEVEL 2 validation") + log.Debugf(ctx, "No objects with @context found in message, skipping Extended Schema validation") return nil } - log.Debugf(ctx, "Found %d objects with @context for LEVEL 2 validation", len(objects)) + log.Debugf(ctx, "Found %d objects with @context for Extended Schema validation", len(objects)) // Get config with defaults - urlTransform := "context.jsonld->attributes.yaml" ttl := 86400 * time.Second // 24 hours default timeout := 30 * time.Second var allowedDomains []string refConfig := v.config.ReferencedSchemaConfig - if refConfig.URLTransform != "" { - urlTransform = refConfig.URLTransform - } if refConfig.CacheTTL > 0 { ttl = time.Duration(refConfig.CacheTTL) * time.Second } @@ -440,8 +435,8 @@ func (v *schemav2Validator) validateReferencedSchemas(ctx context.Context, body } allowedDomains = refConfig.AllowedDomains - log.Debugf(ctx, "LEVEL 2 config: urlTransform=%s, ttl=%v, timeout=%v, allowedDomains=%v", - urlTransform, ttl, timeout, allowedDomains) + log.Debugf(ctx, "Extended Schema config: ttl=%v, timeout=%v, allowedDomains=%v", + ttl, timeout, allowedDomains) // Validate each object and collect errors var errors []string @@ -449,7 +444,7 @@ func (v *schemav2Validator) validateReferencedSchemas(ctx context.Context, body log.Debugf(ctx, "Validating object at path: %s, @context: %s, @type: %s", obj.Path, obj.Context, obj.Type) - if err := v.schemaCache.validateReferencedObject(ctx, obj, urlTransform, ttl, timeout, allowedDomains); err != nil { + if err := v.schemaCache.validateReferencedObject(ctx, obj, ttl, timeout, allowedDomains); err != nil { errors = append(errors, err.Error()) } } @@ -699,15 +694,13 @@ func findReferencedObjects(data interface{}, path string) []referencedObject { } // transformContextToSchemaURL transforms @context URL to schema URL. -func transformContextToSchemaURL(contextURL, transform string) string { - parts := strings.Split(transform, "->") - if len(parts) != 2 { - // Default transformation - return strings.Replace(contextURL, "context.jsonld", "attributes.yaml", 1) - } - return strings.Replace(contextURL, strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]), 1) +func transformContextToSchemaURL(contextURL string) string { + // Hardcoded transformation: context.jsonld -> attributes.yaml + return strings.Replace(contextURL, "context.jsonld", "attributes.yaml", 1) } + + // findSchemaByType finds a schema in the document by @type value. func findSchemaByType(doc *openapi3.T, typeName string) (*openapi3.SchemaRef, error) { if doc.Components == nil || doc.Components.Schemas == nil { @@ -751,7 +744,6 @@ func isAllowedDomain(schemaURL string, allowedDomains []string) bool { func (c *schemaCache) validateReferencedObject( ctx context.Context, obj referencedObject, - urlTransform string, ttl, timeout time.Duration, allowedDomains []string, ) error { @@ -762,7 +754,7 @@ func (c *schemaCache) validateReferencedObject( } // Transform @context to schema path (URL or file) - schemaPath := transformContextToSchemaURL(obj.Context, urlTransform) + schemaPath := transformContextToSchemaURL(obj.Context) log.Debugf(ctx, "Transformed %s -> %s", obj.Context, schemaPath) // Load schema with timeout (supports URL or local file) @@ -778,7 +770,7 @@ func (c *schemaCache) validateReferencedObject( return fmt.Errorf("at %s: %w", obj.Path, err) } - // Validate object against schema (same options as Level 1) + // Validate object against schema (same options as core schema) opts := []openapi3.SchemaValidationOption{ openapi3.VisitAsRequest(), openapi3.EnableFormatValidation(), From 5feb84196cf30c534e1ba9ce47f51ef9f0e4c975 Mon Sep 17 00:00:00 2001 From: ameersohel45 Date: Fri, 12 Dec 2025 12:53:39 +0530 Subject: [PATCH 3/9] ref: Split Extended Schema logic and rename variables --- .../schemav2validator/cmd/plugin.go | 12 +- .../schemav2validator/extended_schema.go | 379 +++++++++++++++++ .../schemav2validator/schemav2validator.go | 392 +----------------- 3 files changed, 399 insertions(+), 384 deletions(-) create mode 100644 pkg/plugin/implementation/schemav2validator/extended_schema.go diff --git a/pkg/plugin/implementation/schemav2validator/cmd/plugin.go b/pkg/plugin/implementation/schemav2validator/cmd/plugin.go index ab2541e..f93712f 100644 --- a/pkg/plugin/implementation/schemav2validator/cmd/plugin.go +++ b/pkg/plugin/implementation/schemav2validator/cmd/plugin.go @@ -43,28 +43,28 @@ func (vp schemav2ValidatorProvider) New(ctx context.Context, config map[string]s // NEW: Parse extendedSchema_enabled if enableStr, ok := config["extendedSchema_enabled"]; ok { - cfg.EnableReferencedSchemas = enableStr == "true" + cfg.EnableExtendedSchema = enableStr == "true" } // NEW: Parse Extended Schema config (if enabled) - if cfg.EnableReferencedSchemas { + if cfg.EnableExtendedSchema { if v, ok := config["extendedSchema_cacheTTL"]; ok { if ttl, err := strconv.Atoi(v); err == nil && ttl > 0 { - cfg.ReferencedSchemaConfig.CacheTTL = ttl + cfg.ExtendedSchemaConfig.CacheTTL = ttl } } if v, ok := config["extendedSchema_maxCacheSize"]; ok { if size, err := strconv.Atoi(v); err == nil && size > 0 { - cfg.ReferencedSchemaConfig.MaxCacheSize = size + cfg.ExtendedSchemaConfig.MaxCacheSize = size } } if v, ok := config["extendedSchema_downloadTimeout"]; ok { if timeout, err := strconv.Atoi(v); err == nil && timeout > 0 { - cfg.ReferencedSchemaConfig.DownloadTimeout = timeout + cfg.ExtendedSchemaConfig.DownloadTimeout = timeout } } if v, ok := config["extendedSchema_allowedDomains"]; ok && v != "" { - cfg.ReferencedSchemaConfig.AllowedDomains = strings.Split(v, ",") + cfg.ExtendedSchemaConfig.AllowedDomains = strings.Split(v, ",") } } diff --git a/pkg/plugin/implementation/schemav2validator/extended_schema.go b/pkg/plugin/implementation/schemav2validator/extended_schema.go new file mode 100644 index 0000000..300477d --- /dev/null +++ b/pkg/plugin/implementation/schemav2validator/extended_schema.go @@ -0,0 +1,379 @@ +package schemav2validator + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "net/url" + "strings" + "sync" + "time" + + "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/getkin/kin-openapi/openapi3" +) + +// ExtendedSchemaConfig holds configuration for referenced schema validation. +type ExtendedSchemaConfig struct { + CacheTTL int // seconds, default 86400 (24h) + MaxCacheSize int // default 100 + DownloadTimeout int // seconds, default 30 + AllowedDomains []string // whitelist (empty = all allowed) +} + +// referencedObject represents ANY object with @context in the request. +type referencedObject struct { + Path string + Context string + Type string + Data map[string]interface{} +} + +// schemaCache caches loaded domain schemas with LRU eviction. +type schemaCache struct { + mu sync.RWMutex + schemas map[string]*cachedDomainSchema + maxSize int +} + +// cachedDomainSchema holds a cached domain schema with metadata. +type cachedDomainSchema struct { + doc *openapi3.T + loadedAt time.Time + expiresAt time.Time + lastAccessed time.Time + accessCount int64 +} + +// validateExtendedSchemas validates all objects with @context against their schemas. +func (v *schemav2Validator) validateExtendedSchemas(ctx context.Context, body interface{}) error { + // Extract "message" object - only scan inside message, not root + bodyMap, ok := body.(map[string]interface{}) + if !ok { + return fmt.Errorf("body is not a valid JSON object") + } + + message, hasMessage := bodyMap["message"] + if !hasMessage { + return fmt.Errorf("missing 'message' field in request body") + } + + // Find all objects with @context starting from message + objects := findReferencedObjects(message, "message") + + if len(objects) == 0 { + log.Debugf(ctx, "No objects with @context found in message, skipping Extended Schema validation") + return nil + } + + log.Debugf(ctx, "Found %d objects with @context for Extended Schema validation", len(objects)) + + // Get config with defaults + ttl := 86400 * time.Second // 24 hours default + timeout := 30 * time.Second + var allowedDomains []string + + refConfig := v.config.ExtendedSchemaConfig + if refConfig.CacheTTL > 0 { + ttl = time.Duration(refConfig.CacheTTL) * time.Second + } + if refConfig.DownloadTimeout > 0 { + timeout = time.Duration(refConfig.DownloadTimeout) * time.Second + } + allowedDomains = refConfig.AllowedDomains + + log.Debugf(ctx, "Extended Schema config: ttl=%v, timeout=%v, allowedDomains=%v", + ttl, timeout, allowedDomains) + + // Validate each object and collect errors + var errors []string + for _, obj := range objects { + log.Debugf(ctx, "Validating object at path: %s, @context: %s, @type: %s", + obj.Path, obj.Context, obj.Type) + + if err := v.schemaCache.validateReferencedObject(ctx, obj, ttl, timeout, allowedDomains); err != nil { + errors = append(errors, err.Error()) + } + } + + if len(errors) > 0 { + return fmt.Errorf("validation errors:\n - %s", strings.Join(errors, "\n - ")) + } + + return nil +} + +// newSchemaCache creates a new schema cache. +func newSchemaCache(maxSize int) *schemaCache { + return &schemaCache{ + schemas: make(map[string]*cachedDomainSchema), + maxSize: maxSize, + } +} + +// hashURL creates a SHA256 hash of the URL for use as cache key. +func hashURL(urlStr string) string { + hash := sha256.Sum256([]byte(urlStr)) + return hex.EncodeToString(hash[:]) +} + +// isValidSchemaPath validates if the schema path is safe to load. +func isValidSchemaPath(schemaPath string) bool { + u, err := url.Parse(schemaPath) + if err != nil { + // Could be a simple file path + return schemaPath != "" + } + // Support: http://, https://, file://, or no scheme (local path) + return u.Scheme == "http" || u.Scheme == "https" || + u.Scheme == "file" || u.Scheme == "" +} + +// get retrieves a cached schema and updates access tracking. +func (c *schemaCache) get(urlHash string) (*openapi3.T, bool) { + c.mu.Lock() + defer c.mu.Unlock() + + cached, exists := c.schemas[urlHash] + if !exists || time.Now().After(cached.expiresAt) { + return nil, false + } + + // Update access tracking + cached.lastAccessed = time.Now() + cached.accessCount++ + + return cached.doc, true +} + +// set stores a schema in the cache with TTL and LRU eviction. +func (c *schemaCache) set(urlHash string, doc *openapi3.T, ttl time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + + // LRU eviction if cache is full + if len(c.schemas) >= c.maxSize { + var oldest string + var oldestTime time.Time + for k, v := range c.schemas { + if oldest == "" || v.lastAccessed.Before(oldestTime) { + oldest, oldestTime = k, v.lastAccessed + } + } + if oldest != "" { + delete(c.schemas, oldest) + } + } + + c.schemas[urlHash] = &cachedDomainSchema{ + doc: doc, + loadedAt: time.Now(), + expiresAt: time.Now().Add(ttl), + lastAccessed: time.Now(), + accessCount: 1, + } +} + +// cleanupExpired removes expired schemas from cache. +func (c *schemaCache) cleanupExpired() int { + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + expired := make([]string, 0) + + for urlHash, cached := range c.schemas { + if now.After(cached.expiresAt) { + expired = append(expired, urlHash) + } + } + + for _, urlHash := range expired { + delete(c.schemas, urlHash) + } + + return len(expired) +} + +// loadSchemaFromPath loads a schema from URL or local file with timeout and caching. +func (c *schemaCache) loadSchemaFromPath(ctx context.Context, schemaPath string, ttl, timeout time.Duration) (*openapi3.T, error) { + urlHash := hashURL(schemaPath) + + // Check cache first + if doc, found := c.get(urlHash); found { + log.Debugf(ctx, "Schema cache hit for: %s", schemaPath) + return doc, nil + } + + log.Debugf(ctx, "Schema cache miss, loading from: %s", schemaPath) + + // Validate path format + if !isValidSchemaPath(schemaPath) { + return nil, fmt.Errorf("invalid schema path: %s", schemaPath) + } + + loader := openapi3.NewLoader() + loader.IsExternalRefsAllowed = true + + var doc *openapi3.T + var err error + + u, parseErr := url.Parse(schemaPath) + if parseErr == nil && (u.Scheme == "http" || u.Scheme == "https") { + // Load from URL with timeout + loadCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + loader.Context = loadCtx + doc, err = loader.LoadFromURI(u) + } else { + // Load from local file (file:// or path) + filePath := schemaPath + if u != nil && u.Scheme == "file" { + filePath = u.Path + } + doc, err = loader.LoadFromFile(filePath) + } + + if err != nil { + log.Errorf(ctx, err, "Failed to load schema from: %s", schemaPath) + return nil, fmt.Errorf("failed to load schema from %s: %w", schemaPath, err) + } + + // Validate loaded schema (non-blocking, just log warnings) + if err := doc.Validate(ctx); err != nil { + log.Debugf(ctx, "Schema validation warnings for %s: %v", schemaPath, err) + } + + c.set(urlHash, doc, ttl) + log.Debugf(ctx, "Loaded and cached schema from: %s", schemaPath) + + return doc, nil +} + +// findReferencedObjects recursively finds ALL objects with @context in the data. +func findReferencedObjects(data interface{}, path string) []referencedObject { + var results []referencedObject + + switch v := data.(type) { + case map[string]interface{}: + // Check for @context and @type + if contextVal, hasContext := v["@context"].(string); hasContext { + if typeVal, hasType := v["@type"].(string); hasType { + results = append(results, referencedObject{ + Path: path, + Context: contextVal, + Type: typeVal, + Data: v, + }) + } + } + + // Recurse into nested objects + for key, val := range v { + newPath := key + if path != "" { + newPath = path + "." + key + } + results = append(results, findReferencedObjects(val, newPath)...) + } + + case []interface{}: + // Recurse into arrays + for i, item := range v { + newPath := fmt.Sprintf("%s[%d]", path, i) + results = append(results, findReferencedObjects(item, newPath)...) + } + } + + return results +} + +// transformContextToSchemaURL transforms @context URL to schema URL. +func transformContextToSchemaURL(contextURL string) string { + // Hardcoded transformation: context.jsonld -> attributes.yaml + return strings.Replace(contextURL, "context.jsonld", "attributes.yaml", 1) +} + +// findSchemaByType finds a schema in the document by @type value. +func findSchemaByType(doc *openapi3.T, typeName string) (*openapi3.SchemaRef, error) { + if doc.Components == nil || doc.Components.Schemas == nil { + return nil, fmt.Errorf("no schemas found in document") + } + + // Try direct match by schema name + if schema, exists := doc.Components.Schemas[typeName]; exists { + return schema, nil + } + + // Fallback: Try x-jsonld.@type match + for _, schema := range doc.Components.Schemas { + if schema.Value == nil { + continue + } + if xJsonld, ok := schema.Value.Extensions["x-jsonld"].(map[string]interface{}); ok { + if atType, ok := xJsonld["@type"].(string); ok && atType == typeName { + return schema, nil + } + } + } + + return nil, fmt.Errorf("no schema found for @type: %s", typeName) +} + +// isAllowedDomain checks if the URL domain is in the whitelist. +func isAllowedDomain(schemaURL string, allowedDomains []string) bool { + if len(allowedDomains) == 0 { + return true // No whitelist = all allowed + } + for _, domain := range allowedDomains { + if strings.Contains(schemaURL, domain) { + return true + } + } + return false +} + +// validateReferencedObject validates a single object with @context. +func (c *schemaCache) validateReferencedObject( + ctx context.Context, + obj referencedObject, + ttl, timeout time.Duration, + allowedDomains []string, +) error { + // Domain whitelist check + if !isAllowedDomain(obj.Context, allowedDomains) { + log.Warnf(ctx, "Domain not in whitelist: %s", obj.Context) + return fmt.Errorf("domain not allowed: %s", obj.Context) + } + + // Transform @context to schema path (URL or file) + schemaPath := transformContextToSchemaURL(obj.Context) + log.Debugf(ctx, "Transformed %s -> %s", obj.Context, schemaPath) + + // Load schema with timeout (supports URL or local file) + doc, err := c.loadSchemaFromPath(ctx, schemaPath, ttl, timeout) + if err != nil { + return fmt.Errorf("at %s: %w", obj.Path, err) + } + + // Find schema by @type + schema, err := findSchemaByType(doc, obj.Type) + if err != nil { + log.Errorf(ctx, err, "Schema not found for @type: %s at path: %s", obj.Type, obj.Path) + return fmt.Errorf("at %s: %w", obj.Path, err) + } + + // Validate object against schema (same options as core schema) + opts := []openapi3.SchemaValidationOption{ + openapi3.VisitAsRequest(), + openapi3.EnableFormatValidation(), + } + if err := schema.Value.VisitJSON(obj.Data, opts...); err != nil { + log.Debugf(ctx, "Validation failed for @type: %s at path: %s: %v", obj.Type, obj.Path, err) + return fmt.Errorf("at %s: %w", obj.Path, err) + } + + log.Debugf(ctx, "Validation passed for @type: %s at path: %s", obj.Type, obj.Path) + return nil +} diff --git a/pkg/plugin/implementation/schemav2validator/schemav2validator.go b/pkg/plugin/implementation/schemav2validator/schemav2validator.go index 28fabba..1c6e2e9 100644 --- a/pkg/plugin/implementation/schemav2validator/schemav2validator.go +++ b/pkg/plugin/implementation/schemav2validator/schemav2validator.go @@ -2,8 +2,6 @@ package schemav2validator import ( "context" - "crypto/sha256" - "encoding/hex" "encoding/json" "fmt" "net/url" @@ -45,42 +43,12 @@ type Config struct { Location string // URL, file path, or directory path CacheTTL int - // NEW: Referenced schema configuration - EnableReferencedSchemas bool - ReferencedSchemaConfig ReferencedSchemaConfig + // NEW: Extended Schema configuration + EnableExtendedSchema bool + ExtendedSchemaConfig ExtendedSchemaConfig } -// ReferencedSchemaConfig holds configuration for referenced schema validation. -type ReferencedSchemaConfig struct { - CacheTTL int // seconds, default 86400 (24h) - MaxCacheSize int // default 100 - DownloadTimeout int // seconds, default 30 - AllowedDomains []string // whitelist (empty = all allowed) -} -// referencedObject represents ANY object with @context in the request. -type referencedObject struct { - Path string - Context string - Type string - Data map[string]interface{} -} - -// schemaCache caches loaded domain schemas with LRU eviction. -type schemaCache struct { - mu sync.RWMutex - schemas map[string]*cachedDomainSchema - maxSize int -} - -// cachedDomainSchema holds a cached domain schema with metadata. -type cachedDomainSchema struct { - doc *openapi3.T - loadedAt time.Time - expiresAt time.Time - lastAccessed time.Time - accessCount int64 -} // New creates a new Schemav2Validator instance. func New(ctx context.Context, config *Config) (*schemav2Validator, func() error, error) { @@ -105,14 +73,14 @@ func New(ctx context.Context, config *Config) (*schemav2Validator, func() error, config: config, } - // NEW: Initialize referenced schema cache if enabled - if config.EnableReferencedSchemas { + // NEW: Initialize extended schema cache if enabled + if config.EnableExtendedSchema { maxSize := 100 - if config.ReferencedSchemaConfig.MaxCacheSize > 0 { - maxSize = config.ReferencedSchemaConfig.MaxCacheSize + if config.ExtendedSchemaConfig.MaxCacheSize > 0 { + maxSize = config.ExtendedSchemaConfig.MaxCacheSize } v.schemaCache = newSchemaCache(maxSize) - log.Infof(ctx, "Initialized referenced schema cache with max size: %d", maxSize) + log.Infof(ctx, "Initialized extended schema cache with max size: %d", maxSize) } if err := v.initialise(ctx); err != nil { @@ -171,9 +139,9 @@ func (v *schemav2Validator) Validate(ctx context.Context, reqURL *url.URL, data log.Debugf(ctx, "Core schema validation passed for action: %s", action) // NEW: Extended Schema validation (if enabled) - if v.config.EnableReferencedSchemas && v.schemaCache != nil { + if v.config.EnableExtendedSchema && v.schemaCache != nil { log.Debugf(ctx, "Starting Extended Schema validation for action: %s", action) - if err := v.validateReferencedSchemas(ctx, jsonData); err != nil { + if err := v.validateExtendedSchemas(ctx, jsonData); err != nil { // Extended Schema failure - return error (same behavior as core schema) log.Debugf(ctx, "Extended Schema validation failed for action %s: %v", action, err) return v.formatValidationError(err) @@ -246,10 +214,10 @@ func (v *schemav2Validator) refreshLoop(ctx context.Context) { coreTicker := time.NewTicker(time.Duration(v.config.CacheTTL) * time.Second) defer coreTicker.Stop() - // NEW: Ticker for referenced schema cleanup + // NEW: Ticker for extended schema cleanup var refTicker *time.Ticker - if v.config.EnableReferencedSchemas { - ttl := v.config.ReferencedSchemaConfig.CacheTTL + if v.config.EnableExtendedSchema { + ttl := v.config.ExtendedSchemaConfig.CacheTTL if ttl <= 0 { ttl = 86400 // Default 24 hours } @@ -268,7 +236,7 @@ func (v *schemav2Validator) refreshLoop(ctx context.Context) { if v.schemaCache != nil { count := v.schemaCache.cleanupExpired() if count > 0 { - log.Debugf(ctx, "Cleaned up %d expired referenced schemas", count) + log.Debugf(ctx, "Cleaned up %d expired extended schemas", count) } } } @@ -398,64 +366,6 @@ func (v *schemav2Validator) buildActionIndex(ctx context.Context, doc *openapi3. return actionSchemas } -// validateReferencedSchemas validates all objects with @context against their schemas. -func (v *schemav2Validator) validateReferencedSchemas(ctx context.Context, body interface{}) error { - // Extract "message" object - only scan inside message, not root - bodyMap, ok := body.(map[string]interface{}) - if !ok { - return fmt.Errorf("body is not a valid JSON object") - } - - message, hasMessage := bodyMap["message"] - if !hasMessage { - return fmt.Errorf("missing 'message' field in request body") - } - - // Find all objects with @context starting from message - objects := findReferencedObjects(message, "message") - - if len(objects) == 0 { - log.Debugf(ctx, "No objects with @context found in message, skipping Extended Schema validation") - return nil - } - - log.Debugf(ctx, "Found %d objects with @context for Extended Schema validation", len(objects)) - - // Get config with defaults - ttl := 86400 * time.Second // 24 hours default - timeout := 30 * time.Second - var allowedDomains []string - - refConfig := v.config.ReferencedSchemaConfig - if refConfig.CacheTTL > 0 { - ttl = time.Duration(refConfig.CacheTTL) * time.Second - } - if refConfig.DownloadTimeout > 0 { - timeout = time.Duration(refConfig.DownloadTimeout) * time.Second - } - allowedDomains = refConfig.AllowedDomains - - log.Debugf(ctx, "Extended Schema config: ttl=%v, timeout=%v, allowedDomains=%v", - ttl, timeout, allowedDomains) - - // Validate each object and collect errors - var errors []string - for _, obj := range objects { - log.Debugf(ctx, "Validating object at path: %s, @context: %s, @type: %s", - obj.Path, obj.Context, obj.Type) - - if err := v.schemaCache.validateReferencedObject(ctx, obj, ttl, timeout, allowedDomains); err != nil { - errors = append(errors, err.Error()) - } - } - - if len(errors) > 0 { - return fmt.Errorf("validation errors:\n - %s", strings.Join(errors, "\n - ")) - } - - return nil -} - // extractActionFromSchema extracts the action value from a schema. func (v *schemav2Validator) extractActionFromSchema(schema *openapi3.Schema) string { // Check direct properties @@ -508,278 +418,4 @@ func (v *schemav2Validator) getActionValue(contextSchema *openapi3.Schema) strin return "" } -// newSchemaCache creates a new schema cache. -func newSchemaCache(maxSize int) *schemaCache { - return &schemaCache{ - schemas: make(map[string]*cachedDomainSchema), - maxSize: maxSize, - } -} -// hashURL creates a SHA256 hash of the URL for use as cache key. -func hashURL(urlStr string) string { - hash := sha256.Sum256([]byte(urlStr)) - return hex.EncodeToString(hash[:]) -} - -// isValidSchemaPath validates if the schema path is safe to load. -func isValidSchemaPath(schemaPath string) bool { - u, err := url.Parse(schemaPath) - if err != nil { - // Could be a simple file path - return schemaPath != "" - } - // Support: http://, https://, file://, or no scheme (local path) - return u.Scheme == "http" || u.Scheme == "https" || - u.Scheme == "file" || u.Scheme == "" -} - -// get retrieves a cached schema and updates access tracking. -func (c *schemaCache) get(urlHash string) (*openapi3.T, bool) { - c.mu.Lock() - defer c.mu.Unlock() - - cached, exists := c.schemas[urlHash] - if !exists || time.Now().After(cached.expiresAt) { - return nil, false - } - - // Update access tracking - cached.lastAccessed = time.Now() - cached.accessCount++ - - return cached.doc, true -} - -// set stores a schema in the cache with TTL and LRU eviction. -func (c *schemaCache) set(urlHash string, doc *openapi3.T, ttl time.Duration) { - c.mu.Lock() - defer c.mu.Unlock() - - // LRU eviction if cache is full - if len(c.schemas) >= c.maxSize { - var oldest string - var oldestTime time.Time - for k, v := range c.schemas { - if oldest == "" || v.lastAccessed.Before(oldestTime) { - oldest, oldestTime = k, v.lastAccessed - } - } - if oldest != "" { - delete(c.schemas, oldest) - } - } - - c.schemas[urlHash] = &cachedDomainSchema{ - doc: doc, - loadedAt: time.Now(), - expiresAt: time.Now().Add(ttl), - lastAccessed: time.Now(), - accessCount: 1, - } -} - -// cleanupExpired removes expired schemas from cache. -func (c *schemaCache) cleanupExpired() int { - c.mu.Lock() - defer c.mu.Unlock() - - now := time.Now() - expired := make([]string, 0) - - for urlHash, cached := range c.schemas { - if now.After(cached.expiresAt) { - expired = append(expired, urlHash) - } - } - - for _, urlHash := range expired { - delete(c.schemas, urlHash) - } - - return len(expired) -} - -// loadSchemaFromPath loads a schema from URL or local file with timeout and caching. -func (c *schemaCache) loadSchemaFromPath(ctx context.Context, schemaPath string, ttl, timeout time.Duration) (*openapi3.T, error) { - urlHash := hashURL(schemaPath) - - // Check cache first - if doc, found := c.get(urlHash); found { - log.Debugf(ctx, "Schema cache hit for: %s", schemaPath) - return doc, nil - } - - log.Debugf(ctx, "Schema cache miss, loading from: %s", schemaPath) - - // Validate path format - if !isValidSchemaPath(schemaPath) { - return nil, fmt.Errorf("invalid schema path: %s", schemaPath) - } - - loader := openapi3.NewLoader() - loader.IsExternalRefsAllowed = true - - var doc *openapi3.T - var err error - - u, parseErr := url.Parse(schemaPath) - if parseErr == nil && (u.Scheme == "http" || u.Scheme == "https") { - // Load from URL with timeout - loadCtx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - loader.Context = loadCtx - doc, err = loader.LoadFromURI(u) - } else { - // Load from local file (file:// or path) - filePath := schemaPath - if u != nil && u.Scheme == "file" { - filePath = u.Path - } - doc, err = loader.LoadFromFile(filePath) - } - - if err != nil { - log.Errorf(ctx, err, "Failed to load schema from: %s", schemaPath) - return nil, fmt.Errorf("failed to load schema from %s: %w", schemaPath, err) - } - - // Validate loaded schema (non-blocking, just log warnings) - if err := doc.Validate(ctx); err != nil { - log.Debugf(ctx, "Schema validation warnings for %s: %v", schemaPath, err) - } - - c.set(urlHash, doc, ttl) - log.Debugf(ctx, "Loaded and cached schema from: %s", schemaPath) - - return doc, nil -} - -// findReferencedObjects recursively finds ALL objects with @context in the data. -func findReferencedObjects(data interface{}, path string) []referencedObject { - var results []referencedObject - - switch v := data.(type) { - case map[string]interface{}: - // Check for @context and @type - if contextVal, hasContext := v["@context"].(string); hasContext { - if typeVal, hasType := v["@type"].(string); hasType { - results = append(results, referencedObject{ - Path: path, - Context: contextVal, - Type: typeVal, - Data: v, - }) - } - } - - // Recurse into nested objects - for key, val := range v { - newPath := key - if path != "" { - newPath = path + "." + key - } - results = append(results, findReferencedObjects(val, newPath)...) - } - - case []interface{}: - // Recurse into arrays - for i, item := range v { - newPath := fmt.Sprintf("%s[%d]", path, i) - results = append(results, findReferencedObjects(item, newPath)...) - } - } - - return results -} - -// transformContextToSchemaURL transforms @context URL to schema URL. -func transformContextToSchemaURL(contextURL string) string { - // Hardcoded transformation: context.jsonld -> attributes.yaml - return strings.Replace(contextURL, "context.jsonld", "attributes.yaml", 1) -} - - - -// findSchemaByType finds a schema in the document by @type value. -func findSchemaByType(doc *openapi3.T, typeName string) (*openapi3.SchemaRef, error) { - if doc.Components == nil || doc.Components.Schemas == nil { - return nil, fmt.Errorf("no schemas found in document") - } - - // Try direct match by schema name - if schema, exists := doc.Components.Schemas[typeName]; exists { - return schema, nil - } - - // Fallback: Try x-jsonld.@type match - for _, schema := range doc.Components.Schemas { - if schema.Value == nil { - continue - } - if xJsonld, ok := schema.Value.Extensions["x-jsonld"].(map[string]interface{}); ok { - if atType, ok := xJsonld["@type"].(string); ok && atType == typeName { - return schema, nil - } - } - } - - return nil, fmt.Errorf("no schema found for @type: %s", typeName) -} - -// isAllowedDomain checks if the URL domain is in the whitelist. -func isAllowedDomain(schemaURL string, allowedDomains []string) bool { - if len(allowedDomains) == 0 { - return true // No whitelist = all allowed - } - for _, domain := range allowedDomains { - if strings.Contains(schemaURL, domain) { - return true - } - } - return false -} - -// validateReferencedObject validates a single object with @context. -func (c *schemaCache) validateReferencedObject( - ctx context.Context, - obj referencedObject, - ttl, timeout time.Duration, - allowedDomains []string, -) error { - // Domain whitelist check - if !isAllowedDomain(obj.Context, allowedDomains) { - log.Warnf(ctx, "Domain not in whitelist: %s", obj.Context) - return fmt.Errorf("domain not allowed: %s", obj.Context) - } - - // Transform @context to schema path (URL or file) - schemaPath := transformContextToSchemaURL(obj.Context) - log.Debugf(ctx, "Transformed %s -> %s", obj.Context, schemaPath) - - // Load schema with timeout (supports URL or local file) - doc, err := c.loadSchemaFromPath(ctx, schemaPath, ttl, timeout) - if err != nil { - return fmt.Errorf("at %s: %w", obj.Path, err) - } - - // Find schema by @type - schema, err := findSchemaByType(doc, obj.Type) - if err != nil { - log.Errorf(ctx, err, "Schema not found for @type: %s at path: %s", obj.Type, obj.Path) - return fmt.Errorf("at %s: %w", obj.Path, err) - } - - // Validate object against schema (same options as core schema) - opts := []openapi3.SchemaValidationOption{ - openapi3.VisitAsRequest(), - openapi3.EnableFormatValidation(), - } - if err := schema.Value.VisitJSON(obj.Data, opts...); err != nil { - log.Debugf(ctx, "Validation failed for @type: %s at path: %s: %v", obj.Type, obj.Path, err) - return fmt.Errorf("at %s: %w", obj.Path, err) - } - - log.Debugf(ctx, "Validation passed for @type: %s at path: %s", obj.Type, obj.Path) - return nil -} From 5843d2a760245f5abf346501c8058ebe938d66b5 Mon Sep 17 00:00:00 2001 From: ameersohel45 Date: Fri, 12 Dec 2025 15:10:21 +0530 Subject: [PATCH 4/9] update: logs and comments also ref refreshLoop method --- .../schemav2validator/extended_schema.go | 12 +++-- .../schemav2validator/schemav2validator.go | 46 ++++++++----------- 2 files changed, 27 insertions(+), 31 deletions(-) diff --git a/pkg/plugin/implementation/schemav2validator/extended_schema.go b/pkg/plugin/implementation/schemav2validator/extended_schema.go index 300477d..8b18ddd 100644 --- a/pkg/plugin/implementation/schemav2validator/extended_schema.go +++ b/pkg/plugin/implementation/schemav2validator/extended_schema.go @@ -48,7 +48,7 @@ type cachedDomainSchema struct { // validateExtendedSchemas validates all objects with @context against their schemas. func (v *schemav2Validator) validateExtendedSchemas(ctx context.Context, body interface{}) error { - // Extract "message" object - only scan inside message, not root + // Extract "message" object - only scan inside message bodyMap, ok := body.(map[string]interface{}) if !ok { return fmt.Errorf("body is not a valid JSON object") @@ -291,28 +291,30 @@ func findReferencedObjects(data interface{}, path string) []referencedObject { // transformContextToSchemaURL transforms @context URL to schema URL. func transformContextToSchemaURL(contextURL string) string { - // Hardcoded transformation: context.jsonld -> attributes.yaml + // transformation: context.jsonld -> attributes.yaml return strings.Replace(contextURL, "context.jsonld", "attributes.yaml", 1) } // findSchemaByType finds a schema in the document by @type value. -func findSchemaByType(doc *openapi3.T, typeName string) (*openapi3.SchemaRef, error) { +func findSchemaByType(ctx context.Context, doc *openapi3.T, typeName string) (*openapi3.SchemaRef, error) { if doc.Components == nil || doc.Components.Schemas == nil { return nil, fmt.Errorf("no schemas found in document") } // Try direct match by schema name if schema, exists := doc.Components.Schemas[typeName]; exists { + log.Debugf(ctx, "Found schema by direct match: %s", typeName) return schema, nil } // Fallback: Try x-jsonld.@type match - for _, schema := range doc.Components.Schemas { + for name, schema := range doc.Components.Schemas { if schema.Value == nil { continue } if xJsonld, ok := schema.Value.Extensions["x-jsonld"].(map[string]interface{}); ok { if atType, ok := xJsonld["@type"].(string); ok && atType == typeName { + log.Debugf(ctx, "Found schema by x-jsonld.@type match: %s (mapped to %s)", typeName, name) return schema, nil } } @@ -358,7 +360,7 @@ func (c *schemaCache) validateReferencedObject( } // Find schema by @type - schema, err := findSchemaByType(doc, obj.Type) + schema, err := findSchemaByType(ctx, doc, obj.Type) if err != nil { log.Errorf(ctx, err, "Schema not found for @type: %s at path: %s", obj.Type, obj.Path) return fmt.Errorf("at %s: %w", obj.Path, err) diff --git a/pkg/plugin/implementation/schemav2validator/schemav2validator.go b/pkg/plugin/implementation/schemav2validator/schemav2validator.go index 1c6e2e9..de6c5e0 100644 --- a/pkg/plugin/implementation/schemav2validator/schemav2validator.go +++ b/pkg/plugin/implementation/schemav2validator/schemav2validator.go @@ -27,7 +27,7 @@ type schemav2Validator struct { config *Config spec *cachedSpec specMutex sync.RWMutex - schemaCache *schemaCache // NEW: cache for referenced schemas + schemaCache *schemaCache // cache for extended schemas } // cachedSpec holds a cached OpenAPI spec. @@ -43,7 +43,7 @@ type Config struct { Location string // URL, file path, or directory path CacheTTL int - // NEW: Extended Schema configuration + // Extended Schema configuration EnableExtendedSchema bool ExtendedSchemaConfig ExtendedSchemaConfig } @@ -73,7 +73,7 @@ func New(ctx context.Context, config *Config) (*schemav2Validator, func() error, config: config, } - // NEW: Initialize extended schema cache if enabled + // Initialize extended schema cache if enabled if config.EnableExtendedSchema { maxSize := 100 if config.ExtendedSchemaConfig.MaxCacheSize > 0 { @@ -136,13 +136,13 @@ func (v *schemav2Validator) Validate(ctx context.Context, reqURL *url.URL, data return v.formatValidationError(err) } - log.Debugf(ctx, "Core schema validation passed for action: %s", action) + log.Debugf(ctx, "base schema validation passed for action: %s", action) - // NEW: Extended Schema validation (if enabled) + // Extended Schema validation (if enabled) if v.config.EnableExtendedSchema && v.schemaCache != nil { log.Debugf(ctx, "Starting Extended Schema validation for action: %s", action) if err := v.validateExtendedSchemas(ctx, jsonData); err != nil { - // Extended Schema failure - return error (same behavior as core schema) + // Extended Schema failure - return error. log.Debugf(ctx, "Extended Schema validation failed for action %s: %v", action, err) return v.formatValidationError(err) } @@ -214,8 +214,10 @@ func (v *schemav2Validator) refreshLoop(ctx context.Context) { coreTicker := time.NewTicker(time.Duration(v.config.CacheTTL) * time.Second) defer coreTicker.Stop() - // NEW: Ticker for extended schema cleanup + // Ticker for extended schema cleanup var refTicker *time.Ticker + var refTickerCh <-chan time.Time // Default nil, blocks forever + if v.config.EnableExtendedSchema { ttl := v.config.ExtendedSchemaConfig.CacheTTL if ttl <= 0 { @@ -223,30 +225,22 @@ func (v *schemav2Validator) refreshLoop(ctx context.Context) { } refTicker = time.NewTicker(time.Duration(ttl) * time.Second) defer refTicker.Stop() + refTickerCh = refTicker.C } for { - if refTicker != nil { - select { - case <-ctx.Done(): - return - case <-coreTicker.C: - v.reloadExpiredSpec(ctx) - case <-refTicker.C: - if v.schemaCache != nil { - count := v.schemaCache.cleanupExpired() - if count > 0 { - log.Debugf(ctx, "Cleaned up %d expired extended schemas", count) - } + select { + case <-ctx.Done(): + return + case <-coreTicker.C: + v.reloadExpiredSpec(ctx) + case <-refTickerCh: + if v.schemaCache != nil { + count := v.schemaCache.cleanupExpired() + if count > 0 { + log.Debugf(ctx, "Cleaned up %d expired extended schemas", count) } } - } else { - select { - case <-ctx.Done(): - return - case <-coreTicker.C: - v.reloadExpiredSpec(ctx) - } } } } From f6b32ef2f2fdcc84d4fdd3b5fdd13544f2b46619 Mon Sep 17 00:00:00 2001 From: ameersohel45 Date: Fri, 12 Dec 2025 15:23:42 +0530 Subject: [PATCH 5/9] fix: error handling in extended schema --- .../implementation/schemav2validator/extended_schema.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pkg/plugin/implementation/schemav2validator/extended_schema.go b/pkg/plugin/implementation/schemav2validator/extended_schema.go index 8b18ddd..12c38da 100644 --- a/pkg/plugin/implementation/schemav2validator/extended_schema.go +++ b/pkg/plugin/implementation/schemav2validator/extended_schema.go @@ -86,20 +86,17 @@ func (v *schemav2Validator) validateExtendedSchemas(ctx context.Context, body in log.Debugf(ctx, "Extended Schema config: ttl=%v, timeout=%v, allowedDomains=%v", ttl, timeout, allowedDomains) - // Validate each object and collect errors - var errors []string + // Validate each object for _, obj := range objects { log.Debugf(ctx, "Validating object at path: %s, @context: %s, @type: %s", obj.Path, obj.Context, obj.Type) if err := v.schemaCache.validateReferencedObject(ctx, obj, ttl, timeout, allowedDomains); err != nil { - errors = append(errors, err.Error()) + return err } } - if len(errors) > 0 { - return fmt.Errorf("validation errors:\n - %s", strings.Join(errors, "\n - ")) - } + return nil return nil } From f0ccef9e597ec0cb5b1601101e8838cf1b0972ed Mon Sep 17 00:00:00 2001 From: ameersohel45 Date: Fri, 12 Dec 2025 17:14:49 +0530 Subject: [PATCH 6/9] fix: remove redundant code --- .../implementation/schemav2validator/extended_schema.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/plugin/implementation/schemav2validator/extended_schema.go b/pkg/plugin/implementation/schemav2validator/extended_schema.go index 12c38da..104cfaf 100644 --- a/pkg/plugin/implementation/schemav2validator/extended_schema.go +++ b/pkg/plugin/implementation/schemav2validator/extended_schema.go @@ -48,7 +48,7 @@ type cachedDomainSchema struct { // validateExtendedSchemas validates all objects with @context against their schemas. func (v *schemav2Validator) validateExtendedSchemas(ctx context.Context, body interface{}) error { - // Extract "message" object - only scan inside message + // Extract "message" object - scan inside message bodyMap, ok := body.(map[string]interface{}) if !ok { return fmt.Errorf("body is not a valid JSON object") @@ -97,8 +97,6 @@ func (v *schemav2Validator) validateExtendedSchemas(ctx context.Context, body in } return nil - - return nil } // newSchemaCache creates a new schema cache. From cd49b7dda9e73722a4bdac0d0ab33749bc89a338 Mon Sep 17 00:00:00 2001 From: ameersohel45 Date: Sun, 14 Dec 2025 21:49:51 +0530 Subject: [PATCH 7/9] ref: enhance schema validation logic and skip core schema --- .../schemav2validator/extended_schema.go | 42 +++++++++++++------ 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/pkg/plugin/implementation/schemav2validator/extended_schema.go b/pkg/plugin/implementation/schemav2validator/extended_schema.go index 104cfaf..03fad2b 100644 --- a/pkg/plugin/implementation/schemav2validator/extended_schema.go +++ b/pkg/plugin/implementation/schemav2validator/extended_schema.go @@ -22,7 +22,7 @@ type ExtendedSchemaConfig struct { AllowedDomains []string // whitelist (empty = all allowed) } -// referencedObject represents ANY object with @context in the request. +// referencedObject represents a domain-specific object with @context. type referencedObject struct { Path string Context string @@ -46,6 +46,11 @@ type cachedDomainSchema struct { accessCount int64 } +// isCoreSchema checks if @context URL is a core Beckn schema. +func isCoreSchema(contextURL string) bool { + return strings.Contains(contextURL, "/schema/core/") +} + // validateExtendedSchemas validates all objects with @context against their schemas. func (v *schemav2Validator) validateExtendedSchemas(ctx context.Context, body interface{}) error { // Extract "message" object - scan inside message @@ -59,15 +64,15 @@ func (v *schemav2Validator) validateExtendedSchemas(ctx context.Context, body in return fmt.Errorf("missing 'message' field in request body") } - // Find all objects with @context starting from message + // Find domain-specific objects with @context (skip core schemas) objects := findReferencedObjects(message, "message") if len(objects) == 0 { - log.Debugf(ctx, "No objects with @context found in message, skipping Extended Schema validation") + log.Debugf(ctx, "No domain-specific objects with @context found, skipping Extended Schema validation") return nil } - log.Debugf(ctx, "Found %d objects with @context for Extended Schema validation", len(objects)) + log.Debugf(ctx, "Found %d domain-specific objects with @context for Extended Schema validation", len(objects)) // Get config with defaults ttl := 86400 * time.Second // 24 hours default @@ -246,7 +251,7 @@ func (c *schemaCache) loadSchemaFromPath(ctx context.Context, schemaPath string, return doc, nil } -// findReferencedObjects recursively finds ALL objects with @context in the data. +// findReferencedObjects recursively finds domain-specific objects with @context . func findReferencedObjects(data interface{}, path string) []referencedObject { var results []referencedObject @@ -255,12 +260,15 @@ func findReferencedObjects(data interface{}, path string) []referencedObject { // Check for @context and @type if contextVal, hasContext := v["@context"].(string); hasContext { if typeVal, hasType := v["@type"].(string); hasType { - results = append(results, referencedObject{ - Path: path, - Context: contextVal, - Type: typeVal, - Data: v, - }) + // Skip core schemas during traversal + if !isCoreSchema(contextVal) { + results = append(results, referencedObject{ + Path: path, + Context: contextVal, + Type: typeVal, + Data: v, + }) + } } } @@ -361,12 +369,20 @@ func (c *schemaCache) validateReferencedObject( return fmt.Errorf("at %s: %w", obj.Path, err) } - // Validate object against schema (same options as core schema) + // Strip JSON-LD metadata before validation + domainData := make(map[string]interface{}, len(obj.Data)-2) + for k, v := range obj.Data { + if k != "@context" && k != "@type" { + domainData[k] = v + } + } + + // Validate domain-specific data against schema opts := []openapi3.SchemaValidationOption{ openapi3.VisitAsRequest(), openapi3.EnableFormatValidation(), } - if err := schema.Value.VisitJSON(obj.Data, opts...); err != nil { + if err := schema.Value.VisitJSON(domainData, opts...); err != nil { log.Debugf(ctx, "Validation failed for @type: %s at path: %s: %v", obj.Type, obj.Path, err) return fmt.Errorf("at %s: %w", obj.Path, err) } From 3b59507f15b90c82414a0dd72aed3bfc007b3a51 Mon Sep 17 00:00:00 2001 From: ameersohel45 Date: Sun, 14 Dec 2025 23:07:58 +0530 Subject: [PATCH 8/9] ref: improve error handling in extended schema validation --- .../schemav2validator/extended_schema.go | 18 ++++++++++++++++-- .../schemav2validator/schemav2validator.go | 8 ++------ 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/pkg/plugin/implementation/schemav2validator/extended_schema.go b/pkg/plugin/implementation/schemav2validator/extended_schema.go index 03fad2b..92c5ed0 100644 --- a/pkg/plugin/implementation/schemav2validator/extended_schema.go +++ b/pkg/plugin/implementation/schemav2validator/extended_schema.go @@ -11,6 +11,7 @@ import ( "time" "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/beckn-one/beckn-onix/pkg/model" "github.com/getkin/kin-openapi/openapi3" ) @@ -97,7 +98,20 @@ func (v *schemav2Validator) validateExtendedSchemas(ctx context.Context, body in obj.Path, obj.Context, obj.Type) if err := v.schemaCache.validateReferencedObject(ctx, obj, ttl, timeout, allowedDomains); err != nil { - return err + // Extract and prefix error paths + var schemaErrors []model.Error + v.extractSchemaErrors(err, &schemaErrors) + + // Prefix all paths with object path + for i := range schemaErrors { + if schemaErrors[i].Paths != "" { + schemaErrors[i].Paths = obj.Path + "." + schemaErrors[i].Paths + } else { + schemaErrors[i].Paths = obj.Path + } + } + + return &model.SchemaValidationErr{Errors: schemaErrors} } } @@ -384,7 +398,7 @@ func (c *schemaCache) validateReferencedObject( } if err := schema.Value.VisitJSON(domainData, opts...); err != nil { log.Debugf(ctx, "Validation failed for @type: %s at path: %s: %v", obj.Type, obj.Path, err) - return fmt.Errorf("at %s: %w", obj.Path, err) + return err } log.Debugf(ctx, "Validation passed for @type: %s at path: %s", obj.Type, obj.Path) diff --git a/pkg/plugin/implementation/schemav2validator/schemav2validator.go b/pkg/plugin/implementation/schemav2validator/schemav2validator.go index de6c5e0..f7050b7 100644 --- a/pkg/plugin/implementation/schemav2validator/schemav2validator.go +++ b/pkg/plugin/implementation/schemav2validator/schemav2validator.go @@ -48,8 +48,6 @@ type Config struct { ExtendedSchemaConfig ExtendedSchemaConfig } - - // New creates a new Schemav2Validator instance. func New(ctx context.Context, config *Config) (*schemav2Validator, func() error, error) { if config == nil { @@ -142,9 +140,9 @@ func (v *schemav2Validator) Validate(ctx context.Context, reqURL *url.URL, data if v.config.EnableExtendedSchema && v.schemaCache != nil { log.Debugf(ctx, "Starting Extended Schema validation for action: %s", action) if err := v.validateExtendedSchemas(ctx, jsonData); err != nil { - // Extended Schema failure - return error. + // Extended Schema failure - return error log.Debugf(ctx, "Extended Schema validation failed for action %s: %v", action, err) - return v.formatValidationError(err) + return err } log.Debugf(ctx, "Extended Schema validation passed for action: %s", action) } @@ -411,5 +409,3 @@ func (v *schemav2Validator) getActionValue(contextSchema *openapi3.Schema) strin return "" } - - From 706030ccecb1687925b9279231f7094bb1e6fba0 Mon Sep 17 00:00:00 2001 From: ameersohel45 Date: Sun, 14 Dec 2025 23:55:04 +0530 Subject: [PATCH 9/9] test: add tests and update docs for extended schema validation --- CONFIG.md | 13 +- .../schemav2validator/README.md | 79 +- .../schemav2validator/extended_schema_test.go | 709 ++++++++++++++++++ 3 files changed, 792 insertions(+), 9 deletions(-) create mode 100644 pkg/plugin/implementation/schemav2validator/extended_schema_test.go diff --git a/CONFIG.md b/CONFIG.md index 737bc6e..d5d1a4f 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -508,7 +508,7 @@ schemaValidator: #### 5. Schema2Validator Plugin -**Purpose**: Validate requests against OpenAPI 3.x specifications with action-based matching. +**Purpose**: Validate requests against OpenAPI 3.x specifications. Supports core protocol validation and optional extended validation for domain-specific objects with `@context` references. ```yaml schemaValidator: @@ -517,6 +517,11 @@ schemaValidator: type: url location: https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/draft/api-specs/beckn-protocol-api.yaml cacheTTL: "3600" + extendedSchema_enabled: "true" + extendedSchema_cacheTTL: "86400" + extendedSchema_maxCacheSize: "100" + extendedSchema_downloadTimeout: "30" + extendedSchema_allowedDomains: "beckn.org,example.com" ``` **Or for local files:** @@ -528,12 +533,18 @@ schemaValidator: type: file location: ./validation-scripts/l2-config/mobility_1.1.0_openapi_3.1.yaml cacheTTL: "3600" + extendedSchema_enabled: "false" ``` **Parameters**: - `type`: Source type - `"url"` for remote specs, `"file"` for local files - `location`: URL or file path to OpenAPI 3.1 specification - `cacheTTL`: Cache TTL in seconds before reloading spec (default: `"3600"`) +- `extendedSchema_enabled`: Enable extended schema validation for `@context` objects (default: `"false"`) +- `extendedSchema_cacheTTL`: Domain schema cache TTL in seconds (default: `"86400"`) +- `extendedSchema_maxCacheSize`: Max cached schemas (default: `"100"`) +- `extendedSchema_downloadTimeout`: Schema download timeout in seconds (default: `"30"`) +- `extendedSchema_allowedDomains`: Comma-separated domain whitelist (empty = all allowed) --- diff --git a/pkg/plugin/implementation/schemav2validator/README.md b/pkg/plugin/implementation/schemav2validator/README.md index b76ad99..e34f56a 100644 --- a/pkg/plugin/implementation/schemav2validator/README.md +++ b/pkg/plugin/implementation/schemav2validator/README.md @@ -10,6 +10,7 @@ Validates Beckn protocol requests against OpenAPI 3.1 specifications using kin-o - TTL-based caching with automatic refresh - Generic path matching (no hardcoded paths) - Direct schema validation without router overhead +- Extended schema validation for domain-specific objects with `@context` references ## Configuration @@ -20,6 +21,11 @@ schemaValidator: type: url location: https://example.com/openapi-spec.yaml cacheTTL: "3600" + extendedSchema_enabled: "true" + extendedSchema_cacheTTL: "86400" + extendedSchema_maxCacheSize: "100" + extendedSchema_downloadTimeout: "30" + extendedSchema_allowedDomains: "beckn.org,example.com" ``` ### Configuration Parameters @@ -29,24 +35,55 @@ schemaValidator: | `type` | string | Yes | - | Type of spec source: "url" or "file" ("dir" reserved for future) | | `location` | string | Yes | - | URL or file path to OpenAPI 3.1 spec | | `cacheTTL` | string | No | "3600" | Cache TTL in seconds before reloading spec | +| `extendedSchema_enabled` | string | No | "false" | Enable extended schema validation for `@context` objects | +| `extendedSchema_cacheTTL` | string | No | "86400" | Domain schema cache TTL in seconds | +| `extendedSchema_maxCacheSize` | string | No | "100" | Maximum number of cached domain schemas | +| `extendedSchema_downloadTimeout` | string | No | "30" | Timeout for downloading domain schemas | +| `extendedSchema_allowedDomains` | string | No | "" | Comma-separated domain whitelist (empty = all allowed) | ## How It Works -1. **Load Spec**: Loads OpenAPI spec from configured URL at startup -2. **Extract Action**: Extracts `action` from request `context.action` field -3. **Find Schema**: Searches all paths and HTTP methods in spec for schema with matching action: - - Checks `properties.context.action.enum` for the action value - - Also checks `properties.context.allOf[].properties.action.enum` - - Stops at first match -4. **Validate**: Validates request body against matched schema using `Schema.VisitJSON()` with: +### Initialization (Load Time) + +**Core Protocol Validation Setup**: +1. **Load OpenAPI Spec**: Loads main spec from `location` (URL or file) with external `$ref` resolution +2. **Build Action Index**: Creates action→schema map for O(1) lookup by scanning all paths/methods +3. **Validate Spec**: Validates OpenAPI spec structure (warnings logged, non-fatal) +4. **Cache Spec**: Stores loaded spec with `loadedAt` timestamp + +**Extended Schema Setup** (if `extendedSchema_enabled: "true"`): +5. **Initialize Schema Cache**: Creates LRU cache with `maxCacheSize` (default: 100) +6. **Start Background Refresh**: Launches goroutine with two tickers: + - Core spec refresh every `cacheTTL` seconds (default: 3600) + - Extended schema cleanup every `extendedSchema_cacheTTL` seconds (default: 86400) + +### Request Validation (Runtime) + +**Core Protocol Validation** (always runs): +1. **Parse Request**: Unmarshal JSON and extract `context.action` +2. **Lookup Schema**: O(1) lookup in action index (built at load time) +3. **Validate**: Call `schema.Value.VisitJSON()` with: - Required fields validation - Data type validation (string, number, boolean, object, array) - Format validation (email, uri, date-time, uuid, etc.) - Constraint validation (min/max, pattern, enum, const) - Nested object and array validation -5. **Return Errors**: Returns validation errors in ONIX format +4. **Return Errors**: If validation fails, format and return errors + +**Extended Schema Validation** (if `extendedSchema_enabled: "true"` AND core validation passed): +5. **Scan for @context**: Recursively traverse `message` field for objects with `@context` and `@type` +6. **Filter Core Schemas**: Skip objects with `/schema/core/` in `@context` URL +7. **Validate Each Domain Object**: + - Check domain whitelist (if `allowedDomains` configured) + - Transform `@context` URL: `context.jsonld` → `attributes.yaml` + - Load schema from URL/file (check cache first, download if miss) + - Find schema by `@type` (direct match or `x-jsonld.@type` fallback) + - Strip `@context` and `@type` metadata from object + - Validate remaining data against domain schema + - Prefix error paths with object location (e.g., `message.order.field`) +8. **Return Errors**: Returns first validation error (fail-fast) ## Action-Based Matching @@ -120,7 +157,33 @@ schemaValidator: cacheTTL: "3600" ``` +### With Extended Schema Validation +```yaml +schemaValidator: + id: schemav2validator + config: + type: url + location: https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/draft/api-specs/beckn-protocol-api.yaml + cacheTTL: "3600" + extendedSchema_enabled: "true" + extendedSchema_cacheTTL: "86400" + extendedSchema_maxCacheSize: "100" + extendedSchema_downloadTimeout: "30" + extendedSchema_allowedDomains: "raw.githubusercontent.com,schemas.beckn.org" +``` + +**At Load Time**: +- Creates LRU cache for domain schemas (max 100 entries) +- Starts background goroutine for cache cleanup every 24 hours + +**At Runtime** (after core validation passes): +- Scans `message` field for objects with `@context` and `@type` +- Skips core Beckn schemas (containing `/schema/core/`) +- Downloads domain schemas from `@context` URLs (cached for 24 hours) +- Validates domain-specific data against schemas +- Returns errors with full JSON paths (e.g., `message.order.chargingRate`) +- Fail-fast: returns on first validation error ## Dependencies diff --git a/pkg/plugin/implementation/schemav2validator/extended_schema_test.go b/pkg/plugin/implementation/schemav2validator/extended_schema_test.go new file mode 100644 index 0000000..a4bf0cf --- /dev/null +++ b/pkg/plugin/implementation/schemav2validator/extended_schema_test.go @@ -0,0 +1,709 @@ +package schemav2validator + +import ( + "context" + "os" + "testing" + "time" + + "github.com/getkin/kin-openapi/openapi3" + "github.com/stretchr/testify/assert" +) + +func TestIsCoreSchema(t *testing.T) { + tests := []struct { + name string + contextURL string + want bool + }{ + { + name: "core schema URL", + contextURL: "https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/draft/schema/core/v2/context.jsonld", + want: true, + }, + { + name: "domain schema URL", + contextURL: "https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/draft/schema/EvChargingOffer/v1/context.jsonld", + want: false, + }, + { + name: "empty URL", + contextURL: "", + want: false, + }, + { + name: "URL without schema/core", + contextURL: "https://example.com/some/path/context.jsonld", + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := isCoreSchema(tt.contextURL) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestFindReferencedObjects(t *testing.T) { + tests := []struct { + name string + data interface{} + path string + want int // number of objects found + }{ + { + name: "single domain object", + data: map[string]interface{}{ + "@context": "https://example.com/schema/DomainType/v1/context.jsonld", + "@type": "DomainType", + "field": "value", + }, + path: "message", + want: 1, + }, + { + name: "core schema object - should be skipped", + data: map[string]interface{}{ + "@context": "https://example.com/schema/core/v2/context.jsonld", + "@type": "beckn:Order", + "field": "value", + }, + path: "message", + want: 0, + }, + { + name: "nested domain objects", + data: map[string]interface{}{ + "order": map[string]interface{}{ + "@context": "https://example.com/schema/core/v2/context.jsonld", + "@type": "beckn:Order", + "orderAttributes": map[string]interface{}{ + "@context": "https://example.com/schema/ChargingSession/v1/context.jsonld", + "@type": "ChargingSession", + "field": "value", + }, + }, + }, + path: "message", + want: 1, // Only domain object, core skipped + }, + { + name: "array with domain objects", + data: map[string]interface{}{ + "items": []interface{}{ + map[string]interface{}{ + "@context": "https://example.com/schema/DomainType/v1/context.jsonld", + "@type": "DomainType", + }, + map[string]interface{}{ + "@context": "https://example.com/schema/AnotherType/v1/context.jsonld", + "@type": "AnotherType", + }, + }, + }, + path: "message", + want: 2, + }, + { + name: "object without @context", + data: map[string]interface{}{ + "field": "value", + }, + path: "message", + want: 0, + }, + { + name: "object with @context but no @type", + data: map[string]interface{}{ + "@context": "https://example.com/schema/DomainType/v1/context.jsonld", + "field": "value", + }, + path: "message", + want: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := findReferencedObjects(tt.data, tt.path) + assert.Equal(t, tt.want, len(got)) + }) + } +} + +func TestTransformContextToSchemaURL(t *testing.T) { + tests := []struct { + name string + contextURL string + want string + }{ + { + name: "standard transformation", + contextURL: "https://example.com/schema/EvChargingOffer/v1/context.jsonld", + want: "https://example.com/schema/EvChargingOffer/v1/attributes.yaml", + }, + { + name: "already attributes.yaml", + contextURL: "https://example.com/schema/EvChargingOffer/v1/attributes.yaml", + want: "https://example.com/schema/EvChargingOffer/v1/attributes.yaml", + }, + { + name: "no context.jsonld in URL", + contextURL: "https://example.com/schema/EvChargingOffer/v1/", + want: "https://example.com/schema/EvChargingOffer/v1/", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := transformContextToSchemaURL(tt.contextURL) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestHashURL(t *testing.T) { + tests := []struct { + name string + url string + }{ + { + name: "consistent hashing", + url: "https://example.com/schema.yaml", + }, + { + name: "empty string", + url: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hash1 := hashURL(tt.url) + hash2 := hashURL(tt.url) + + // Same URL should produce same hash + assert.Equal(t, hash1, hash2) + + // Hash should be 64 characters (SHA256 hex) + assert.Equal(t, 64, len(hash1)) + }) + } +} + +func TestIsValidSchemaPath(t *testing.T) { + tests := []struct { + name string + schemaPath string + want bool + }{ + { + name: "http URL", + schemaPath: "http://example.com/schema.yaml", + want: true, + }, + { + name: "https URL", + schemaPath: "https://example.com/schema.yaml", + want: true, + }, + { + name: "file URL", + schemaPath: "file:///path/to/schema.yaml", + want: true, + }, + { + name: "local path", + schemaPath: "/path/to/schema.yaml", + want: true, + }, + { + name: "relative path", + schemaPath: "./schema.yaml", + want: true, + }, + { + name: "empty path", + schemaPath: "", + want: true, // url.Parse("") succeeds, returns empty scheme + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := isValidSchemaPath(tt.schemaPath) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestNewSchemaCache(t *testing.T) { + tests := []struct { + name string + maxSize int + }{ + { + name: "default size", + maxSize: 100, + }, + { + name: "custom size", + maxSize: 50, + }, + { + name: "zero size", + maxSize: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cache := newSchemaCache(tt.maxSize) + + assert.NotNil(t, cache) + assert.Equal(t, tt.maxSize, cache.maxSize) + assert.NotNil(t, cache.schemas) + assert.Equal(t, 0, len(cache.schemas)) + }) + } +} + +func TestSchemaCache_GetSet(t *testing.T) { + cache := newSchemaCache(10) + + // Create a simple schema doc + doc := &openapi3.T{ + OpenAPI: "3.1.0", + } + + urlHash := hashURL("https://example.com/schema.yaml") + ttl := 1 * time.Hour + + // Test Set + cache.set(urlHash, doc, ttl) + + // Test Get - should find it + retrieved, found := cache.get(urlHash) + assert.True(t, found) + assert.Equal(t, doc, retrieved) + + // Test Get - non-existent key + _, found = cache.get("non-existent-hash") + assert.False(t, found) +} + +func TestSchemaCache_LRUEviction(t *testing.T) { + cache := newSchemaCache(2) // Small cache for testing + + doc1 := &openapi3.T{OpenAPI: "3.1.0"} + doc2 := &openapi3.T{OpenAPI: "3.1.1"} + doc3 := &openapi3.T{OpenAPI: "3.1.2"} + + ttl := 1 * time.Hour + + // Add first two items + cache.set("hash1", doc1, ttl) + cache.set("hash2", doc2, ttl) + + // Access first item to make it more recent + cache.get("hash1") + + // Add third item - should evict hash2 (least recently used) + cache.set("hash3", doc3, ttl) + + // Verify hash1 and hash3 exist, hash2 was evicted + _, found1 := cache.get("hash1") + _, found2 := cache.get("hash2") + _, found3 := cache.get("hash3") + + assert.True(t, found1, "hash1 should exist (recently accessed)") + assert.False(t, found2, "hash2 should be evicted (LRU)") + assert.True(t, found3, "hash3 should exist (just added)") +} + +func TestSchemaCache_TTLExpiry(t *testing.T) { + cache := newSchemaCache(10) + + doc := &openapi3.T{OpenAPI: "3.1.0"} + urlHash := "test-hash" + + // Set with very short TTL + cache.set(urlHash, doc, 1*time.Millisecond) + + // Should be found immediately + _, found := cache.get(urlHash) + assert.True(t, found) + + // Wait for expiry + time.Sleep(10 * time.Millisecond) + + // Should not be found after expiry + _, found = cache.get(urlHash) + assert.False(t, found) +} + +func TestSchemaCache_CleanupExpired(t *testing.T) { + cache := newSchemaCache(10) + + doc := &openapi3.T{OpenAPI: "3.1.0"} + + // Add items with short TTL + cache.set("hash1", doc, 1*time.Millisecond) + cache.set("hash2", doc, 1*time.Millisecond) + cache.set("hash3", doc, 1*time.Hour) // This one won't expire + + // Wait for expiry + time.Sleep(10 * time.Millisecond) + + // Cleanup expired + count := cache.cleanupExpired() + + // Should have cleaned up 2 expired items + assert.Equal(t, 2, count) + + // Verify only hash3 remains + cache.mu.RLock() + assert.Equal(t, 1, len(cache.schemas)) + _, exists := cache.schemas["hash3"] + assert.True(t, exists) + cache.mu.RUnlock() +} + +func TestIsAllowedDomain(t *testing.T) { + tests := []struct { + name string + schemaURL string + allowedDomains []string + want bool + }{ + { + name: "empty whitelist - all allowed", + schemaURL: "https://example.com/schema.yaml", + allowedDomains: []string{}, + want: true, + }, + { + name: "nil whitelist - all allowed", + schemaURL: "https://example.com/schema.yaml", + allowedDomains: nil, + want: true, + }, + { + name: "domain in whitelist", + schemaURL: "https://raw.githubusercontent.com/beckn/schema.yaml", + allowedDomains: []string{"raw.githubusercontent.com", "schemas.beckn.org"}, + want: true, + }, + { + name: "domain not in whitelist", + schemaURL: "https://malicious.com/schema.yaml", + allowedDomains: []string{"raw.githubusercontent.com", "schemas.beckn.org"}, + want: false, + }, + { + name: "partial domain match", + schemaURL: "https://raw.githubusercontent.com/beckn/schema.yaml", + allowedDomains: []string{"githubusercontent.com"}, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := isAllowedDomain(tt.schemaURL, tt.allowedDomains) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestFindReferencedObjects_PathBuilding(t *testing.T) { + data := map[string]interface{}{ + "order": map[string]interface{}{ + "beckn:orderItems": []interface{}{ + map[string]interface{}{ + "beckn:acceptedOffer": map[string]interface{}{ + "beckn:offerAttributes": map[string]interface{}{ + "@context": "https://example.com/schema/ChargingOffer/v1/context.jsonld", + "@type": "ChargingOffer", + }, + }, + }, + }, + }, + } + + objects := findReferencedObjects(data, "message") + + assert.Equal(t, 1, len(objects)) + assert.Equal(t, "message.order.beckn:orderItems[0].beckn:acceptedOffer.beckn:offerAttributes", objects[0].Path) + assert.Equal(t, "ChargingOffer", objects[0].Type) +} + +// Integration tests for the 4 remaining functions + +func TestLoadSchemaFromPath_LocalFile(t *testing.T) { + cache := newSchemaCache(10) + ctx := context.Background() + + tmpFile, err := os.CreateTemp("", "test-schema-*.yaml") + assert.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + schemaContent := `openapi: 3.1.0 +info: + title: Test Schema + version: 1.0.0 +components: + schemas: + TestType: + type: object + properties: + field1: + type: string` + + _, err = tmpFile.Write([]byte(schemaContent)) + assert.NoError(t, err) + tmpFile.Close() + + doc, err := cache.loadSchemaFromPath(ctx, tmpFile.Name(), 1*time.Hour, 30*time.Second) + assert.NoError(t, err) + assert.NotNil(t, doc) + assert.Equal(t, "3.1.0", doc.OpenAPI) +} + +func TestLoadSchemaFromPath_CacheHit(t *testing.T) { + cache := newSchemaCache(10) + ctx := context.Background() + + tmpFile, err := os.CreateTemp("", "test-schema-*.yaml") + assert.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + schemaContent := `openapi: 3.1.0 +info: + title: Test Schema + version: 1.0.0` + + tmpFile.Write([]byte(schemaContent)) + tmpFile.Close() + + doc1, err := cache.loadSchemaFromPath(ctx, tmpFile.Name(), 1*time.Hour, 30*time.Second) + assert.NoError(t, err) + + doc2, err := cache.loadSchemaFromPath(ctx, tmpFile.Name(), 1*time.Hour, 30*time.Second) + assert.NoError(t, err) + + assert.Equal(t, doc1, doc2) +} + +func TestLoadSchemaFromPath_InvalidPath(t *testing.T) { + cache := newSchemaCache(10) + ctx := context.Background() + + _, err := cache.loadSchemaFromPath(ctx, "/nonexistent/schema.yaml", 1*time.Hour, 30*time.Second) + assert.Error(t, err) +} + +func TestFindSchemaByType_DirectMatch(t *testing.T) { + cache := newSchemaCache(10) + ctx := context.Background() + + tmpFile, err := os.CreateTemp("", "test-schema-*.yaml") + assert.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + schemaContent := `openapi: 3.1.0 +info: + title: Test Schema + version: 1.0.0 +components: + schemas: + TestType: + type: object + properties: + field1: + type: string` + + tmpFile.Write([]byte(schemaContent)) + tmpFile.Close() + + doc, err := cache.loadSchemaFromPath(ctx, tmpFile.Name(), 1*time.Hour, 30*time.Second) + assert.NoError(t, err) + + schema, err := findSchemaByType(ctx, doc, "TestType") + assert.NoError(t, err) + assert.NotNil(t, schema) +} + +func TestFindSchemaByType_NotFound(t *testing.T) { + cache := newSchemaCache(10) + ctx := context.Background() + + tmpFile, err := os.CreateTemp("", "test-schema-*.yaml") + assert.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + schemaContent := `openapi: 3.1.0 +info: + title: Test Schema + version: 1.0.0 +components: + schemas: + TestType: + type: object` + + tmpFile.Write([]byte(schemaContent)) + tmpFile.Close() + + doc, err := cache.loadSchemaFromPath(ctx, tmpFile.Name(), 1*time.Hour, 30*time.Second) + assert.NoError(t, err) + + _, err = findSchemaByType(ctx, doc, "NonExistentType") + assert.Error(t, err) + assert.Contains(t, err.Error(), "no schema found") +} + +func TestValidateReferencedObject_Valid(t *testing.T) { + cache := newSchemaCache(10) + ctx := context.Background() + + tmpFile, err := os.CreateTemp("", "test-schema-*.yaml") + assert.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + schemaContent := `openapi: 3.1.0 +info: + title: Test Schema + version: 1.0.0 +components: + schemas: + TestType: + type: object + additionalProperties: false + x-jsonld: + "@context": ./context.jsonld + "@type": TestType + properties: + field1: + type: string + required: + - field1` + + tmpFile.Write([]byte(schemaContent)) + tmpFile.Close() + + obj := referencedObject{ + Path: "message.test", + Context: tmpFile.Name(), + Type: "TestType", + Data: map[string]interface{}{ + "@context": tmpFile.Name(), + "@type": "TestType", + "field1": "value1", + }, + } + + err = cache.validateReferencedObject(ctx, obj, 1*time.Hour, 30*time.Second, nil) + assert.NoError(t, err) +} + +func TestValidateReferencedObject_Invalid(t *testing.T) { + cache := newSchemaCache(10) + ctx := context.Background() + + tmpFile, err := os.CreateTemp("", "test-schema-*.yaml") + assert.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + schemaContent := `openapi: 3.1.0 +info: + title: Test Schema + version: 1.0.0 +components: + schemas: + TestType: + type: object + additionalProperties: false + x-jsonld: + "@context": ./context.jsonld + "@type": TestType + properties: + field1: + type: string + required: + - field1` + + tmpFile.Write([]byte(schemaContent)) + tmpFile.Close() + + obj := referencedObject{ + Path: "message.test", + Context: tmpFile.Name(), + Type: "TestType", + Data: map[string]interface{}{ + "@context": tmpFile.Name(), + "@type": "TestType", + }, + } + + err = cache.validateReferencedObject(ctx, obj, 1*time.Hour, 30*time.Second, nil) + assert.Error(t, err) +} + +func TestValidateReferencedObject_DomainNotAllowed(t *testing.T) { + cache := newSchemaCache(10) + ctx := context.Background() + + obj := referencedObject{ + Path: "message.test", + Context: "https://malicious.com/schema.yaml", + Type: "TestType", + Data: map[string]interface{}{}, + } + + allowedDomains := []string{"trusted.com"} + + err := cache.validateReferencedObject(ctx, obj, 1*time.Hour, 30*time.Second, allowedDomains) + assert.Error(t, err) + assert.Contains(t, err.Error(), "domain not allowed") +} + +func TestValidateExtendedSchemas_NoObjects(t *testing.T) { + v := &schemav2Validator{ + config: &Config{ + EnableExtendedSchema: true, + ExtendedSchemaConfig: ExtendedSchemaConfig{}, + }, + schemaCache: newSchemaCache(10), + } + + ctx := context.Background() + body := map[string]interface{}{ + "message": map[string]interface{}{ + "field": "value", + }, + } + + err := v.validateExtendedSchemas(ctx, body) + assert.NoError(t, err) +} + +func TestValidateExtendedSchemas_MissingMessage(t *testing.T) { + v := &schemav2Validator{ + config: &Config{ + EnableExtendedSchema: true, + }, + schemaCache: newSchemaCache(10), + } + + ctx := context.Background() + body := map[string]interface{}{ + "context": map[string]interface{}{}, + } + + err := v.validateExtendedSchemas(ctx, body) + assert.Error(t, err) + assert.Contains(t, err.Error(), "missing 'message' field") +}