Merge pull request #582 from Beckn-One/580-schema2validator

Issue 580 - Add extended schema validation capability to schemav2validator plugin
This commit is contained in:
Mayuresh A Nirhali
2025-12-15 15:55:33 +05:30
committed by GitHub
6 changed files with 1282 additions and 15 deletions

View File

@@ -508,7 +508,7 @@ schemaValidator:
#### 5. Schema2Validator Plugin
**Purpose**: Validate requests against OpenAPI 3.x specifications with action-based matching.
**Purpose**: Validate requests against OpenAPI 3.x specifications. Supports core protocol validation and optional extended validation for domain-specific objects with `@context` references.
```yaml
schemaValidator:
@@ -517,6 +517,11 @@ schemaValidator:
type: url
location: https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/draft/api-specs/beckn-protocol-api.yaml
cacheTTL: "3600"
extendedSchema_enabled: "true"
extendedSchema_cacheTTL: "86400"
extendedSchema_maxCacheSize: "100"
extendedSchema_downloadTimeout: "30"
extendedSchema_allowedDomains: "beckn.org,example.com"
```
**Or for local files:**
@@ -528,12 +533,18 @@ schemaValidator:
type: file
location: ./validation-scripts/l2-config/mobility_1.1.0_openapi_3.1.yaml
cacheTTL: "3600"
extendedSchema_enabled: "false"
```
**Parameters**:
- `type`: Source type - `"url"` for remote specs, `"file"` for local files
- `location`: URL or file path to OpenAPI 3.1 specification
- `cacheTTL`: Cache TTL in seconds before reloading spec (default: `"3600"`)
- `extendedSchema_enabled`: Enable extended schema validation for `@context` objects (default: `"false"`)
- `extendedSchema_cacheTTL`: Domain schema cache TTL in seconds (default: `"86400"`)
- `extendedSchema_maxCacheSize`: Max cached schemas (default: `"100"`)
- `extendedSchema_downloadTimeout`: Schema download timeout in seconds (default: `"30"`)
- `extendedSchema_allowedDomains`: Comma-separated domain whitelist (empty = all allowed)
---

View File

@@ -10,6 +10,7 @@ Validates Beckn protocol requests against OpenAPI 3.1 specifications using kin-o
- TTL-based caching with automatic refresh
- Generic path matching (no hardcoded paths)
- Direct schema validation without router overhead
- Extended schema validation for domain-specific objects with `@context` references
## Configuration
@@ -20,6 +21,11 @@ schemaValidator:
type: url
location: https://example.com/openapi-spec.yaml
cacheTTL: "3600"
extendedSchema_enabled: "true"
extendedSchema_cacheTTL: "86400"
extendedSchema_maxCacheSize: "100"
extendedSchema_downloadTimeout: "30"
extendedSchema_allowedDomains: "beckn.org,example.com"
```
### Configuration Parameters
@@ -29,24 +35,55 @@ schemaValidator:
| `type` | string | Yes | - | Type of spec source: "url" or "file" ("dir" reserved for future) |
| `location` | string | Yes | - | URL or file path to OpenAPI 3.1 spec |
| `cacheTTL` | string | No | "3600" | Cache TTL in seconds before reloading spec |
| `extendedSchema_enabled` | string | No | "false" | Enable extended schema validation for `@context` objects |
| `extendedSchema_cacheTTL` | string | No | "86400" | Domain schema cache TTL in seconds |
| `extendedSchema_maxCacheSize` | string | No | "100" | Maximum number of cached domain schemas |
| `extendedSchema_downloadTimeout` | string | No | "30" | Timeout in seconds for downloading domain schemas |
| `extendedSchema_allowedDomains` | string | No | "" | Comma-separated domain whitelist (empty = all allowed) |
## How It Works
1. **Load Spec**: Loads OpenAPI spec from configured URL at startup
2. **Extract Action**: Extracts `action` from request `context.action` field
3. **Find Schema**: Searches all paths and HTTP methods in spec for schema with matching action:
- Checks `properties.context.action.enum` for the action value
- Also checks `properties.context.allOf[].properties.action.enum`
- Stops at first match
4. **Validate**: Validates request body against matched schema using `Schema.VisitJSON()` with:
### Initialization (Load Time)
**Core Protocol Validation Setup**:
1. **Load OpenAPI Spec**: Loads main spec from `location` (URL or file) with external `$ref` resolution
2. **Build Action Index**: Creates action→schema map for O(1) lookup by scanning all paths/methods
3. **Validate Spec**: Validates OpenAPI spec structure (warnings logged, non-fatal)
4. **Cache Spec**: Stores loaded spec with `loadedAt` timestamp
**Extended Schema Setup** (if `extendedSchema_enabled: "true"`):
5. **Initialize Schema Cache**: Creates LRU cache with `maxCacheSize` (default: 100)
6. **Start Background Refresh**: Launches goroutine with two tickers:
- Core spec refresh every `cacheTTL` seconds (default: 3600)
- Extended schema cleanup every `extendedSchema_cacheTTL` seconds (default: 86400)
### Request Validation (Runtime)
**Core Protocol Validation** (always runs):
1. **Parse Request**: Unmarshal JSON and extract `context.action`
2. **Lookup Schema**: O(1) lookup in action index (built at load time)
3. **Validate**: Call `schema.Value.VisitJSON()` with:
- Required fields validation
- Data type validation (string, number, boolean, object, array)
- Format validation (email, uri, date-time, uuid, etc.)
- Constraint validation (min/max, pattern, enum, const)
- Nested object and array validation
5. **Return Errors**: Returns validation errors in ONIX format
4. **Return Errors**: If validation fails, format and return errors
**Extended Schema Validation** (if `extendedSchema_enabled: "true"` AND core validation passed):
5. **Scan for @context**: Recursively traverse `message` field for objects with `@context` and `@type`
6. **Filter Core Schemas**: Skip objects with `/schema/core/` in `@context` URL
7. **Validate Each Domain Object**:
- Check domain whitelist (if `allowedDomains` configured)
- Transform `@context` URL: `context.jsonld` → `attributes.yaml`
- Load schema from URL/file (check cache first, download if miss)
- Find schema by `@type` (direct match or `x-jsonld.@type` fallback)
- Strip `@context` and `@type` metadata from object
- Validate remaining data against domain schema
- Prefix error paths with object location (e.g., `message.order.field`)
8. **Return Errors**: Returns first validation error (fail-fast)
## Action-Based Matching
@@ -120,7 +157,33 @@ schemaValidator:
cacheTTL: "3600"
```
### With Extended Schema Validation
```yaml
schemaValidator:
id: schemav2validator
config:
type: url
location: https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/draft/api-specs/beckn-protocol-api.yaml
cacheTTL: "3600"
extendedSchema_enabled: "true"
extendedSchema_cacheTTL: "86400"
extendedSchema_maxCacheSize: "100"
extendedSchema_downloadTimeout: "30"
extendedSchema_allowedDomains: "raw.githubusercontent.com,schemas.beckn.org"
```
**At Load Time**:
- Creates LRU cache for domain schemas (max 100 entries)
- Starts background goroutine for cache cleanup every 24 hours
**At Runtime** (after core validation passes):
- Scans `message` field for objects with `@context` and `@type`
- Skips core Beckn schemas (containing `/schema/core/`)
- Downloads domain schemas from `@context` URLs (cached for 24 hours)
- Validates domain-specific data against schemas
- Returns errors with full JSON paths (e.g., `message.order.chargingRate`)
- Fail-fast: returns on first validation error
## Dependencies

View File

@@ -4,6 +4,7 @@ import (
"context"
"errors"
"strconv"
"strings"
"github.com/beckn-one/beckn-onix/pkg/plugin/definition"
"github.com/beckn-one/beckn-onix/pkg/plugin/implementation/schemav2validator"
@@ -40,6 +41,34 @@ func (vp schemav2ValidatorProvider) New(ctx context.Context, config map[string]s
}
}
// NEW: Parse extendedSchema_enabled
if enableStr, ok := config["extendedSchema_enabled"]; ok {
cfg.EnableExtendedSchema = enableStr == "true"
}
// NEW: Parse Extended Schema config (if enabled)
if cfg.EnableExtendedSchema {
if v, ok := config["extendedSchema_cacheTTL"]; ok {
if ttl, err := strconv.Atoi(v); err == nil && ttl > 0 {
cfg.ExtendedSchemaConfig.CacheTTL = ttl
}
}
if v, ok := config["extendedSchema_maxCacheSize"]; ok {
if size, err := strconv.Atoi(v); err == nil && size > 0 {
cfg.ExtendedSchemaConfig.MaxCacheSize = size
}
}
if v, ok := config["extendedSchema_downloadTimeout"]; ok {
if timeout, err := strconv.Atoi(v); err == nil && timeout > 0 {
cfg.ExtendedSchemaConfig.DownloadTimeout = timeout
}
}
if v, ok := config["extendedSchema_allowedDomains"]; ok && v != "" {
cfg.ExtendedSchemaConfig.AllowedDomains = strings.Split(v, ",")
}
}
return schemav2validator.New(ctx, cfg)
}

View File

@@ -0,0 +1,406 @@
package schemav2validator
import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"net/url"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/beckn-one/beckn-onix/pkg/log"
	"github.com/beckn-one/beckn-onix/pkg/model"
	"github.com/getkin/kin-openapi/openapi3"
)
// ExtendedSchemaConfig holds configuration for referenced schema validation,
// i.e. validation of objects that carry an @context reference.
// Non-positive CacheTTL and DownloadTimeout values are replaced by the
// defaults noted below when validateExtendedSchemas runs.
type ExtendedSchemaConfig struct {
CacheTTL int // lifetime of a cached domain schema in seconds, default 86400 (24h)
MaxCacheSize int // maximum number of cached domain schemas, default 100
DownloadTimeout int // timeout for loading a schema over http/https in seconds, default 30
AllowedDomains []string // whitelist checked against the @context value (empty = all allowed)
}
// referencedObject represents a domain-specific object with @context.
// Instances are produced by findReferencedObjects while walking a payload.
type referencedObject struct {
Path string // dotted/indexed location of the object, e.g. "message.order.items[0]"
Context string // value of the object's "@context" key
Type string // value of the object's "@type" key
Data map[string]interface{} // the object itself, still including "@context" and "@type"
}
// schemaCache caches loaded domain schemas with LRU eviction.
// Entries are keyed by the hashURL digest of the schema path.
type schemaCache struct {
mu sync.RWMutex // guards schemas
schemas map[string]*cachedDomainSchema // cache entries keyed by URL hash
maxSize int // entry count at which set evicts the least recently accessed entry
}
// cachedDomainSchema holds a cached domain schema with metadata.
type cachedDomainSchema struct {
doc *openapi3.T // the loaded schema document
loadedAt time.Time // when the entry was stored by set
expiresAt time.Time // loadedAt plus the TTL; get treats later lookups as misses
lastAccessed time.Time // refreshed on every successful get; drives LRU eviction
accessCount int64 // starts at 1 in set and is incremented on every successful get
}
// isCoreSchema reports whether contextURL refers to a core Beckn schema,
// which is recognised by the "/schema/core/" segment anywhere in the URL.
func isCoreSchema(contextURL string) bool {
	const coreSegment = "/schema/core/"

	return strings.Contains(contextURL, coreSegment)
}
// validateExtendedSchemas validates every domain-specific object found under
// the "message" field of body against the schema referenced by its @context.
//
// body must be a decoded JSON object containing a "message" key; otherwise a
// plain error is returned. Objects are validated one at a time and the first
// failure is returned as a *model.SchemaValidationErr whose error paths are
// prefixed with the location of the offending object. A nil result means no
// domain objects were found or all of them passed.
func (v *schemav2Validator) validateExtendedSchemas(ctx context.Context, body interface{}) error {
	root, isObject := body.(map[string]interface{})
	if !isObject {
		return fmt.Errorf("body is not a valid JSON object")
	}

	// Only the "message" subtree is scanned for @context objects.
	message, present := root["message"]
	if !present {
		return fmt.Errorf("missing 'message' field in request body")
	}

	// Core schemas are filtered out by findReferencedObjects itself.
	targets := findReferencedObjects(message, "message")
	if len(targets) == 0 {
		log.Debugf(ctx, "No domain-specific objects with @context found, skipping Extended Schema validation")
		return nil
	}
	log.Debugf(ctx, "Found %d domain-specific objects with @context for Extended Schema validation", len(targets))

	// Start from the defaults and override with any positive configured value.
	settings := v.config.ExtendedSchemaConfig
	cacheTTL := 86400 * time.Second // 24 hours
	if settings.CacheTTL > 0 {
		cacheTTL = time.Duration(settings.CacheTTL) * time.Second
	}
	loadTimeout := 30 * time.Second
	if settings.DownloadTimeout > 0 {
		loadTimeout = time.Duration(settings.DownloadTimeout) * time.Second
	}
	whitelist := settings.AllowedDomains

	log.Debugf(ctx, "Extended Schema config: ttl=%v, timeout=%v, allowedDomains=%v",
		cacheTTL, loadTimeout, whitelist)

	for _, target := range targets {
		log.Debugf(ctx, "Validating object at path: %s, @context: %s, @type: %s",
			target.Path, target.Context, target.Type)

		failure := v.schemaCache.validateReferencedObject(ctx, target, cacheTTL, loadTimeout, whitelist)
		if failure == nil {
			continue
		}

		// Fail fast: convert the first failure and anchor its paths at the object.
		var collected []model.Error
		v.extractSchemaErrors(failure, &collected)
		for idx := range collected {
			item := &collected[idx]
			if item.Paths == "" {
				item.Paths = target.Path
				continue
			}
			item.Paths = target.Path + "." + item.Paths
		}
		return &model.SchemaValidationErr{Errors: collected}
	}

	return nil
}
// newSchemaCache returns an empty schema cache whose eviction threshold is
// maxSize entries. The value is stored as given, without validation.
func newSchemaCache(maxSize int) *schemaCache {
	cache := new(schemaCache)
	cache.maxSize = maxSize
	cache.schemas = make(map[string]*cachedDomainSchema)
	return cache
}
// hashURL returns the lowercase hex encoding of the SHA-256 digest of urlStr.
// The 64-character result is used as the cache key for a schema location.
func hashURL(urlStr string) string {
	hasher := sha256.New()
	hasher.Write([]byte(urlStr)) // hash.Hash.Write is documented to never return an error
	return hex.EncodeToString(hasher.Sum(nil))
}
// isValidSchemaPath reports whether schemaPath has a form the loader accepts:
// an http, https or file URL, or a value without a scheme (treated as a local
// path). A value that cannot be parsed as a URL is accepted as long as it is
// not empty. Note that the empty string parses successfully with an empty
// scheme and is therefore reported as valid.
func isValidSchemaPath(schemaPath string) bool {
	parsed, err := url.Parse(schemaPath)
	if err != nil {
		// Not URL-shaped; it may still be a plain file path.
		return schemaPath != ""
	}

	switch parsed.Scheme {
	case "", "http", "https", "file":
		return true
	default:
		return false
	}
}
// get looks up the schema stored under urlHash. It returns the document and
// true on a hit, or nil and false when the key is absent or the entry has
// expired. A hit refreshes the entry's lastAccessed time and bumps its access
// counter, which is why the exclusive lock is taken even for a lookup.
// Expired entries are left in place here; cleanupExpired removes them.
func (c *schemaCache) get(urlHash string) (*openapi3.T, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	entry, present := c.schemas[urlHash]
	if !present {
		return nil, false
	}
	if time.Now().After(entry.expiresAt) {
		return nil, false
	}

	entry.lastAccessed = time.Now()
	entry.accessCount++
	return entry.doc, true
}
// set stores doc under urlHash with the given TTL.
//
// When a new key is added and the cache already holds maxSize entries, the
// entry with the oldest lastAccessed time is evicted first. Overwriting an
// existing key (for example when an expired schema is reloaded) does not grow
// the cache, so no other entry is evicted in that case.
//
// With a non-positive maxSize the cache still stores the entry; it simply
// evicts one existing entry on every insertion of a new key.
func (c *schemaCache) set(urlHash string, doc *openapi3.T, ttl time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if _, replacing := c.schemas[urlHash]; !replacing && len(c.schemas) >= c.maxSize {
		var (
			victim     string
			victimTime time.Time
			found      bool
		)
		for key, entry := range c.schemas {
			if !found || entry.lastAccessed.Before(victimTime) {
				victim, victimTime, found = key, entry.lastAccessed, true
			}
		}
		if found {
			delete(c.schemas, victim)
		}
	}

	// One timestamp keeps loadedAt, expiresAt and lastAccessed consistent.
	now := time.Now()
	c.schemas[urlHash] = &cachedDomainSchema{
		doc:          doc,
		loadedAt:     now,
		expiresAt:    now.Add(ttl),
		lastAccessed: now,
		accessCount:  1,
	}
}
// cleanupExpired drops every entry whose expiry time has passed and returns
// how many entries were removed.
func (c *schemaCache) cleanupExpired() int {
	c.mu.Lock()
	defer c.mu.Unlock()

	cutoff := time.Now()
	removed := 0
	for key, entry := range c.schemas {
		if !cutoff.After(entry.expiresAt) {
			continue
		}
		// Deleting the current key while ranging over a map is safe in Go.
		delete(c.schemas, key)
		removed++
	}
	return removed
}
// loadSchemaFromPath returns the schema document for schemaPath, serving it
// from the cache when a live entry exists and loading it otherwise.
//
// http and https locations are loaded via the OpenAPI loader with a context
// that is cancelled after timeout; file:// URLs and scheme-less values are
// read from the local filesystem. A successfully loaded document is cached
// for ttl even if its structural validation reports problems, which are only
// logged at debug level.
//
// NOTE(review): callers in this file pass a path derived from the request's
// @context value, and local files plus external $refs are accepted here —
// confirm this is intended for untrusted input.
// NOTE(review): the timeout is only attached to loader.Context; verify that
// the loader's HTTP reader honours that context in the pinned library version.
func (c *schemaCache) loadSchemaFromPath(ctx context.Context, schemaPath string, ttl, timeout time.Duration) (*openapi3.T, error) {
	cacheKey := hashURL(schemaPath)

	if cached, hit := c.get(cacheKey); hit {
		log.Debugf(ctx, "Schema cache hit for: %s", schemaPath)
		return cached, nil
	}
	log.Debugf(ctx, "Schema cache miss, loading from: %s", schemaPath)

	if !isValidSchemaPath(schemaPath) {
		return nil, fmt.Errorf("invalid schema path: %s", schemaPath)
	}

	loader := openapi3.NewLoader()
	loader.IsExternalRefsAllowed = true

	// url.Parse returns a nil URL on error, hence the nil checks below.
	parsed, parseErr := url.Parse(schemaPath)
	remote := parseErr == nil && (parsed.Scheme == "http" || parsed.Scheme == "https")

	var (
		doc     *openapi3.T
		loadErr error
	)
	switch {
	case remote:
		loadCtx, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()
		loader.Context = loadCtx
		doc, loadErr = loader.LoadFromURI(parsed)
	case parsed != nil && parsed.Scheme == "file":
		doc, loadErr = loader.LoadFromFile(parsed.Path)
	default:
		doc, loadErr = loader.LoadFromFile(schemaPath)
	}

	if loadErr != nil {
		log.Errorf(ctx, loadErr, "Failed to load schema from: %s", schemaPath)
		return nil, fmt.Errorf("failed to load schema from %s: %w", schemaPath, loadErr)
	}

	// Structural problems in the schema are reported but do not block its use.
	if validateErr := doc.Validate(ctx); validateErr != nil {
		log.Debugf(ctx, "Schema validation warnings for %s: %v", schemaPath, validateErr)
	}

	c.set(cacheKey, doc, ttl)
	log.Debugf(ctx, "Loaded and cached schema from: %s", schemaPath)
	return doc, nil
}
// findReferencedObjects recursively collects the domain-specific objects in
// data, i.e. JSON objects that carry string-valued "@context" and "@type"
// keys and whose @context is not a core schema.
//
// path is the location of data within the payload; nested keys are appended
// as ".key" and array elements as "[i]". An object is reported before the
// objects nested inside it, and object keys are visited in sorted order so
// that the result — and therefore which validation failure a fail-fast caller
// reports first — is the same on every run for the same payload.
func findReferencedObjects(data interface{}, path string) []referencedObject {
	var results []referencedObject

	switch v := data.(type) {
	case map[string]interface{}:
		contextVal, hasContext := v["@context"].(string)
		typeVal, hasType := v["@type"].(string)
		if hasContext && hasType && !isCoreSchema(contextVal) {
			results = append(results, referencedObject{
				Path:    path,
				Context: contextVal,
				Type:    typeVal,
				Data:    v,
			})
		}

		// Go randomises map iteration order; sort the keys for a stable walk.
		keys := make([]string, 0, len(v))
		for key := range v {
			keys = append(keys, key)
		}
		sort.Strings(keys)

		for _, key := range keys {
			newPath := key
			if path != "" {
				newPath = path + "." + key
			}
			results = append(results, findReferencedObjects(v[key], newPath)...)
		}

	case []interface{}:
		for i, item := range v {
			newPath := fmt.Sprintf("%s[%d]", path, i)
			results = append(results, findReferencedObjects(item, newPath)...)
		}
	}

	return results
}
// transformContextToSchemaURL derives the schema location from an @context
// URL by replacing the first occurrence of "context.jsonld" with
// "attributes.yaml". URLs that do not contain "context.jsonld" are returned
// unchanged.
func transformContextToSchemaURL(contextURL string) string {
	const (
		contextFile = "context.jsonld"
		schemaFile  = "attributes.yaml"
	)

	at := strings.Index(contextURL, contextFile)
	if at < 0 {
		return contextURL
	}
	return contextURL[:at] + schemaFile + contextURL[at+len(contextFile):]
}
// findSchemaByType resolves the component schema in doc that corresponds to
// the given @type value.
//
// A component whose name equals typeName wins. Otherwise the first component
// whose "x-jsonld" extension declares a matching "@type" is returned. An
// error is returned when the document has no component schemas or when
// neither lookup succeeds.
func findSchemaByType(ctx context.Context, doc *openapi3.T, typeName string) (*openapi3.SchemaRef, error) {
	if doc.Components == nil || doc.Components.Schemas == nil {
		return nil, fmt.Errorf("no schemas found in document")
	}
	schemas := doc.Components.Schemas

	// Preferred: the component is named exactly like the @type.
	if direct, ok := schemas[typeName]; ok {
		log.Debugf(ctx, "Found schema by direct match: %s", typeName)
		return direct, nil
	}

	// Fallback: the component declares the @type in its x-jsonld extension.
	for name, candidate := range schemas {
		if candidate.Value == nil {
			continue
		}
		jsonld, ok := candidate.Value.Extensions["x-jsonld"].(map[string]interface{})
		if !ok {
			continue
		}
		declared, ok := jsonld["@type"].(string)
		if !ok || declared != typeName {
			continue
		}
		log.Debugf(ctx, "Found schema by x-jsonld.@type match: %s (mapped to %s)", typeName, name)
		return candidate, nil
	}

	return nil, fmt.Errorf("no schema found for @type: %s", typeName)
}
// isAllowedDomain reports whether the host of schemaURL is covered by the
// whitelist in allowedDomains.
//
// An empty or nil whitelist allows everything. Otherwise schemaURL must parse
// as a URL with a host, and that host must equal a whitelisted domain or be a
// subdomain of one (so "example.org" also covers "schemas.example.org").
// Matching is case-insensitive; surrounding whitespace and leading/trailing
// dots on whitelist entries are ignored, and blank entries never match.
//
// The comparison is made on the parsed hostname rather than on the raw string
// so that a whitelisted name appearing in the path, query or user-info part,
// or as a prefix of a longer host, does not bypass the check.
func isAllowedDomain(schemaURL string, allowedDomains []string) bool {
	if len(allowedDomains) == 0 {
		return true // No whitelist = all allowed
	}

	parsed, err := url.Parse(schemaURL)
	if err != nil {
		return false
	}
	host := strings.ToLower(strings.TrimSuffix(parsed.Hostname(), "."))
	if host == "" {
		// A whitelist is configured but the value has no host (e.g. a local path).
		return false
	}

	for _, entry := range allowedDomains {
		domain := strings.ToLower(strings.Trim(strings.TrimSpace(entry), "."))
		if domain == "" {
			// Blank entries (e.g. from a trailing comma) must not match everything.
			continue
		}
		if host == domain || strings.HasSuffix(host, "."+domain) {
			return true
		}
	}
	return false
}
// validateReferencedObject validates one @context-bearing object against the
// schema its @context points to.
//
// The steps are: check the @context against allowedDomains, map the @context
// to a schema location, load that schema (through the cache, with ttl and
// timeout), pick the component schema matching the object's @type, and
// validate the object's fields with "@context" and "@type" removed.
//
// Whitelist, load and lookup failures are returned as plain errors (the
// latter two prefixed with the object's path); schema violations are returned
// exactly as produced by the OpenAPI validator.
func (c *schemaCache) validateReferencedObject(
	ctx context.Context,
	obj referencedObject,
	ttl, timeout time.Duration,
	allowedDomains []string,
) error {
	if allowed := isAllowedDomain(obj.Context, allowedDomains); !allowed {
		log.Warnf(ctx, "Domain not in whitelist: %s", obj.Context)
		return fmt.Errorf("domain not allowed: %s", obj.Context)
	}

	// The schema lives next to the JSON-LD context (URL or file path).
	schemaPath := transformContextToSchemaURL(obj.Context)
	log.Debugf(ctx, "Transformed %s -> %s", obj.Context, schemaPath)

	doc, loadErr := c.loadSchemaFromPath(ctx, schemaPath, ttl, timeout)
	if loadErr != nil {
		return fmt.Errorf("at %s: %w", obj.Path, loadErr)
	}

	schema, lookupErr := findSchemaByType(ctx, doc, obj.Type)
	if lookupErr != nil {
		log.Errorf(ctx, lookupErr, "Schema not found for @type: %s at path: %s", obj.Type, obj.Path)
		return fmt.Errorf("at %s: %w", obj.Path, lookupErr)
	}

	// JSON-LD metadata is not described by the domain schema, so drop it.
	payload := make(map[string]interface{}, len(obj.Data)-2)
	for key, value := range obj.Data {
		switch key {
		case "@context", "@type":
			// skipped
		default:
			payload[key] = value
		}
	}

	validationErr := schema.Value.VisitJSON(
		payload,
		openapi3.VisitAsRequest(),
		openapi3.EnableFormatValidation(),
	)
	if validationErr == nil {
		log.Debugf(ctx, "Validation passed for @type: %s at path: %s", obj.Type, obj.Path)
		return nil
	}

	log.Debugf(ctx, "Validation failed for @type: %s at path: %s: %v", obj.Type, obj.Path, validationErr)
	return validationErr
}

View File

@@ -0,0 +1,709 @@
package schemav2validator
import (
"context"
"os"
"testing"
"time"
"github.com/getkin/kin-openapi/openapi3"
"github.com/stretchr/testify/assert"
)
func TestIsCoreSchema(t *testing.T) {
tests := []struct {
name string
contextURL string
want bool
}{
{
name: "core schema URL",
contextURL: "https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/draft/schema/core/v2/context.jsonld",
want: true,
},
{
name: "domain schema URL",
contextURL: "https://raw.githubusercontent.com/beckn/protocol-specifications-new/refs/heads/draft/schema/EvChargingOffer/v1/context.jsonld",
want: false,
},
{
name: "empty URL",
contextURL: "",
want: false,
},
{
name: "URL without schema/core",
contextURL: "https://example.com/some/path/context.jsonld",
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := isCoreSchema(tt.contextURL)
assert.Equal(t, tt.want, got)
})
}
}
// TestFindReferencedObjects checks how many domain-specific objects are
// discovered for several payload shapes: flat, nested under a core object,
// inside arrays, and objects lacking @context or @type.
func TestFindReferencedObjects(t *testing.T) {
	type object = map[string]interface{}

	const (
		domainCtx = "https://example.com/schema/DomainType/v1/context.jsonld"
		coreCtx   = "https://example.com/schema/core/v2/context.jsonld"
	)

	cases := []struct {
		desc      string
		payload   interface{}
		startPath string
		wantCount int // number of objects found
	}{
		{
			desc: "single domain object",
			payload: object{
				"@context": domainCtx,
				"@type":    "DomainType",
				"field":    "value",
			},
			startPath: "message",
			wantCount: 1,
		},
		{
			desc: "core schema object - should be skipped",
			payload: object{
				"@context": coreCtx,
				"@type":    "beckn:Order",
				"field":    "value",
			},
			startPath: "message",
			wantCount: 0,
		},
		{
			desc: "nested domain objects",
			payload: object{
				"order": object{
					"@context": coreCtx,
					"@type":    "beckn:Order",
					"orderAttributes": object{
						"@context": "https://example.com/schema/ChargingSession/v1/context.jsonld",
						"@type":    "ChargingSession",
						"field":    "value",
					},
				},
			},
			startPath: "message",
			wantCount: 1, // Only domain object, core skipped
		},
		{
			desc: "array with domain objects",
			payload: object{
				"items": []interface{}{
					object{
						"@context": domainCtx,
						"@type":    "DomainType",
					},
					object{
						"@context": "https://example.com/schema/AnotherType/v1/context.jsonld",
						"@type":    "AnotherType",
					},
				},
			},
			startPath: "message",
			wantCount: 2,
		},
		{
			desc: "object without @context",
			payload: object{
				"field": "value",
			},
			startPath: "message",
			wantCount: 0,
		},
		{
			desc: "object with @context but no @type",
			payload: object{
				"@context": domainCtx,
				"field":    "value",
			},
			startPath: "message",
			wantCount: 0,
		},
	}

	for _, tc := range cases {
		t.Run(tc.desc, func(t *testing.T) {
			found := findReferencedObjects(tc.payload, tc.startPath)
			assert.Equal(t, tc.wantCount, len(found))
		})
	}
}
func TestTransformContextToSchemaURL(t *testing.T) {
tests := []struct {
name string
contextURL string
want string
}{
{
name: "standard transformation",
contextURL: "https://example.com/schema/EvChargingOffer/v1/context.jsonld",
want: "https://example.com/schema/EvChargingOffer/v1/attributes.yaml",
},
{
name: "already attributes.yaml",
contextURL: "https://example.com/schema/EvChargingOffer/v1/attributes.yaml",
want: "https://example.com/schema/EvChargingOffer/v1/attributes.yaml",
},
{
name: "no context.jsonld in URL",
contextURL: "https://example.com/schema/EvChargingOffer/v1/",
want: "https://example.com/schema/EvChargingOffer/v1/",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := transformContextToSchemaURL(tt.contextURL)
assert.Equal(t, tt.want, got)
})
}
}
// TestHashURL checks that hashing is repeatable and always yields a
// 64-character hex string, including for the empty input.
func TestHashURL(t *testing.T) {
	cases := []struct {
		desc  string
		input string
	}{
		{"consistent hashing", "https://example.com/schema.yaml"},
		{"empty string", ""},
	}

	for _, tc := range cases {
		t.Run(tc.desc, func(t *testing.T) {
			first := hashURL(tc.input)
			second := hashURL(tc.input)

			// Same URL should produce same hash
			assert.Equal(t, first, second)
			// Hash should be 64 characters (SHA256 hex)
			assert.Equal(t, 64, len(first))
		})
	}
}
func TestIsValidSchemaPath(t *testing.T) {
tests := []struct {
name string
schemaPath string
want bool
}{
{
name: "http URL",
schemaPath: "http://example.com/schema.yaml",
want: true,
},
{
name: "https URL",
schemaPath: "https://example.com/schema.yaml",
want: true,
},
{
name: "file URL",
schemaPath: "file:///path/to/schema.yaml",
want: true,
},
{
name: "local path",
schemaPath: "/path/to/schema.yaml",
want: true,
},
{
name: "relative path",
schemaPath: "./schema.yaml",
want: true,
},
{
name: "empty path",
schemaPath: "",
want: true, // url.Parse("") succeeds, returns empty scheme
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := isValidSchemaPath(tt.schemaPath)
assert.Equal(t, tt.want, got)
})
}
}
// TestNewSchemaCache checks that a new cache records the requested size and
// starts with an initialised, empty schema map.
func TestNewSchemaCache(t *testing.T) {
	cases := []struct {
		desc string
		size int
	}{
		{"default size", 100},
		{"custom size", 50},
		{"zero size", 0},
	}

	for _, tc := range cases {
		t.Run(tc.desc, func(t *testing.T) {
			created := newSchemaCache(tc.size)

			assert.NotNil(t, created)
			assert.Equal(t, tc.size, created.maxSize)
			assert.NotNil(t, created.schemas)
			assert.Equal(t, 0, len(created.schemas))
		})
	}
}
// TestSchemaCache_GetSet checks that a stored document can be read back and
// that an unknown key is reported as a miss.
func TestSchemaCache_GetSet(t *testing.T) {
	cache := newSchemaCache(10)
	key := hashURL("https://example.com/schema.yaml")
	stored := &openapi3.T{OpenAPI: "3.1.0"} // a simple schema doc

	cache.set(key, stored, 1*time.Hour)

	// A stored key is found and yields the same document.
	fetched, hit := cache.get(key)
	assert.True(t, hit)
	assert.Equal(t, stored, fetched)

	// A key that was never stored is a miss.
	_, hit = cache.get("non-existent-hash")
	assert.False(t, hit)
}
// TestSchemaCache_LRUEviction fills a two-entry cache, touches the first
// entry, and expects the untouched second entry to be evicted when a third
// one is added.
func TestSchemaCache_LRUEviction(t *testing.T) {
	const lifetime = 1 * time.Hour
	cache := newSchemaCache(2) // Small cache for testing

	first := &openapi3.T{OpenAPI: "3.1.0"}
	second := &openapi3.T{OpenAPI: "3.1.1"}
	third := &openapi3.T{OpenAPI: "3.1.2"}

	cache.set("hash1", first, lifetime)
	cache.set("hash2", second, lifetime)

	// Touch hash1 so that hash2 becomes the least recently used entry.
	cache.get("hash1")

	// Adding a third entry must push out hash2.
	cache.set("hash3", third, lifetime)

	_, hasFirst := cache.get("hash1")
	_, hasSecond := cache.get("hash2")
	_, hasThird := cache.get("hash3")

	assert.True(t, hasFirst, "hash1 should exist (recently accessed)")
	assert.False(t, hasSecond, "hash2 should be evicted (LRU)")
	assert.True(t, hasThird, "hash3 should exist (just added)")
}
// TestSchemaCache_TTLExpiry checks that an entry is served before its TTL
// elapses and reported as a miss afterwards.
func TestSchemaCache_TTLExpiry(t *testing.T) {
	const key = "test-hash"
	cache := newSchemaCache(10)

	// Store with a very short TTL.
	cache.set(key, &openapi3.T{OpenAPI: "3.1.0"}, 1*time.Millisecond)

	// Immediately afterwards the entry is still live.
	_, hit := cache.get(key)
	assert.True(t, hit)

	// Let the TTL elapse.
	time.Sleep(10 * time.Millisecond)

	// The expired entry must no longer be served.
	_, hit = cache.get(key)
	assert.False(t, hit)
}
// TestSchemaCache_CleanupExpired stores two short-lived entries and one
// long-lived entry, waits for the short TTLs to pass, and expects the cleanup
// to remove exactly the two expired entries.
func TestSchemaCache_CleanupExpired(t *testing.T) {
	cache := newSchemaCache(10)
	schema := &openapi3.T{OpenAPI: "3.1.0"}

	cache.set("hash1", schema, 1*time.Millisecond)
	cache.set("hash2", schema, 1*time.Millisecond)
	cache.set("hash3", schema, 1*time.Hour) // This one won't expire

	// Wait for the short TTLs to elapse.
	time.Sleep(10 * time.Millisecond)

	removed := cache.cleanupExpired()
	assert.Equal(t, 2, removed)

	// Only the long-lived entry may remain.
	cache.mu.RLock()
	assert.Equal(t, 1, len(cache.schemas))
	_, survivor := cache.schemas["hash3"]
	assert.True(t, survivor)
	cache.mu.RUnlock()
}
func TestIsAllowedDomain(t *testing.T) {
tests := []struct {
name string
schemaURL string
allowedDomains []string
want bool
}{
{
name: "empty whitelist - all allowed",
schemaURL: "https://example.com/schema.yaml",
allowedDomains: []string{},
want: true,
},
{
name: "nil whitelist - all allowed",
schemaURL: "https://example.com/schema.yaml",
allowedDomains: nil,
want: true,
},
{
name: "domain in whitelist",
schemaURL: "https://raw.githubusercontent.com/beckn/schema.yaml",
allowedDomains: []string{"raw.githubusercontent.com", "schemas.beckn.org"},
want: true,
},
{
name: "domain not in whitelist",
schemaURL: "https://malicious.com/schema.yaml",
allowedDomains: []string{"raw.githubusercontent.com", "schemas.beckn.org"},
want: false,
},
{
name: "partial domain match",
schemaURL: "https://raw.githubusercontent.com/beckn/schema.yaml",
allowedDomains: []string{"githubusercontent.com"},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := isAllowedDomain(tt.schemaURL, tt.allowedDomains)
assert.Equal(t, tt.want, got)
})
}
}
// TestFindReferencedObjects_PathBuilding checks that the reported path of a
// deeply nested object combines dotted keys with array indices.
func TestFindReferencedObjects_PathBuilding(t *testing.T) {
	type object = map[string]interface{}

	offerAttributes := object{
		"@context": "https://example.com/schema/ChargingOffer/v1/context.jsonld",
		"@type":    "ChargingOffer",
	}
	orderItem := object{
		"beckn:acceptedOffer": object{
			"beckn:offerAttributes": offerAttributes,
		},
	}
	payload := object{
		"order": object{
			"beckn:orderItems": []interface{}{orderItem},
		},
	}

	found := findReferencedObjects(payload, "message")

	assert.Equal(t, 1, len(found))
	assert.Equal(t, "message.order.beckn:orderItems[0].beckn:acceptedOffer.beckn:offerAttributes", found[0].Path)
	assert.Equal(t, "ChargingOffer", found[0].Type)
}
// Integration tests for the 4 remaining functions
// TestLoadSchemaFromPath_LocalFile writes a small OpenAPI document to a
// temporary file and checks that loadSchemaFromPath can load it from the
// plain file path.
func TestLoadSchemaFromPath_LocalFile(t *testing.T) {
cache := newSchemaCache(10)
ctx := context.Background()
// The schema is served from a throw-away file that is removed when the test ends.
tmpFile, err := os.CreateTemp("", "test-schema-*.yaml")
assert.NoError(t, err)
defer os.Remove(tmpFile.Name())
schemaContent := `openapi: 3.1.0
info:
title: Test Schema
version: 1.0.0
components:
schemas:
TestType:
type: object
properties:
field1:
type: string`
_, err = tmpFile.Write([]byte(schemaContent))
assert.NoError(t, err)
// Close before loading so the content is flushed to disk.
tmpFile.Close()
doc, err := cache.loadSchemaFromPath(ctx, tmpFile.Name(), 1*time.Hour, 30*time.Second)
assert.NoError(t, err)
assert.NotNil(t, doc)
assert.Equal(t, "3.1.0", doc.OpenAPI)
}
// TestLoadSchemaFromPath_CacheHit loads the same file path twice through one
// cache and checks that both calls return equal documents.
func TestLoadSchemaFromPath_CacheHit(t *testing.T) {
cache := newSchemaCache(10)
ctx := context.Background()
tmpFile, err := os.CreateTemp("", "test-schema-*.yaml")
assert.NoError(t, err)
defer os.Remove(tmpFile.Name())
schemaContent := `openapi: 3.1.0
info:
title: Test Schema
version: 1.0.0`
// NOTE(review): the Write error is not checked here.
tmpFile.Write([]byte(schemaContent))
tmpFile.Close()
// First call populates the cache.
doc1, err := cache.loadSchemaFromPath(ctx, tmpFile.Name(), 1*time.Hour, 30*time.Second)
assert.NoError(t, err)
// Second call uses the same path and therefore the same cache key.
doc2, err := cache.loadSchemaFromPath(ctx, tmpFile.Name(), 1*time.Hour, 30*time.Second)
assert.NoError(t, err)
assert.Equal(t, doc1, doc2)
}
// TestLoadSchemaFromPath_InvalidPath checks that loading a file that does not
// exist yields an error.
func TestLoadSchemaFromPath_InvalidPath(t *testing.T) {
	const missing = "/nonexistent/schema.yaml"

	cache := newSchemaCache(10)
	_, loadErr := cache.loadSchemaFromPath(context.Background(), missing, 1*time.Hour, 30*time.Second)

	assert.Error(t, loadErr)
}
// TestFindSchemaByType_DirectMatch loads a schema file that defines a
// component named TestType and checks that findSchemaByType resolves that
// name.
func TestFindSchemaByType_DirectMatch(t *testing.T) {
cache := newSchemaCache(10)
ctx := context.Background()
tmpFile, err := os.CreateTemp("", "test-schema-*.yaml")
assert.NoError(t, err)
defer os.Remove(tmpFile.Name())
schemaContent := `openapi: 3.1.0
info:
title: Test Schema
version: 1.0.0
components:
schemas:
TestType:
type: object
properties:
field1:
type: string`
// NOTE(review): the Write error is not checked here.
tmpFile.Write([]byte(schemaContent))
tmpFile.Close()
doc, err := cache.loadSchemaFromPath(ctx, tmpFile.Name(), 1*time.Hour, 30*time.Second)
assert.NoError(t, err)
// The @type equals the component name, so the direct lookup applies.
schema, err := findSchemaByType(ctx, doc, "TestType")
assert.NoError(t, err)
assert.NotNil(t, schema)
}
// TestFindSchemaByType_NotFound loads a schema file and checks that looking
// up a type it does not define fails with a "no schema found" error.
func TestFindSchemaByType_NotFound(t *testing.T) {
cache := newSchemaCache(10)
ctx := context.Background()
tmpFile, err := os.CreateTemp("", "test-schema-*.yaml")
assert.NoError(t, err)
defer os.Remove(tmpFile.Name())
schemaContent := `openapi: 3.1.0
info:
title: Test Schema
version: 1.0.0
components:
schemas:
TestType:
type: object`
// NOTE(review): the Write error is not checked here.
tmpFile.Write([]byte(schemaContent))
tmpFile.Close()
doc, err := cache.loadSchemaFromPath(ctx, tmpFile.Name(), 1*time.Hour, 30*time.Second)
assert.NoError(t, err)
// Neither a component name nor an x-jsonld @type matches this value.
_, err = findSchemaByType(ctx, doc, "NonExistentType")
assert.Error(t, err)
assert.Contains(t, err.Error(), "no schema found")
}
// TestValidateReferencedObject_Valid points an object's @context at a
// temporary schema file and checks that an object providing the required
// field1 passes validation when no whitelist is configured.
func TestValidateReferencedObject_Valid(t *testing.T) {
cache := newSchemaCache(10)
ctx := context.Background()
tmpFile, err := os.CreateTemp("", "test-schema-*.yaml")
assert.NoError(t, err)
defer os.Remove(tmpFile.Name())
schemaContent := `openapi: 3.1.0
info:
title: Test Schema
version: 1.0.0
components:
schemas:
TestType:
type: object
additionalProperties: false
x-jsonld:
"@context": ./context.jsonld
"@type": TestType
properties:
field1:
type: string
required:
- field1`
// NOTE(review): the Write error is not checked here.
tmpFile.Write([]byte(schemaContent))
tmpFile.Close()
// The temp file name contains no "context.jsonld", so it is used as the schema path as-is.
obj := referencedObject{
Path: "message.test",
Context: tmpFile.Name(),
Type: "TestType",
Data: map[string]interface{}{
"@context": tmpFile.Name(),
"@type": "TestType",
"field1": "value1",
},
}
// A nil whitelist allows every @context.
err = cache.validateReferencedObject(ctx, obj, 1*time.Hour, 30*time.Second, nil)
assert.NoError(t, err)
}
// TestValidateReferencedObject_Invalid verifies that an object missing the
// required field1 is rejected when validated against the TestType schema.
func TestValidateReferencedObject_Invalid(t *testing.T) {
	cache := newSchemaCache(10)
	ctx := context.Background()

	// Fixture setup failures must abort the test: the final assertion accepts
	// any error, so a broken or empty schema file would otherwise make the
	// test pass without exercising the required-field check.
	tmpFile, err := os.CreateTemp("", "test-schema-*.yaml")
	if err != nil {
		t.Fatalf("creating temp schema file: %v", err)
	}
	defer os.Remove(tmpFile.Name())

	schemaContent := `openapi: 3.1.0
