ref: Split Extended Schema logic and rename variables
This commit is contained in:
@@ -43,28 +43,28 @@ func (vp schemav2ValidatorProvider) New(ctx context.Context, config map[string]s
|
||||
|
||||
// NEW: Parse extendedSchema_enabled
|
||||
if enableStr, ok := config["extendedSchema_enabled"]; ok {
|
||||
cfg.EnableReferencedSchemas = enableStr == "true"
|
||||
cfg.EnableExtendedSchema = enableStr == "true"
|
||||
}
|
||||
|
||||
// NEW: Parse Extended Schema config (if enabled)
|
||||
if cfg.EnableReferencedSchemas {
|
||||
if cfg.EnableExtendedSchema {
|
||||
if v, ok := config["extendedSchema_cacheTTL"]; ok {
|
||||
if ttl, err := strconv.Atoi(v); err == nil && ttl > 0 {
|
||||
cfg.ReferencedSchemaConfig.CacheTTL = ttl
|
||||
cfg.ExtendedSchemaConfig.CacheTTL = ttl
|
||||
}
|
||||
}
|
||||
if v, ok := config["extendedSchema_maxCacheSize"]; ok {
|
||||
if size, err := strconv.Atoi(v); err == nil && size > 0 {
|
||||
cfg.ReferencedSchemaConfig.MaxCacheSize = size
|
||||
cfg.ExtendedSchemaConfig.MaxCacheSize = size
|
||||
}
|
||||
}
|
||||
if v, ok := config["extendedSchema_downloadTimeout"]; ok {
|
||||
if timeout, err := strconv.Atoi(v); err == nil && timeout > 0 {
|
||||
cfg.ReferencedSchemaConfig.DownloadTimeout = timeout
|
||||
cfg.ExtendedSchemaConfig.DownloadTimeout = timeout
|
||||
}
|
||||
}
|
||||
if v, ok := config["extendedSchema_allowedDomains"]; ok && v != "" {
|
||||
cfg.ReferencedSchemaConfig.AllowedDomains = strings.Split(v, ",")
|
||||
cfg.ExtendedSchemaConfig.AllowedDomains = strings.Split(v, ",")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
379
pkg/plugin/implementation/schemav2validator/extended_schema.go
Normal file
379
pkg/plugin/implementation/schemav2validator/extended_schema.go
Normal file
@@ -0,0 +1,379 @@
|
||||
package schemav2validator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||
"github.com/getkin/kin-openapi/openapi3"
|
||||
)
|
||||
|
||||
// ExtendedSchemaConfig holds configuration for referenced schema validation.
|
||||
type ExtendedSchemaConfig struct {
|
||||
CacheTTL int // seconds, default 86400 (24h)
|
||||
MaxCacheSize int // default 100
|
||||
DownloadTimeout int // seconds, default 30
|
||||
AllowedDomains []string // whitelist (empty = all allowed)
|
||||
}
|
||||
|
||||
// referencedObject represents ANY object with @context in the request.
|
||||
type referencedObject struct {
|
||||
Path string
|
||||
Context string
|
||||
Type string
|
||||
Data map[string]interface{}
|
||||
}
|
||||
|
||||
// schemaCache caches loaded domain schemas with LRU eviction.
|
||||
type schemaCache struct {
|
||||
mu sync.RWMutex
|
||||
schemas map[string]*cachedDomainSchema
|
||||
maxSize int
|
||||
}
|
||||
|
||||
// cachedDomainSchema holds a cached domain schema with metadata.
|
||||
type cachedDomainSchema struct {
|
||||
doc *openapi3.T
|
||||
loadedAt time.Time
|
||||
expiresAt time.Time
|
||||
lastAccessed time.Time
|
||||
accessCount int64
|
||||
}
|
||||
|
||||
// validateExtendedSchemas validates all objects with @context against their schemas.
|
||||
func (v *schemav2Validator) validateExtendedSchemas(ctx context.Context, body interface{}) error {
|
||||
// Extract "message" object - only scan inside message, not root
|
||||
bodyMap, ok := body.(map[string]interface{})
|
||||
if !ok {
|
||||
return fmt.Errorf("body is not a valid JSON object")
|
||||
}
|
||||
|
||||
message, hasMessage := bodyMap["message"]
|
||||
if !hasMessage {
|
||||
return fmt.Errorf("missing 'message' field in request body")
|
||||
}
|
||||
|
||||
// Find all objects with @context starting from message
|
||||
objects := findReferencedObjects(message, "message")
|
||||
|
||||
if len(objects) == 0 {
|
||||
log.Debugf(ctx, "No objects with @context found in message, skipping Extended Schema validation")
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Debugf(ctx, "Found %d objects with @context for Extended Schema validation", len(objects))
|
||||
|
||||
// Get config with defaults
|
||||
ttl := 86400 * time.Second // 24 hours default
|
||||
timeout := 30 * time.Second
|
||||
var allowedDomains []string
|
||||
|
||||
refConfig := v.config.ExtendedSchemaConfig
|
||||
if refConfig.CacheTTL > 0 {
|
||||
ttl = time.Duration(refConfig.CacheTTL) * time.Second
|
||||
}
|
||||
if refConfig.DownloadTimeout > 0 {
|
||||
timeout = time.Duration(refConfig.DownloadTimeout) * time.Second
|
||||
}
|
||||
allowedDomains = refConfig.AllowedDomains
|
||||
|
||||
log.Debugf(ctx, "Extended Schema config: ttl=%v, timeout=%v, allowedDomains=%v",
|
||||
ttl, timeout, allowedDomains)
|
||||
|
||||
// Validate each object and collect errors
|
||||
var errors []string
|
||||
for _, obj := range objects {
|
||||
log.Debugf(ctx, "Validating object at path: %s, @context: %s, @type: %s",
|
||||
obj.Path, obj.Context, obj.Type)
|
||||
|
||||
if err := v.schemaCache.validateReferencedObject(ctx, obj, ttl, timeout, allowedDomains); err != nil {
|
||||
errors = append(errors, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
if len(errors) > 0 {
|
||||
return fmt.Errorf("validation errors:\n - %s", strings.Join(errors, "\n - "))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// newSchemaCache creates a new schema cache.
|
||||
func newSchemaCache(maxSize int) *schemaCache {
|
||||
return &schemaCache{
|
||||
schemas: make(map[string]*cachedDomainSchema),
|
||||
maxSize: maxSize,
|
||||
}
|
||||
}
|
||||
|
||||
// hashURL creates a SHA256 hash of the URL for use as cache key.
|
||||
func hashURL(urlStr string) string {
|
||||
hash := sha256.Sum256([]byte(urlStr))
|
||||
return hex.EncodeToString(hash[:])
|
||||
}
|
||||
|
||||
// isValidSchemaPath validates if the schema path is safe to load.
|
||||
func isValidSchemaPath(schemaPath string) bool {
|
||||
u, err := url.Parse(schemaPath)
|
||||
if err != nil {
|
||||
// Could be a simple file path
|
||||
return schemaPath != ""
|
||||
}
|
||||
// Support: http://, https://, file://, or no scheme (local path)
|
||||
return u.Scheme == "http" || u.Scheme == "https" ||
|
||||
u.Scheme == "file" || u.Scheme == ""
|
||||
}
|
||||
|
||||
// get retrieves a cached schema and updates access tracking.
|
||||
func (c *schemaCache) get(urlHash string) (*openapi3.T, bool) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
cached, exists := c.schemas[urlHash]
|
||||
if !exists || time.Now().After(cached.expiresAt) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// Update access tracking
|
||||
cached.lastAccessed = time.Now()
|
||||
cached.accessCount++
|
||||
|
||||
return cached.doc, true
|
||||
}
|
||||
|
||||
// set stores a schema in the cache with TTL and LRU eviction.
|
||||
func (c *schemaCache) set(urlHash string, doc *openapi3.T, ttl time.Duration) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
// LRU eviction if cache is full
|
||||
if len(c.schemas) >= c.maxSize {
|
||||
var oldest string
|
||||
var oldestTime time.Time
|
||||
for k, v := range c.schemas {
|
||||
if oldest == "" || v.lastAccessed.Before(oldestTime) {
|
||||
oldest, oldestTime = k, v.lastAccessed
|
||||
}
|
||||
}
|
||||
if oldest != "" {
|
||||
delete(c.schemas, oldest)
|
||||
}
|
||||
}
|
||||
|
||||
c.schemas[urlHash] = &cachedDomainSchema{
|
||||
doc: doc,
|
||||
loadedAt: time.Now(),
|
||||
expiresAt: time.Now().Add(ttl),
|
||||
lastAccessed: time.Now(),
|
||||
accessCount: 1,
|
||||
}
|
||||
}
|
||||
|
||||
// cleanupExpired removes expired schemas from cache.
|
||||
func (c *schemaCache) cleanupExpired() int {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
expired := make([]string, 0)
|
||||
|
||||
for urlHash, cached := range c.schemas {
|
||||
if now.After(cached.expiresAt) {
|
||||
expired = append(expired, urlHash)
|
||||
}
|
||||
}
|
||||
|
||||
for _, urlHash := range expired {
|
||||
delete(c.schemas, urlHash)
|
||||
}
|
||||
|
||||
return len(expired)
|
||||
}
|
||||
|
||||
// loadSchemaFromPath loads a schema from URL or local file with timeout and caching.
|
||||
func (c *schemaCache) loadSchemaFromPath(ctx context.Context, schemaPath string, ttl, timeout time.Duration) (*openapi3.T, error) {
|
||||
urlHash := hashURL(schemaPath)
|
||||
|
||||
// Check cache first
|
||||
if doc, found := c.get(urlHash); found {
|
||||
log.Debugf(ctx, "Schema cache hit for: %s", schemaPath)
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
log.Debugf(ctx, "Schema cache miss, loading from: %s", schemaPath)
|
||||
|
||||
// Validate path format
|
||||
if !isValidSchemaPath(schemaPath) {
|
||||
return nil, fmt.Errorf("invalid schema path: %s", schemaPath)
|
||||
}
|
||||
|
||||
loader := openapi3.NewLoader()
|
||||
loader.IsExternalRefsAllowed = true
|
||||
|
||||
var doc *openapi3.T
|
||||
var err error
|
||||
|
||||
u, parseErr := url.Parse(schemaPath)
|
||||
if parseErr == nil && (u.Scheme == "http" || u.Scheme == "https") {
|
||||
// Load from URL with timeout
|
||||
loadCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
loader.Context = loadCtx
|
||||
doc, err = loader.LoadFromURI(u)
|
||||
} else {
|
||||
// Load from local file (file:// or path)
|
||||
filePath := schemaPath
|
||||
if u != nil && u.Scheme == "file" {
|
||||
filePath = u.Path
|
||||
}
|
||||
doc, err = loader.LoadFromFile(filePath)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Errorf(ctx, err, "Failed to load schema from: %s", schemaPath)
|
||||
return nil, fmt.Errorf("failed to load schema from %s: %w", schemaPath, err)
|
||||
}
|
||||
|
||||
// Validate loaded schema (non-blocking, just log warnings)
|
||||
if err := doc.Validate(ctx); err != nil {
|
||||
log.Debugf(ctx, "Schema validation warnings for %s: %v", schemaPath, err)
|
||||
}
|
||||
|
||||
c.set(urlHash, doc, ttl)
|
||||
log.Debugf(ctx, "Loaded and cached schema from: %s", schemaPath)
|
||||
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
// findReferencedObjects recursively finds ALL objects with @context in the data.
|
||||
func findReferencedObjects(data interface{}, path string) []referencedObject {
|
||||
var results []referencedObject
|
||||
|
||||
switch v := data.(type) {
|
||||
case map[string]interface{}:
|
||||
// Check for @context and @type
|
||||
if contextVal, hasContext := v["@context"].(string); hasContext {
|
||||
if typeVal, hasType := v["@type"].(string); hasType {
|
||||
results = append(results, referencedObject{
|
||||
Path: path,
|
||||
Context: contextVal,
|
||||
Type: typeVal,
|
||||
Data: v,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Recurse into nested objects
|
||||
for key, val := range v {
|
||||
newPath := key
|
||||
if path != "" {
|
||||
newPath = path + "." + key
|
||||
}
|
||||
results = append(results, findReferencedObjects(val, newPath)...)
|
||||
}
|
||||
|
||||
case []interface{}:
|
||||
// Recurse into arrays
|
||||
for i, item := range v {
|
||||
newPath := fmt.Sprintf("%s[%d]", path, i)
|
||||
results = append(results, findReferencedObjects(item, newPath)...)
|
||||
}
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
// transformContextToSchemaURL transforms @context URL to schema URL.
|
||||
func transformContextToSchemaURL(contextURL string) string {
|
||||
// Hardcoded transformation: context.jsonld -> attributes.yaml
|
||||
return strings.Replace(contextURL, "context.jsonld", "attributes.yaml", 1)
|
||||
}
|
||||
|
||||
// findSchemaByType finds a schema in the document by @type value.
|
||||
func findSchemaByType(doc *openapi3.T, typeName string) (*openapi3.SchemaRef, error) {
|
||||
if doc.Components == nil || doc.Components.Schemas == nil {
|
||||
return nil, fmt.Errorf("no schemas found in document")
|
||||
}
|
||||
|
||||
// Try direct match by schema name
|
||||
if schema, exists := doc.Components.Schemas[typeName]; exists {
|
||||
return schema, nil
|
||||
}
|
||||
|
||||
// Fallback: Try x-jsonld.@type match
|
||||
for _, schema := range doc.Components.Schemas {
|
||||
if schema.Value == nil {
|
||||
continue
|
||||
}
|
||||
if xJsonld, ok := schema.Value.Extensions["x-jsonld"].(map[string]interface{}); ok {
|
||||
if atType, ok := xJsonld["@type"].(string); ok && atType == typeName {
|
||||
return schema, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("no schema found for @type: %s", typeName)
|
||||
}
|
||||
|
||||
// isAllowedDomain checks if the URL domain is in the whitelist.
|
||||
func isAllowedDomain(schemaURL string, allowedDomains []string) bool {
|
||||
if len(allowedDomains) == 0 {
|
||||
return true // No whitelist = all allowed
|
||||
}
|
||||
for _, domain := range allowedDomains {
|
||||
if strings.Contains(schemaURL, domain) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// validateReferencedObject validates a single object with @context.
|
||||
func (c *schemaCache) validateReferencedObject(
|
||||
ctx context.Context,
|
||||
obj referencedObject,
|
||||
ttl, timeout time.Duration,
|
||||
allowedDomains []string,
|
||||
) error {
|
||||
// Domain whitelist check
|
||||
if !isAllowedDomain(obj.Context, allowedDomains) {
|
||||
log.Warnf(ctx, "Domain not in whitelist: %s", obj.Context)
|
||||
return fmt.Errorf("domain not allowed: %s", obj.Context)
|
||||
}
|
||||
|
||||
// Transform @context to schema path (URL or file)
|
||||
schemaPath := transformContextToSchemaURL(obj.Context)
|
||||
log.Debugf(ctx, "Transformed %s -> %s", obj.Context, schemaPath)
|
||||
|
||||
// Load schema with timeout (supports URL or local file)
|
||||
doc, err := c.loadSchemaFromPath(ctx, schemaPath, ttl, timeout)
|
||||
if err != nil {
|
||||
return fmt.Errorf("at %s: %w", obj.Path, err)
|
||||
}
|
||||
|
||||
// Find schema by @type
|
||||
schema, err := findSchemaByType(doc, obj.Type)
|
||||
if err != nil {
|
||||
log.Errorf(ctx, err, "Schema not found for @type: %s at path: %s", obj.Type, obj.Path)
|
||||
return fmt.Errorf("at %s: %w", obj.Path, err)
|
||||
}
|
||||
|
||||
// Validate object against schema (same options as core schema)
|
||||
opts := []openapi3.SchemaValidationOption{
|
||||
openapi3.VisitAsRequest(),
|
||||
openapi3.EnableFormatValidation(),
|
||||
}
|
||||
if err := schema.Value.VisitJSON(obj.Data, opts...); err != nil {
|
||||
log.Debugf(ctx, "Validation failed for @type: %s at path: %s: %v", obj.Type, obj.Path, err)
|
||||
return fmt.Errorf("at %s: %w", obj.Path, err)
|
||||
}
|
||||
|
||||
log.Debugf(ctx, "Validation passed for @type: %s at path: %s", obj.Type, obj.Path)
|
||||
return nil
|
||||
}
|
||||
@@ -2,8 +2,6 @@ package schemav2validator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
@@ -45,42 +43,12 @@ type Config struct {
|
||||
Location string // URL, file path, or directory path
|
||||
CacheTTL int
|
||||
|
||||
// NEW: Referenced schema configuration
|
||||
EnableReferencedSchemas bool
|
||||
ReferencedSchemaConfig ReferencedSchemaConfig
|
||||
// NEW: Extended Schema configuration
|
||||
EnableExtendedSchema bool
|
||||
ExtendedSchemaConfig ExtendedSchemaConfig
|
||||
}
|
||||
|
||||
// ReferencedSchemaConfig holds configuration for referenced schema validation.
|
||||
type ReferencedSchemaConfig struct {
|
||||
CacheTTL int // seconds, default 86400 (24h)
|
||||
MaxCacheSize int // default 100
|
||||
DownloadTimeout int // seconds, default 30
|
||||
AllowedDomains []string // whitelist (empty = all allowed)
|
||||
}
|
||||
|
||||
// referencedObject represents ANY object with @context in the request.
|
||||
type referencedObject struct {
|
||||
Path string
|
||||
Context string
|
||||
Type string
|
||||
Data map[string]interface{}
|
||||
}
|
||||
|
||||
// schemaCache caches loaded domain schemas with LRU eviction.
|
||||
type schemaCache struct {
|
||||
mu sync.RWMutex
|
||||
schemas map[string]*cachedDomainSchema
|
||||
maxSize int
|
||||
}
|
||||
|
||||
// cachedDomainSchema holds a cached domain schema with metadata.
|
||||
type cachedDomainSchema struct {
|
||||
doc *openapi3.T
|
||||
loadedAt time.Time
|
||||
expiresAt time.Time
|
||||
lastAccessed time.Time
|
||||
accessCount int64
|
||||
}
|
||||
|
||||
// New creates a new Schemav2Validator instance.
|
||||
func New(ctx context.Context, config *Config) (*schemav2Validator, func() error, error) {
|
||||
@@ -105,14 +73,14 @@ func New(ctx context.Context, config *Config) (*schemav2Validator, func() error,
|
||||
config: config,
|
||||
}
|
||||
|
||||
// NEW: Initialize referenced schema cache if enabled
|
||||
if config.EnableReferencedSchemas {
|
||||
// NEW: Initialize extended schema cache if enabled
|
||||
if config.EnableExtendedSchema {
|
||||
maxSize := 100
|
||||
if config.ReferencedSchemaConfig.MaxCacheSize > 0 {
|
||||
maxSize = config.ReferencedSchemaConfig.MaxCacheSize
|
||||
if config.ExtendedSchemaConfig.MaxCacheSize > 0 {
|
||||
maxSize = config.ExtendedSchemaConfig.MaxCacheSize
|
||||
}
|
||||
v.schemaCache = newSchemaCache(maxSize)
|
||||
log.Infof(ctx, "Initialized referenced schema cache with max size: %d", maxSize)
|
||||
log.Infof(ctx, "Initialized extended schema cache with max size: %d", maxSize)
|
||||
}
|
||||
|
||||
if err := v.initialise(ctx); err != nil {
|
||||
@@ -171,9 +139,9 @@ func (v *schemav2Validator) Validate(ctx context.Context, reqURL *url.URL, data
|
||||
log.Debugf(ctx, "Core schema validation passed for action: %s", action)
|
||||
|
||||
// NEW: Extended Schema validation (if enabled)
|
||||
if v.config.EnableReferencedSchemas && v.schemaCache != nil {
|
||||
if v.config.EnableExtendedSchema && v.schemaCache != nil {
|
||||
log.Debugf(ctx, "Starting Extended Schema validation for action: %s", action)
|
||||
if err := v.validateReferencedSchemas(ctx, jsonData); err != nil {
|
||||
if err := v.validateExtendedSchemas(ctx, jsonData); err != nil {
|
||||
// Extended Schema failure - return error (same behavior as core schema)
|
||||
log.Debugf(ctx, "Extended Schema validation failed for action %s: %v", action, err)
|
||||
return v.formatValidationError(err)
|
||||
@@ -246,10 +214,10 @@ func (v *schemav2Validator) refreshLoop(ctx context.Context) {
|
||||
coreTicker := time.NewTicker(time.Duration(v.config.CacheTTL) * time.Second)
|
||||
defer coreTicker.Stop()
|
||||
|
||||
// NEW: Ticker for referenced schema cleanup
|
||||
// NEW: Ticker for extended schema cleanup
|
||||
var refTicker *time.Ticker
|
||||
if v.config.EnableReferencedSchemas {
|
||||
ttl := v.config.ReferencedSchemaConfig.CacheTTL
|
||||
if v.config.EnableExtendedSchema {
|
||||
ttl := v.config.ExtendedSchemaConfig.CacheTTL
|
||||
if ttl <= 0 {
|
||||
ttl = 86400 // Default 24 hours
|
||||
}
|
||||
@@ -268,7 +236,7 @@ func (v *schemav2Validator) refreshLoop(ctx context.Context) {
|
||||
if v.schemaCache != nil {
|
||||
count := v.schemaCache.cleanupExpired()
|
||||
if count > 0 {
|
||||
log.Debugf(ctx, "Cleaned up %d expired referenced schemas", count)
|
||||
log.Debugf(ctx, "Cleaned up %d expired extended schemas", count)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -398,64 +366,6 @@ func (v *schemav2Validator) buildActionIndex(ctx context.Context, doc *openapi3.
|
||||
return actionSchemas
|
||||
}
|
||||
|
||||
// validateReferencedSchemas validates all objects with @context against their schemas.
|
||||
func (v *schemav2Validator) validateReferencedSchemas(ctx context.Context, body interface{}) error {
|
||||
// Extract "message" object - only scan inside message, not root
|
||||
bodyMap, ok := body.(map[string]interface{})
|
||||
if !ok {
|
||||
return fmt.Errorf("body is not a valid JSON object")
|
||||
}
|
||||
|
||||
message, hasMessage := bodyMap["message"]
|
||||
if !hasMessage {
|
||||
return fmt.Errorf("missing 'message' field in request body")
|
||||
}
|
||||
|
||||
// Find all objects with @context starting from message
|
||||
objects := findReferencedObjects(message, "message")
|
||||
|
||||
if len(objects) == 0 {
|
||||
log.Debugf(ctx, "No objects with @context found in message, skipping Extended Schema validation")
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Debugf(ctx, "Found %d objects with @context for Extended Schema validation", len(objects))
|
||||
|
||||
// Get config with defaults
|
||||
ttl := 86400 * time.Second // 24 hours default
|
||||
timeout := 30 * time.Second
|
||||
var allowedDomains []string
|
||||
|
||||
refConfig := v.config.ReferencedSchemaConfig
|
||||
if refConfig.CacheTTL > 0 {
|
||||
ttl = time.Duration(refConfig.CacheTTL) * time.Second
|
||||
}
|
||||
if refConfig.DownloadTimeout > 0 {
|
||||
timeout = time.Duration(refConfig.DownloadTimeout) * time.Second
|
||||
}
|
||||
allowedDomains = refConfig.AllowedDomains
|
||||
|
||||
log.Debugf(ctx, "Extended Schema config: ttl=%v, timeout=%v, allowedDomains=%v",
|
||||
ttl, timeout, allowedDomains)
|
||||
|
||||
// Validate each object and collect errors
|
||||
var errors []string
|
||||
for _, obj := range objects {
|
||||
log.Debugf(ctx, "Validating object at path: %s, @context: %s, @type: %s",
|
||||
obj.Path, obj.Context, obj.Type)
|
||||
|
||||
if err := v.schemaCache.validateReferencedObject(ctx, obj, ttl, timeout, allowedDomains); err != nil {
|
||||
errors = append(errors, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
if len(errors) > 0 {
|
||||
return fmt.Errorf("validation errors:\n - %s", strings.Join(errors, "\n - "))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// extractActionFromSchema extracts the action value from a schema.
|
||||
func (v *schemav2Validator) extractActionFromSchema(schema *openapi3.Schema) string {
|
||||
// Check direct properties
|
||||
@@ -508,278 +418,4 @@ func (v *schemav2Validator) getActionValue(contextSchema *openapi3.Schema) strin
|
||||
return ""
|
||||
}
|
||||
|
||||
// newSchemaCache creates a new schema cache.
|
||||
func newSchemaCache(maxSize int) *schemaCache {
|
||||
return &schemaCache{
|
||||
schemas: make(map[string]*cachedDomainSchema),
|
||||
maxSize: maxSize,
|
||||
}
|
||||
}
|
||||
|
||||
// hashURL creates a SHA256 hash of the URL for use as cache key.
|
||||
func hashURL(urlStr string) string {
|
||||
hash := sha256.Sum256([]byte(urlStr))
|
||||
return hex.EncodeToString(hash[:])
|
||||
}
|
||||
|
||||
// isValidSchemaPath validates if the schema path is safe to load.
|
||||
func isValidSchemaPath(schemaPath string) bool {
|
||||
u, err := url.Parse(schemaPath)
|
||||
if err != nil {
|
||||
// Could be a simple file path
|
||||
return schemaPath != ""
|
||||
}
|
||||
// Support: http://, https://, file://, or no scheme (local path)
|
||||
return u.Scheme == "http" || u.Scheme == "https" ||
|
||||
u.Scheme == "file" || u.Scheme == ""
|
||||
}
|
||||
|
||||
// get retrieves a cached schema and updates access tracking.
|
||||
func (c *schemaCache) get(urlHash string) (*openapi3.T, bool) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
cached, exists := c.schemas[urlHash]
|
||||
if !exists || time.Now().After(cached.expiresAt) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// Update access tracking
|
||||
cached.lastAccessed = time.Now()
|
||||
cached.accessCount++
|
||||
|
||||
return cached.doc, true
|
||||
}
|
||||
|
||||
// set stores a schema in the cache with TTL and LRU eviction.
|
||||
func (c *schemaCache) set(urlHash string, doc *openapi3.T, ttl time.Duration) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
// LRU eviction if cache is full
|
||||
if len(c.schemas) >= c.maxSize {
|
||||
var oldest string
|
||||
var oldestTime time.Time
|
||||
for k, v := range c.schemas {
|
||||
if oldest == "" || v.lastAccessed.Before(oldestTime) {
|
||||
oldest, oldestTime = k, v.lastAccessed
|
||||
}
|
||||
}
|
||||
if oldest != "" {
|
||||
delete(c.schemas, oldest)
|
||||
}
|
||||
}
|
||||
|
||||
c.schemas[urlHash] = &cachedDomainSchema{
|
||||
doc: doc,
|
||||
loadedAt: time.Now(),
|
||||
expiresAt: time.Now().Add(ttl),
|
||||
lastAccessed: time.Now(),
|
||||
accessCount: 1,
|
||||
}
|
||||
}
|
||||
|
||||
// cleanupExpired removes expired schemas from cache.
|
||||
func (c *schemaCache) cleanupExpired() int {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
expired := make([]string, 0)
|
||||
|
||||
for urlHash, cached := range c.schemas {
|
||||
if now.After(cached.expiresAt) {
|
||||
expired = append(expired, urlHash)
|
||||
}
|
||||
}
|
||||
|
||||
for _, urlHash := range expired {
|
||||
delete(c.schemas, urlHash)
|
||||
}
|
||||
|
||||
return len(expired)
|
||||
}
|
||||
|
||||
// loadSchemaFromPath loads a schema from URL or local file with timeout and caching.
|
||||
func (c *schemaCache) loadSchemaFromPath(ctx context.Context, schemaPath string, ttl, timeout time.Duration) (*openapi3.T, error) {
|
||||
urlHash := hashURL(schemaPath)
|
||||
|
||||
// Check cache first
|
||||
if doc, found := c.get(urlHash); found {
|
||||
log.Debugf(ctx, "Schema cache hit for: %s", schemaPath)
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
log.Debugf(ctx, "Schema cache miss, loading from: %s", schemaPath)
|
||||
|
||||
// Validate path format
|
||||
if !isValidSchemaPath(schemaPath) {
|
||||
return nil, fmt.Errorf("invalid schema path: %s", schemaPath)
|
||||
}
|
||||
|
||||
loader := openapi3.NewLoader()
|
||||
loader.IsExternalRefsAllowed = true
|
||||
|
||||
var doc *openapi3.T
|
||||
var err error
|
||||
|
||||
u, parseErr := url.Parse(schemaPath)
|
||||
if parseErr == nil && (u.Scheme == "http" || u.Scheme == "https") {
|
||||
// Load from URL with timeout
|
||||
loadCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
loader.Context = loadCtx
|
||||
doc, err = loader.LoadFromURI(u)
|
||||
} else {
|
||||
// Load from local file (file:// or path)
|
||||
filePath := schemaPath
|
||||
if u != nil && u.Scheme == "file" {
|
||||
filePath = u.Path
|
||||
}
|
||||
doc, err = loader.LoadFromFile(filePath)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Errorf(ctx, err, "Failed to load schema from: %s", schemaPath)
|
||||
return nil, fmt.Errorf("failed to load schema from %s: %w", schemaPath, err)
|
||||
}
|
||||
|
||||
// Validate loaded schema (non-blocking, just log warnings)
|
||||
if err := doc.Validate(ctx); err != nil {
|
||||
log.Debugf(ctx, "Schema validation warnings for %s: %v", schemaPath, err)
|
||||
}
|
||||
|
||||
c.set(urlHash, doc, ttl)
|
||||
log.Debugf(ctx, "Loaded and cached schema from: %s", schemaPath)
|
||||
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
// findReferencedObjects recursively finds ALL objects with @context in the data.
|
||||
func findReferencedObjects(data interface{}, path string) []referencedObject {
|
||||
var results []referencedObject
|
||||
|
||||
switch v := data.(type) {
|
||||
case map[string]interface{}:
|
||||
// Check for @context and @type
|
||||
if contextVal, hasContext := v["@context"].(string); hasContext {
|
||||
if typeVal, hasType := v["@type"].(string); hasType {
|
||||
results = append(results, referencedObject{
|
||||
Path: path,
|
||||
Context: contextVal,
|
||||
Type: typeVal,
|
||||
Data: v,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Recurse into nested objects
|
||||
for key, val := range v {
|
||||
newPath := key
|
||||
if path != "" {
|
||||
newPath = path + "." + key
|
||||
}
|
||||
results = append(results, findReferencedObjects(val, newPath)...)
|
||||
}
|
||||
|
||||
case []interface{}:
|
||||
// Recurse into arrays
|
||||
for i, item := range v {
|
||||
newPath := fmt.Sprintf("%s[%d]", path, i)
|
||||
results = append(results, findReferencedObjects(item, newPath)...)
|
||||
}
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
// transformContextToSchemaURL transforms @context URL to schema URL.
|
||||
func transformContextToSchemaURL(contextURL string) string {
|
||||
// Hardcoded transformation: context.jsonld -> attributes.yaml
|
||||
return strings.Replace(contextURL, "context.jsonld", "attributes.yaml", 1)
|
||||
}
|
||||
|
||||
|
||||
|
||||
// findSchemaByType finds a schema in the document by @type value.
|
||||
func findSchemaByType(doc *openapi3.T, typeName string) (*openapi3.SchemaRef, error) {
|
||||
if doc.Components == nil || doc.Components.Schemas == nil {
|
||||
return nil, fmt.Errorf("no schemas found in document")
|
||||
}
|
||||
|
||||
// Try direct match by schema name
|
||||
if schema, exists := doc.Components.Schemas[typeName]; exists {
|
||||
return schema, nil
|
||||
}
|
||||
|
||||
// Fallback: Try x-jsonld.@type match
|
||||
for _, schema := range doc.Components.Schemas {
|
||||
if schema.Value == nil {
|
||||
continue
|
||||
}
|
||||
if xJsonld, ok := schema.Value.Extensions["x-jsonld"].(map[string]interface{}); ok {
|
||||
if atType, ok := xJsonld["@type"].(string); ok && atType == typeName {
|
||||
return schema, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("no schema found for @type: %s", typeName)
|
||||
}
|
||||
|
||||
// isAllowedDomain checks if the URL domain is in the whitelist.
|
||||
func isAllowedDomain(schemaURL string, allowedDomains []string) bool {
|
||||
if len(allowedDomains) == 0 {
|
||||
return true // No whitelist = all allowed
|
||||
}
|
||||
for _, domain := range allowedDomains {
|
||||
if strings.Contains(schemaURL, domain) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// validateReferencedObject validates a single object with @context.
|
||||
func (c *schemaCache) validateReferencedObject(
|
||||
ctx context.Context,
|
||||
obj referencedObject,
|
||||
ttl, timeout time.Duration,
|
||||
allowedDomains []string,
|
||||
) error {
|
||||
// Domain whitelist check
|
||||
if !isAllowedDomain(obj.Context, allowedDomains) {
|
||||
log.Warnf(ctx, "Domain not in whitelist: %s", obj.Context)
|
||||
return fmt.Errorf("domain not allowed: %s", obj.Context)
|
||||
}
|
||||
|
||||
// Transform @context to schema path (URL or file)
|
||||
schemaPath := transformContextToSchemaURL(obj.Context)
|
||||
log.Debugf(ctx, "Transformed %s -> %s", obj.Context, schemaPath)
|
||||
|
||||
// Load schema with timeout (supports URL or local file)
|
||||
doc, err := c.loadSchemaFromPath(ctx, schemaPath, ttl, timeout)
|
||||
if err != nil {
|
||||
return fmt.Errorf("at %s: %w", obj.Path, err)
|
||||
}
|
||||
|
||||
// Find schema by @type
|
||||
schema, err := findSchemaByType(doc, obj.Type)
|
||||
if err != nil {
|
||||
log.Errorf(ctx, err, "Schema not found for @type: %s at path: %s", obj.Type, obj.Path)
|
||||
return fmt.Errorf("at %s: %w", obj.Path, err)
|
||||
}
|
||||
|
||||
// Validate object against schema (same options as core schema)
|
||||
opts := []openapi3.SchemaValidationOption{
|
||||
openapi3.VisitAsRequest(),
|
||||
openapi3.EnableFormatValidation(),
|
||||
}
|
||||
if err := schema.Value.VisitJSON(obj.Data, opts...); err != nil {
|
||||
log.Debugf(ctx, "Validation failed for @type: %s at path: %s: %v", obj.Type, obj.Path, err)
|
||||
return fmt.Errorf("at %s: %w", obj.Path, err)
|
||||
}
|
||||
|
||||
log.Debugf(ctx, "Validation passed for @type: %s at path: %s", obj.Type, obj.Path)
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user