info:
  title: Test Schema
  version: 1.0.0
components:
  schemas:
    TestType:
      type: object
      additionalProperties: false
      x-jsonld:
        "@context": ./context.jsonld
        "@type": TestType
      properties:
        field1:
          type: string
      required:
        - field1`

	// Always close the file, then report whichever step failed first.
	_, writeErr := tmpFile.WriteString(schemaContent)
	closeErr := tmpFile.Close()
	if writeErr != nil {
		t.Fatalf("writing temp schema file: %v", writeErr)
	}
	if closeErr != nil {
		t.Fatalf("closing temp schema file: %v", closeErr)
	}

	// Data deliberately omits "field1", which the schema marks as required.
	obj := referencedObject{
		Path:    "message.test",
		Context: tmpFile.Name(),
		Type:    "TestType",
		Data: map[string]interface{}{
			"@context": tmpFile.Name(),
			"@type":    "TestType",
		},
	}

	err = cache.validateReferencedObject(ctx, obj, 1*time.Hour, 30*time.Second, nil)
	assert.Error(t, err)
}
// TestValidateReferencedObject_DomainNotAllowed verifies that an object whose
// @context URL host is not in the allowed-domains list is rejected with a
// "domain not allowed" error.
func TestValidateReferencedObject_DomainNotAllowed(t *testing.T) {
	cache := newSchemaCache(10)
	ctx := context.Background()

	obj := referencedObject{
		Path:    "message.test",
		Context: "https://malicious.com/schema.yaml",
		Type:    "TestType",
		Data:    map[string]interface{}{},
	}
	allowedDomains := []string{"trusted.com"}

	err := cache.validateReferencedObject(ctx, obj, 1*time.Hour, 30*time.Second, allowedDomains)
	// Only inspect the message when an error was actually returned; calling
	// err.Error() on a nil error would panic instead of failing cleanly.
	if assert.Error(t, err) {
		assert.Contains(t, err.Error(), "domain not allowed")
	}
}
func TestValidateExtendedSchemas_NoObjects(t *testing.T) {
v := &schemav2Validator{
config: &Config{
EnableExtendedSchema: true,
ExtendedSchemaConfig: ExtendedSchemaConfig{},
},
schemaCache: newSchemaCache(10),
}
ctx := context.Background()
body := map[string]interface{}{
"message": map[string]interface{}{
"field": "value",
},
}
err := v.validateExtendedSchemas(ctx, body)
assert.NoError(t, err)
}
// TestValidateExtendedSchemas_MissingMessage verifies that a body without a
// top-level "message" key is rejected with a "missing 'message' field" error.
func TestValidateExtendedSchemas_MissingMessage(t *testing.T) {
	v := &schemav2Validator{
		config: &Config{
			EnableExtendedSchema: true,
		},
		schemaCache: newSchemaCache(10),
	}
	ctx := context.Background()

	// Only "context" is present; "message" is intentionally absent.
	body := map[string]interface{}{
		"context": map[string]interface{}{},
	}

	err := v.validateExtendedSchemas(ctx, body)
	// Only inspect the message when an error was actually returned; calling
	// err.Error() on a nil error would panic instead of failing cleanly.
	if assert.Error(t, err) {
		assert.Contains(t, err.Error(), "missing 'message' field")
	}
}

View File

@@ -27,6 +27,7 @@ type schemav2Validator struct {
config *Config
spec *cachedSpec
specMutex sync.RWMutex
schemaCache *schemaCache // cache for extended schemas
}
// cachedSpec holds a cached OpenAPI spec.
@@ -41,6 +42,10 @@ type Config struct {
Type string // "url", "file", or "dir"
Location string // URL, file path, or directory path
CacheTTL int
// Extended Schema configuration
EnableExtendedSchema bool
ExtendedSchemaConfig ExtendedSchemaConfig
}
// New creates a new Schemav2Validator instance.
@@ -66,6 +71,16 @@ func New(ctx context.Context, config *Config) (*schemav2Validator, func() error,
config: config,
}
// Initialize extended schema cache if enabled
if config.EnableExtendedSchema {
maxSize := 100
if config.ExtendedSchemaConfig.MaxCacheSize > 0 {
maxSize = config.ExtendedSchemaConfig.MaxCacheSize
}
v.schemaCache = newSchemaCache(maxSize)
log.Infof(ctx, "Initialized extended schema cache with max size: %d", maxSize)
}
if err := v.initialise(ctx); err != nil {
return nil, nil, fmt.Errorf("failed to initialise schemav2Validator: %v", err)
}
@@ -119,6 +134,19 @@ func (v *schemav2Validator) Validate(ctx context.Context, reqURL *url.URL, data
return v.formatValidationError(err)
}
log.Debugf(ctx, "base schema validation passed for action: %s", action)
// Extended Schema validation (if enabled)
if v.config.EnableExtendedSchema && v.schemaCache != nil {
log.Debugf(ctx, "Starting Extended Schema validation for action: %s", action)
if err := v.validateExtendedSchemas(ctx, jsonData); err != nil {
// Extended Schema failure - return error
log.Debugf(ctx, "Extended Schema validation failed for action %s: %v", action, err)
return err
}
log.Debugf(ctx, "Extended Schema validation passed for action: %s", action)
}
return nil
}
@@ -181,15 +209,36 @@ func (v *schemav2Validator) loadSpec(ctx context.Context) error {
// refreshLoop runs until ctx is cancelled. On every core-spec tick it calls
// reloadExpiredSpec; when extended schema validation is enabled it also
// evicts expired entries from the extended schema cache on a separate ticker.
func (v *schemav2Validator) refreshLoop(ctx context.Context) {
	// NOTE(review): time.NewTicker panics on a non-positive interval; this
	// assumes CacheTTL has been defaulted/validated before the loop starts —
	// confirm against the config parsing code.
	coreTicker := time.NewTicker(time.Duration(v.config.CacheTTL) * time.Second)
	defer coreTicker.Stop()

	// Ticker for extended schema cleanup. The channel stays nil when the
	// feature is disabled; receiving from a nil channel blocks forever, so
	// the cleanup case below is then never selected.
	var refTickerCh <-chan time.Time
	if v.config.EnableExtendedSchema {
		ttl := v.config.ExtendedSchemaConfig.CacheTTL
		if ttl <= 0 {
			ttl = 86400 // Default 24 hours
		}
		refTicker := time.NewTicker(time.Duration(ttl) * time.Second)
		defer refTicker.Stop()
		refTickerCh = refTicker.C
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-coreTicker.C:
			v.reloadExpiredSpec(ctx)
		case <-refTickerCh:
			if v.schemaCache == nil {
				continue
			}
			if count := v.schemaCache.cleanupExpired(); count > 0 {
				log.Debugf(ctx, "Cleaned up %d expired extended schemas", count)
			}
		}
	}
}