Implement Policy Enforcer Plugin
- Added a new Policy Enforcer plugin to evaluate incoming messages against OPA policies. - Configurable via YAML with options for policy sources, actions, and query. - Integrated into existing configuration files for BAP and BPP. - Updated related tests and documentation for the new functionality. - Enhanced plugin manager to support Policy Enforcer instantiation.
This commit is contained in:
17
pkg/plugin/definition/policyEnforcer.go
Normal file
17
pkg/plugin/definition/policyEnforcer.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package definition
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/model"
|
||||
)
|
||||
|
||||
// PolicyEnforcer interface for policy enforcement on incoming messages.
|
||||
type PolicyEnforcer interface {
|
||||
Run(ctx *model.StepContext) error
|
||||
}
|
||||
|
||||
// PolicyEnforcerProvider interface for creating policy enforcers.
|
||||
type PolicyEnforcerProvider interface {
|
||||
New(ctx context.Context, config map[string]string) (PolicyEnforcer, func(), error)
|
||||
}
|
||||
68
pkg/plugin/implementation/policyenforcer/README.md
Normal file
68
pkg/plugin/implementation/policyenforcer/README.md
Normal file
@@ -0,0 +1,68 @@
|
||||
# Policy Enforcer Plugin
|
||||
|
||||
OPA/Rego-based policy enforcement for beckn-onix adapters. Evaluates incoming beckn messages against configurable policies and NACKs non-compliant requests.
|
||||
|
||||
## Overview
|
||||
|
||||
The `policyenforcer` plugin is a **Step plugin** that:
|
||||
- Loads `.rego` policy files from local directories, files, URLs, or local paths
|
||||
- Evaluates incoming messages against compiled OPA policies
|
||||
- Returns a `BadReqErr` (NACK) when policy violations are detected
|
||||
- Fails closed on evaluation errors (treats as NACK)
|
||||
- Is strictly **opt-in** — adapters that don't reference it are unaffected
|
||||
|
||||
## Configuration
|
||||
|
||||
All config keys are passed via `map[string]string` in the adapter YAML config.
|
||||
|
||||
| Key | Required | Default | Description |
|
||||
|-----|----------|---------|-------------|
|
||||
| `policyDir` | One of `policyDir`, `policyFile`, or `policyUrls` required | — | Local directory containing `.rego` files |
|
||||
| `policyFile` | | — | Single local `.rego` file path |
|
||||
| `policyUrls` | | — | Comma-separated list of URLs or local paths to `.rego` files |
|
||||
| `query` | No | `data.policy.violations` | Rego query returning violation strings |
|
||||
| `actions` | No | `confirm` | Comma-separated beckn actions to enforce |
|
||||
| `enabled` | No | `true` | Enable/disable the plugin |
|
||||
| `debugLogging` | No | `false` | Enable verbose logging |
|
||||
| *any other key* | No | — | Forwarded to Rego as `data.config.<key>` |
|
||||
|
||||
### Policy URLs
|
||||
|
||||
`policyUrls` accepts both remote URLs and local file paths, separated by commas:
|
||||
|
||||
```yaml
|
||||
config:
|
||||
policyUrls: "https://policies.example.com/compliance.rego,/etc/policies/local.rego,https://policies.example.com/safety.rego"
|
||||
```
|
||||
|
||||
### Air-Gapped Deployments
|
||||
|
||||
For environments without internet access, replace any URL with a local file path or volume mount:
|
||||
|
||||
```yaml
|
||||
config:
|
||||
policyUrls: "/mounted-policies/compliance.rego,/mounted-policies/safety.rego"
|
||||
```
|
||||
|
||||
## Example Config
|
||||
|
||||
```yaml
|
||||
plugins:
|
||||
steps:
|
||||
- id: policyenforcer
|
||||
config:
|
||||
policyUrls: "https://policies.example.com/compliance.rego,/local/policies/safety.rego"
|
||||
actions: "confirm,init"
|
||||
query: "data.policy.violations"
|
||||
minDeliveryLeadHours: "4"
|
||||
debugLogging: "true"
|
||||
```
|
||||
|
||||
## Relationship with Schema Validator
|
||||
|
||||
`policyenforcer` and `schemavalidator`/`schemav2validator` are **separate plugins** with different responsibilities:
|
||||
|
||||
- **Schema Validator**: Validates message **structure** against OpenAPI/JSON Schema specs
|
||||
- **Policy Enforcer**: Evaluates **business rules** via OPA/Rego policies
|
||||
|
||||
They use different plugin interfaces (`SchemaValidator` vs `Step`), different engines, and different error types. Configure them side-by-side in your adapter config as needed.
|
||||
26
pkg/plugin/implementation/policyenforcer/cmd/plugin.go
Normal file
26
pkg/plugin/implementation/policyenforcer/cmd/plugin.go
Normal file
@@ -0,0 +1,26 @@
|
||||
// Package main provides the plugin entry point for the Policy Enforcer plugin.
|
||||
// This file is compiled as a Go plugin (.so) and loaded by beckn-onix at runtime.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/plugin/definition"
|
||||
"github.com/beckn-one/beckn-onix/pkg/plugin/implementation/policyenforcer"
|
||||
)
|
||||
|
||||
// provider implements the PolicyEnforcerProvider interface for plugin loading.
|
||||
type provider struct{}
|
||||
|
||||
// New creates a new PolicyEnforcer instance.
|
||||
func (p provider) New(ctx context.Context, cfg map[string]string) (definition.PolicyEnforcer, func(), error) {
|
||||
enforcer, err := policyenforcer.New(cfg)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return enforcer, enforcer.Close, nil
|
||||
}
|
||||
|
||||
// Provider is the exported symbol that beckn-onix plugin manager looks up.
|
||||
var Provider = provider{}
|
||||
129
pkg/plugin/implementation/policyenforcer/config.go
Normal file
129
pkg/plugin/implementation/policyenforcer/config.go
Normal file
@@ -0,0 +1,129 @@
|
||||
package policyenforcer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Config holds the configuration for the Policy Enforcer plugin.
|
||||
type Config struct {
|
||||
// PolicyDir is a local directory containing .rego policy files (all loaded).
|
||||
// At least one policy source (PolicyDir, PolicyFile, or PolicyUrls) is required.
|
||||
PolicyDir string
|
||||
|
||||
// PolicyFile is a single local .rego file path.
|
||||
PolicyFile string
|
||||
|
||||
// PolicyUrls is a list of URLs (or local file paths) pointing to .rego files,
|
||||
// fetched at startup or read from disk.
|
||||
// Parsed from the comma-separated "policyUrls" config key.
|
||||
PolicyUrls []string
|
||||
|
||||
// Query is the Rego query that returns a set of violation strings.
|
||||
// Default: "data.policy.violations"
|
||||
Query string
|
||||
|
||||
// Actions is the list of beckn actions to enforce policies on.
|
||||
// Default: ["confirm"]
|
||||
Actions []string
|
||||
|
||||
// Enabled controls whether the plugin is active.
|
||||
Enabled bool
|
||||
|
||||
// DebugLogging enables verbose logging.
|
||||
DebugLogging bool
|
||||
|
||||
// RuntimeConfig holds arbitrary key-value pairs passed to Rego as data.config.
|
||||
// Keys like minDeliveryLeadHours are forwarded here.
|
||||
RuntimeConfig map[string]string
|
||||
}
|
||||
|
||||
// Known config keys that are handled directly (not forwarded to RuntimeConfig).
|
||||
var knownKeys = map[string]bool{
|
||||
"policyDir": true,
|
||||
"policyFile": true,
|
||||
"policyUrls": true,
|
||||
"query": true,
|
||||
"actions": true,
|
||||
"enabled": true,
|
||||
"debugLogging": true,
|
||||
}
|
||||
|
||||
// DefaultConfig returns a Config with sensible defaults.
|
||||
func DefaultConfig() *Config {
|
||||
return &Config{
|
||||
Query: "data.policy.violations",
|
||||
Actions: []string{"confirm"},
|
||||
Enabled: true,
|
||||
DebugLogging: false,
|
||||
RuntimeConfig: make(map[string]string),
|
||||
}
|
||||
}
|
||||
|
||||
// ParseConfig parses the plugin configuration map into a Config struct.
|
||||
func ParseConfig(cfg map[string]string) (*Config, error) {
|
||||
config := DefaultConfig()
|
||||
|
||||
if dir, ok := cfg["policyDir"]; ok && dir != "" {
|
||||
config.PolicyDir = dir
|
||||
}
|
||||
if file, ok := cfg["policyFile"]; ok && file != "" {
|
||||
config.PolicyFile = file
|
||||
}
|
||||
|
||||
// Legacy: comma-separated policyUrls
|
||||
if urls, ok := cfg["policyUrls"]; ok && urls != "" {
|
||||
for _, u := range strings.Split(urls, ",") {
|
||||
u = strings.TrimSpace(u)
|
||||
if u != "" {
|
||||
config.PolicyUrls = append(config.PolicyUrls, u)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if config.PolicyDir == "" && config.PolicyFile == "" && len(config.PolicyUrls) == 0 {
|
||||
return nil, fmt.Errorf("at least one policy source is required (policyDir, policyFile, or policyUrls)")
|
||||
}
|
||||
|
||||
if query, ok := cfg["query"]; ok && query != "" {
|
||||
config.Query = query
|
||||
}
|
||||
|
||||
if actions, ok := cfg["actions"]; ok && actions != "" {
|
||||
actionList := strings.Split(actions, ",")
|
||||
config.Actions = make([]string, 0, len(actionList))
|
||||
for _, action := range actionList {
|
||||
action = strings.TrimSpace(action)
|
||||
if action != "" {
|
||||
config.Actions = append(config.Actions, action)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if enabled, ok := cfg["enabled"]; ok {
|
||||
config.Enabled = enabled == "true" || enabled == "1"
|
||||
}
|
||||
|
||||
if debug, ok := cfg["debugLogging"]; ok {
|
||||
config.DebugLogging = debug == "true" || debug == "1"
|
||||
}
|
||||
|
||||
// Forward unknown keys to RuntimeConfig (e.g., minDeliveryLeadHours)
|
||||
for k, v := range cfg {
|
||||
if !knownKeys[k] {
|
||||
config.RuntimeConfig[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// IsActionEnabled checks if the given action is in the configured actions list.
|
||||
func (c *Config) IsActionEnabled(action string) bool {
|
||||
for _, a := range c.Actions {
|
||||
if a == action {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
106
pkg/plugin/implementation/policyenforcer/enforcer.go
Normal file
106
pkg/plugin/implementation/policyenforcer/enforcer.go
Normal file
@@ -0,0 +1,106 @@
|
||||
package policyenforcer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/log"
|
||||
"github.com/beckn-one/beckn-onix/pkg/model"
|
||||
)
|
||||
|
||||
// PolicyEnforcer is a Step plugin that evaluates beckn messages against
|
||||
// OPA policies and NACKs non-compliant messages.
|
||||
type PolicyEnforcer struct {
|
||||
config *Config
|
||||
evaluator *Evaluator
|
||||
}
|
||||
|
||||
// New creates a new PolicyEnforcer instance.
|
||||
func New(cfg map[string]string) (*PolicyEnforcer, error) {
|
||||
config, err := ParseConfig(cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("policyenforcer: config error: %w", err)
|
||||
}
|
||||
|
||||
evaluator, err := NewEvaluator(config.PolicyDir, config.PolicyFile, config.PolicyUrls, config.Query, config.RuntimeConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("policyenforcer: failed to initialize OPA evaluator: %w", err)
|
||||
}
|
||||
|
||||
log.Infof(context.TODO(), "PolicyEnforcer initialized (actions=%v, query=%s, policies=%v, debugLogging=%v)",
|
||||
config.Actions, config.Query, evaluator.ModuleNames(), config.DebugLogging)
|
||||
|
||||
return &PolicyEnforcer{
|
||||
config: config,
|
||||
evaluator: evaluator,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run implements the Step interface. It evaluates the message body against
|
||||
// loaded OPA policies. Returns a BadReqErr (causing NACK) if violations are found.
|
||||
// Returns an error on evaluation failure (fail closed).
|
||||
func (e *PolicyEnforcer) Run(ctx *model.StepContext) error {
|
||||
if !e.config.Enabled {
|
||||
log.Debug(ctx, "PolicyEnforcer: plugin disabled, skipping")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Extract action from the message
|
||||
action := extractAction(ctx.Request.URL.Path, ctx.Body)
|
||||
|
||||
if !e.config.IsActionEnabled(action) {
|
||||
if e.config.DebugLogging {
|
||||
log.Debugf(ctx, "PolicyEnforcer: action %q not in configured actions %v, skipping", action, e.config.Actions)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if e.config.DebugLogging {
|
||||
log.Debugf(ctx, "PolicyEnforcer: evaluating policies for action %q (modules=%v)", action, e.evaluator.ModuleNames())
|
||||
}
|
||||
|
||||
violations, err := e.evaluator.Evaluate(ctx, ctx.Body)
|
||||
if err != nil {
|
||||
// Fail closed: evaluation error → NACK
|
||||
log.Errorf(ctx, err, "PolicyEnforcer: policy evaluation failed: %v", err)
|
||||
return model.NewBadReqErr(fmt.Errorf("policy evaluation error: %w", err))
|
||||
}
|
||||
|
||||
if len(violations) == 0 {
|
||||
if e.config.DebugLogging {
|
||||
log.Debugf(ctx, "PolicyEnforcer: message compliant for action %q", action)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Non-compliant: NACK with all violation messages
|
||||
msg := fmt.Sprintf("policy violation(s): %s", strings.Join(violations, "; "))
|
||||
log.Warnf(ctx, "PolicyEnforcer: %s", msg)
|
||||
return model.NewBadReqErr(fmt.Errorf("%s", msg))
|
||||
}
|
||||
|
||||
// Close is a no-op for the policy enforcer (no resources to release).
|
||||
func (e *PolicyEnforcer) Close() {}
|
||||
|
||||
// extractAction gets the beckn action from the URL path or message body.
|
||||
func extractAction(urlPath string, body []byte) string {
|
||||
// Try URL path first: /bap/receiver/{action} or /bpp/caller/{action}
|
||||
parts := strings.Split(strings.Trim(urlPath, "/"), "/")
|
||||
if len(parts) >= 3 {
|
||||
return parts[len(parts)-1]
|
||||
}
|
||||
|
||||
// Fallback: extract from body context.action
|
||||
var payload struct {
|
||||
Context struct {
|
||||
Action string `json:"action"`
|
||||
} `json:"context"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &payload); err == nil && payload.Context.Action != "" {
|
||||
return payload.Context.Action
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
518
pkg/plugin/implementation/policyenforcer/enforcer_test.go
Normal file
518
pkg/plugin/implementation/policyenforcer/enforcer_test.go
Normal file
@@ -0,0 +1,518 @@
|
||||
package policyenforcer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/beckn-one/beckn-onix/pkg/model"
|
||||
)
|
||||
|
||||
// Helper: create a StepContext with the given action path and JSON body.
|
||||
func makeStepCtx(action string, body string) *model.StepContext {
|
||||
req, _ := http.NewRequest("POST", "/bpp/caller/"+action, nil)
|
||||
return &model.StepContext{
|
||||
Context: context.Background(),
|
||||
Request: req,
|
||||
Body: []byte(body),
|
||||
}
|
||||
}
|
||||
|
||||
// Helper: write a .rego file to a temp dir and return the dir path.
|
||||
func writePolicyDir(t *testing.T, filename, content string) string {
|
||||
t.Helper()
|
||||
dir := t.TempDir()
|
||||
err := os.WriteFile(filepath.Join(dir, filename), []byte(content), 0644)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to write policy file: %v", err)
|
||||
}
|
||||
return dir
|
||||
}
|
||||
|
||||
// --- Config Tests ---
|
||||
|
||||
func TestParseConfig_RequiresPolicySource(t *testing.T) {
|
||||
_, err := ParseConfig(map[string]string{})
|
||||
if err == nil {
|
||||
t.Fatal("expected error when no policyDir, policyFile, or policyUrls given")
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseConfig_Defaults(t *testing.T) {
|
||||
cfg, err := ParseConfig(map[string]string{"policyDir": "/tmp"})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if cfg.Query != "data.policy.violations" {
|
||||
t.Errorf("expected default query, got %q", cfg.Query)
|
||||
}
|
||||
if len(cfg.Actions) != 1 || cfg.Actions[0] != "confirm" {
|
||||
t.Errorf("expected default actions [confirm], got %v", cfg.Actions)
|
||||
}
|
||||
if !cfg.Enabled {
|
||||
t.Error("expected enabled=true by default")
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseConfig_RuntimeConfigForwarding(t *testing.T) {
|
||||
cfg, err := ParseConfig(map[string]string{
|
||||
"policyDir": "/tmp",
|
||||
"minDeliveryLeadHours": "6",
|
||||
"customParam": "value",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if cfg.RuntimeConfig["minDeliveryLeadHours"] != "6" {
|
||||
t.Errorf("expected minDeliveryLeadHours=6, got %q", cfg.RuntimeConfig["minDeliveryLeadHours"])
|
||||
}
|
||||
if cfg.RuntimeConfig["customParam"] != "value" {
|
||||
t.Errorf("expected customParam=value, got %q", cfg.RuntimeConfig["customParam"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseConfig_CustomActions(t *testing.T) {
|
||||
cfg, err := ParseConfig(map[string]string{
|
||||
"policyDir": "/tmp",
|
||||
"actions": "confirm, select, init",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if len(cfg.Actions) != 3 {
|
||||
t.Fatalf("expected 3 actions, got %d: %v", len(cfg.Actions), cfg.Actions)
|
||||
}
|
||||
expected := []string{"confirm", "select", "init"}
|
||||
for i, want := range expected {
|
||||
if cfg.Actions[i] != want {
|
||||
t.Errorf("action[%d] = %q, want %q", i, cfg.Actions[i], want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseConfig_PolicyUrls(t *testing.T) {
|
||||
cfg, err := ParseConfig(map[string]string{
|
||||
"policyUrls": "https://example.com/a.rego, https://example.com/b.rego",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if len(cfg.PolicyUrls) != 2 {
|
||||
t.Fatalf("expected 2 URLs, got %d: %v", len(cfg.PolicyUrls), cfg.PolicyUrls)
|
||||
}
|
||||
if cfg.PolicyUrls[0] != "https://example.com/a.rego" {
|
||||
t.Errorf("url[0] = %q", cfg.PolicyUrls[0])
|
||||
}
|
||||
}
|
||||
|
||||
// Note: policySources support was removed; we intentionally only support
|
||||
// comma-separated policyUrls and local paths via policyUrls entries.
|
||||
|
||||
// --- Evaluator Tests (with inline policies) ---
|
||||
|
||||
func TestEvaluator_NoViolations(t *testing.T) {
|
||||
policy := `
|
||||
package policy
|
||||
import rego.v1
|
||||
violations contains msg if {
|
||||
input.value < 0
|
||||
msg := "value is negative"
|
||||
}
|
||||
`
|
||||
dir := writePolicyDir(t, "test.rego", policy)
|
||||
eval, err := NewEvaluator(dir, "", nil, "data.policy.violations", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("NewEvaluator failed: %v", err)
|
||||
}
|
||||
|
||||
violations, err := eval.Evaluate(context.Background(), []byte(`{"value": 10}`))
|
||||
if err != nil {
|
||||
t.Fatalf("Evaluate failed: %v", err)
|
||||
}
|
||||
if len(violations) != 0 {
|
||||
t.Errorf("expected 0 violations, got %d: %v", len(violations), violations)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluator_WithViolation(t *testing.T) {
|
||||
policy := `
|
||||
package policy
|
||||
import rego.v1
|
||||
violations contains msg if {
|
||||
input.value < 0
|
||||
msg := "value is negative"
|
||||
}
|
||||
`
|
||||
dir := writePolicyDir(t, "test.rego", policy)
|
||||
eval, err := NewEvaluator(dir, "", nil, "data.policy.violations", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("NewEvaluator failed: %v", err)
|
||||
}
|
||||
|
||||
violations, err := eval.Evaluate(context.Background(), []byte(`{"value": -5}`))
|
||||
if err != nil {
|
||||
t.Fatalf("Evaluate failed: %v", err)
|
||||
}
|
||||
if len(violations) != 1 {
|
||||
t.Fatalf("expected 1 violation, got %d: %v", len(violations), violations)
|
||||
}
|
||||
if violations[0] != "value is negative" {
|
||||
t.Errorf("unexpected violation: %q", violations[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluator_RuntimeConfig(t *testing.T) {
|
||||
policy := `
|
||||
package policy
|
||||
import rego.v1
|
||||
violations contains msg if {
|
||||
input.value > to_number(data.config.maxValue)
|
||||
msg := "value exceeds maximum"
|
||||
}
|
||||
`
|
||||
dir := writePolicyDir(t, "test.rego", policy)
|
||||
eval, err := NewEvaluator(dir, "", nil, "data.policy.violations", map[string]string{"maxValue": "100"})
|
||||
if err != nil {
|
||||
t.Fatalf("NewEvaluator failed: %v", err)
|
||||
}
|
||||
|
||||
// Under limit
|
||||
violations, err := eval.Evaluate(context.Background(), []byte(`{"value": 50}`))
|
||||
if err != nil {
|
||||
t.Fatalf("Evaluate failed: %v", err)
|
||||
}
|
||||
if len(violations) != 0 {
|
||||
t.Errorf("expected 0 violations for value=50, got %v", violations)
|
||||
}
|
||||
|
||||
// Over limit
|
||||
violations, err = eval.Evaluate(context.Background(), []byte(`{"value": 150}`))
|
||||
if err != nil {
|
||||
t.Fatalf("Evaluate failed: %v", err)
|
||||
}
|
||||
if len(violations) != 1 {
|
||||
t.Errorf("expected 1 violation for value=150, got %v", violations)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluator_SkipsTestFiles(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
policy := `
|
||||
package policy
|
||||
import rego.v1
|
||||
violations contains "always" if { true }
|
||||
`
|
||||
os.WriteFile(filepath.Join(dir, "policy.rego"), []byte(policy), 0644)
|
||||
|
||||
// Test file would cause compilation issues if loaded (different package)
|
||||
testFile := `
|
||||
package policy_test
|
||||
import rego.v1
|
||||
import data.policy
|
||||
test_something if { count(policy.violations) > 0 }
|
||||
`
|
||||
os.WriteFile(filepath.Join(dir, "policy_test.rego"), []byte(testFile), 0644)
|
||||
|
||||
eval, err := NewEvaluator(dir, "", nil, "data.policy.violations", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("NewEvaluator should skip _test.rego files, but failed: %v", err)
|
||||
}
|
||||
|
||||
violations, err := eval.Evaluate(context.Background(), []byte(`{}`))
|
||||
if err != nil {
|
||||
t.Fatalf("Evaluate failed: %v", err)
|
||||
}
|
||||
if len(violations) != 1 {
|
||||
t.Errorf("expected 1 violation, got %d", len(violations))
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluator_InvalidJSON(t *testing.T) {
|
||||
policy := `
|
||||
package policy
|
||||
import rego.v1
|
||||
violations := set()
|
||||
`
|
||||
dir := writePolicyDir(t, "test.rego", policy)
|
||||
eval, err := NewEvaluator(dir, "", nil, "data.policy.violations", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("NewEvaluator failed: %v", err)
|
||||
}
|
||||
|
||||
_, err = eval.Evaluate(context.Background(), []byte(`not json`))
|
||||
if err == nil {
|
||||
t.Error("expected error for invalid JSON")
|
||||
}
|
||||
}
|
||||
|
||||
// --- Evaluator URL Fetch Tests ---
|
||||
|
||||
func TestEvaluator_FetchFromURL(t *testing.T) {
|
||||
policy := `
|
||||
package policy
|
||||
import rego.v1
|
||||
violations contains msg if {
|
||||
input.value < 0
|
||||
msg := "value is negative"
|
||||
}
|
||||
`
|
||||
// Serve the policy via a local HTTP server
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/plain")
|
||||
w.Write([]byte(policy))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
eval, err := NewEvaluator("", "", []string{srv.URL + "/test_policy.rego"}, "data.policy.violations", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("NewEvaluator with URL failed: %v", err)
|
||||
}
|
||||
|
||||
// Compliant
|
||||
violations, err := eval.Evaluate(context.Background(), []byte(`{"value": 10}`))
|
||||
if err != nil {
|
||||
t.Fatalf("Evaluate failed: %v", err)
|
||||
}
|
||||
if len(violations) != 0 {
|
||||
t.Errorf("expected 0 violations, got %v", violations)
|
||||
}
|
||||
|
||||
// Non-compliant
|
||||
violations, err = eval.Evaluate(context.Background(), []byte(`{"value": -1}`))
|
||||
if err != nil {
|
||||
t.Fatalf("Evaluate failed: %v", err)
|
||||
}
|
||||
if len(violations) != 1 {
|
||||
t.Errorf("expected 1 violation, got %v", violations)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluator_FetchURL_NotFound(t *testing.T) {
|
||||
srv := httptest.NewServer(http.NotFoundHandler())
|
||||
defer srv.Close()
|
||||
|
||||
_, err := NewEvaluator("", "", []string{srv.URL + "/missing.rego"}, "data.policy.violations", nil)
|
||||
if err == nil {
|
||||
t.Fatal("expected error for 404 URL")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluator_FetchURL_InvalidScheme(t *testing.T) {
|
||||
_, err := NewEvaluator("", "", []string{"ftp://example.com/policy.rego"}, "data.policy.violations", nil)
|
||||
if err == nil {
|
||||
t.Fatal("expected error for ftp:// scheme")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluator_MixedLocalAndURL(t *testing.T) {
|
||||
// Local policy
|
||||
localPolicy := `
|
||||
package policy
|
||||
import rego.v1
|
||||
violations contains "local_violation" if { input.local_bad }
|
||||
`
|
||||
dir := writePolicyDir(t, "local.rego", localPolicy)
|
||||
|
||||
// Remote policy (different rule, same package)
|
||||
remotePolicy := `
|
||||
package policy
|
||||
import rego.v1
|
||||
violations contains "remote_violation" if { input.remote_bad }
|
||||
`
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write([]byte(remotePolicy))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
eval, err := NewEvaluator(dir, "", []string{srv.URL + "/remote.rego"}, "data.policy.violations", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("NewEvaluator failed: %v", err)
|
||||
}
|
||||
|
||||
// Trigger both violations
|
||||
violations, err := eval.Evaluate(context.Background(), []byte(`{"local_bad": true, "remote_bad": true}`))
|
||||
if err != nil {
|
||||
t.Fatalf("Evaluate failed: %v", err)
|
||||
}
|
||||
if len(violations) != 2 {
|
||||
t.Errorf("expected 2 violations (local+remote), got %d: %v", len(violations), violations)
|
||||
}
|
||||
}
|
||||
|
||||
// --- Evaluator with local file path in policySources ---
|
||||
|
||||
func TestEvaluator_LocalFilePath(t *testing.T) {
|
||||
policy := `
|
||||
package policy
|
||||
import rego.v1
|
||||
violations contains "from_file" if { input.bad }
|
||||
`
|
||||
dir := t.TempDir()
|
||||
policyPath := filepath.Join(dir, "local_policy.rego")
|
||||
os.WriteFile(policyPath, []byte(policy), 0644)
|
||||
|
||||
eval, err := NewEvaluator("", "", []string{policyPath}, "data.policy.violations", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("NewEvaluator with local path failed: %v", err)
|
||||
}
|
||||
|
||||
violations, err := eval.Evaluate(context.Background(), []byte(`{"bad": true}`))
|
||||
if err != nil {
|
||||
t.Fatalf("Evaluate failed: %v", err)
|
||||
}
|
||||
if len(violations) != 1 || violations[0] != "from_file" {
|
||||
t.Errorf("expected [from_file], got %v", violations)
|
||||
}
|
||||
}
|
||||
|
||||
// --- Enforcer Integration Tests ---
|
||||
|
||||
func TestEnforcer_Compliant(t *testing.T) {
|
||||
policy := `
|
||||
package policy
|
||||
import rego.v1
|
||||
violations contains "blocked" if { input.context.action == "confirm"; input.block }
|
||||
`
|
||||
dir := writePolicyDir(t, "test.rego", policy)
|
||||
|
||||
enforcer, err := New(map[string]string{
|
||||
"policyDir": dir,
|
||||
"actions": "confirm",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New failed: %v", err)
|
||||
}
|
||||
|
||||
ctx := makeStepCtx("confirm", `{"context": {"action": "confirm"}, "block": false}`)
|
||||
err = enforcer.Run(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("expected nil error for compliant message, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnforcer_NonCompliant(t *testing.T) {
|
||||
policy := `
|
||||
package policy
|
||||
import rego.v1
|
||||
violations contains "blocked" if { input.context.action == "confirm" }
|
||||
`
|
||||
dir := writePolicyDir(t, "test.rego", policy)
|
||||
|
||||
enforcer, err := New(map[string]string{
|
||||
"policyDir": dir,
|
||||
"actions": "confirm",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New failed: %v", err)
|
||||
}
|
||||
|
||||
ctx := makeStepCtx("confirm", `{"context": {"action": "confirm"}}`)
|
||||
err = enforcer.Run(ctx)
|
||||
if err == nil {
|
||||
t.Fatal("expected error for non-compliant message, got nil")
|
||||
}
|
||||
|
||||
// Should be a BadReqErr
|
||||
if _, ok := err.(*model.BadReqErr); !ok {
|
||||
t.Errorf("expected *model.BadReqErr, got %T: %v", err, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnforcer_SkipsNonMatchingAction(t *testing.T) {
|
||||
policy := `
|
||||
package policy
|
||||
import rego.v1
|
||||
violations contains "blocked" if { true }
|
||||
`
|
||||
dir := writePolicyDir(t, "test.rego", policy)
|
||||
|
||||
enforcer, err := New(map[string]string{
|
||||
"policyDir": dir,
|
||||
"actions": "confirm",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New failed: %v", err)
|
||||
}
|
||||
|
||||
// Non-compliant body, but action is "search" — not in configured actions
|
||||
ctx := makeStepCtx("search", `{"context": {"action": "search"}}`)
|
||||
err = enforcer.Run(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("expected nil for non-matching action, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnforcer_DisabledPlugin(t *testing.T) {
|
||||
policy := `
|
||||
package policy
|
||||
import rego.v1
|
||||
violations contains "blocked" if { true }
|
||||
`
|
||||
dir := writePolicyDir(t, "test.rego", policy)
|
||||
|
||||
enforcer, err := New(map[string]string{
|
||||
"policyDir": dir,
|
||||
"enabled": "false",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New failed: %v", err)
|
||||
}
|
||||
|
||||
ctx := makeStepCtx("confirm", `{"context": {"action": "confirm"}}`)
|
||||
err = enforcer.Run(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("expected nil for disabled plugin, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// --- Enforcer with URL-sourced policy ---
|
||||
|
||||
func TestEnforcer_PolicyFromURL(t *testing.T) {
|
||||
policy := `
|
||||
package policy
|
||||
import rego.v1
|
||||
violations contains "blocked" if { input.context.action == "confirm" }
|
||||
`
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write([]byte(policy))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
enforcer, err := New(map[string]string{
|
||||
"policyUrls": srv.URL + "/block_confirm.rego",
|
||||
"actions": "confirm",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New failed: %v", err)
|
||||
}
|
||||
|
||||
ctx := makeStepCtx("confirm", `{"context": {"action": "confirm"}}`)
|
||||
err = enforcer.Run(ctx)
|
||||
if err == nil {
|
||||
t.Fatal("expected error from URL-sourced policy, got nil")
|
||||
}
|
||||
if _, ok := err.(*model.BadReqErr); !ok {
|
||||
t.Errorf("expected *model.BadReqErr, got %T", err)
|
||||
}
|
||||
}
|
||||
|
||||
// --- extractAction Tests ---
|
||||
|
||||
func TestExtractAction_FromURL(t *testing.T) {
|
||||
action := extractAction("/bpp/caller/confirm", nil)
|
||||
if action != "confirm" {
|
||||
t.Errorf("expected 'confirm', got %q", action)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAction_FromBody(t *testing.T) {
|
||||
body := []byte(`{"context": {"action": "select"}}`)
|
||||
action := extractAction("/x", body)
|
||||
if action != "select" {
|
||||
t.Errorf("expected 'select', got %q", action)
|
||||
}
|
||||
}
|
||||
238
pkg/plugin/implementation/policyenforcer/evaluator.go
Normal file
238
pkg/plugin/implementation/policyenforcer/evaluator.go
Normal file
@@ -0,0 +1,238 @@
|
||||
package policyenforcer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/open-policy-agent/opa/v1/ast"
|
||||
"github.com/open-policy-agent/opa/v1/rego"
|
||||
"github.com/open-policy-agent/opa/v1/storage/inmem"
|
||||
)
|
||||
|
||||
// Evaluator wraps the OPA engine: loads and compiles .rego files at startup,
|
||||
// then evaluates messages against the compiled policy set.
|
||||
type Evaluator struct {
|
||||
preparedQuery rego.PreparedEvalQuery
|
||||
query string
|
||||
runtimeConfig map[string]string
|
||||
moduleNames []string // names of loaded .rego modules
|
||||
}
|
||||
|
||||
// ModuleNames returns the names of the loaded .rego policy modules.
|
||||
func (e *Evaluator) ModuleNames() []string {
|
||||
return e.moduleNames
|
||||
}
|
||||
|
||||
// policyFetchTimeout is the HTTP timeout for fetching remote .rego files.
|
||||
const policyFetchTimeout = 30 * time.Second
|
||||
|
||||
// maxPolicySize is the maximum size of a single .rego file fetched from a URL (1 MB).
|
||||
const maxPolicySize = 1 << 20
|
||||
|
||||
// NewEvaluator creates an Evaluator by loading .rego files from local paths
|
||||
// and/or URLs, then compiling them. runtimeConfig is passed to Rego as data.config.
|
||||
func NewEvaluator(policyDir, policyFile string, policyUrls []string, query string, runtimeConfig map[string]string) (*Evaluator, error) {
|
||||
modules := make(map[string]string)
|
||||
|
||||
// Load from local directory
|
||||
if policyDir != "" {
|
||||
entries, err := os.ReadDir(policyDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read policy directory %s: %w", policyDir, err)
|
||||
}
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
if !strings.HasSuffix(entry.Name(), ".rego") {
|
||||
continue
|
||||
}
|
||||
// Skip test files — they shouldn't be compiled into the runtime evaluator
|
||||
if strings.HasSuffix(entry.Name(), "_test.rego") {
|
||||
continue
|
||||
}
|
||||
fpath := filepath.Join(policyDir, entry.Name())
|
||||
data, err := os.ReadFile(fpath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read policy file %s: %w", fpath, err)
|
||||
}
|
||||
modules[entry.Name()] = string(data)
|
||||
}
|
||||
}
|
||||
|
||||
// Load single local file
|
||||
if policyFile != "" {
|
||||
data, err := os.ReadFile(policyFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read policy file %s: %w", policyFile, err)
|
||||
}
|
||||
modules[filepath.Base(policyFile)] = string(data)
|
||||
}
|
||||
|
||||
// Load from URLs and local file paths (policyUrls)
|
||||
for _, rawSource := range policyUrls {
|
||||
if isURL(rawSource) {
|
||||
name, content, err := fetchPolicy(rawSource)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch policy from %s: %w", rawSource, err)
|
||||
}
|
||||
modules[name] = content
|
||||
} else {
|
||||
// Treat as local file path
|
||||
data, err := os.ReadFile(rawSource)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read local policy source %s: %w", rawSource, err)
|
||||
}
|
||||
modules[filepath.Base(rawSource)] = string(data)
|
||||
}
|
||||
}
|
||||
|
||||
if len(modules) == 0 {
|
||||
return nil, fmt.Errorf("no .rego policy files found from any configured source")
|
||||
}
|
||||
|
||||
// Compile modules to catch syntax errors early
|
||||
compiler, err := ast.CompileModulesWithOpt(modules, ast.CompileOpts{ParserOptions: ast.ParserOptions{RegoVersion: ast.RegoV1}})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to compile rego modules: %w", err)
|
||||
}
|
||||
|
||||
// Build data.config from runtime config
|
||||
store := map[string]interface{}{
|
||||
"config": toInterfaceMap(runtimeConfig),
|
||||
}
|
||||
|
||||
pq, err := rego.New(
|
||||
rego.Query(query),
|
||||
rego.Compiler(compiler),
|
||||
rego.Store(inmem.NewFromObject(store)),
|
||||
).PrepareForEval(context.Background())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to prepare rego query %q: %w", query, err)
|
||||
}
|
||||
|
||||
names := make([]string, 0, len(modules))
|
||||
for name := range modules {
|
||||
names = append(names, name)
|
||||
}
|
||||
|
||||
return &Evaluator{
|
||||
preparedQuery: pq,
|
||||
query: query,
|
||||
runtimeConfig: runtimeConfig,
|
||||
moduleNames: names,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// isURL checks if a source string looks like a remote URL.
|
||||
func isURL(source string) bool {
|
||||
return strings.HasPrefix(source, "http://") || strings.HasPrefix(source, "https://")
|
||||
}
|
||||
|
||||
// fetchPolicy downloads a .rego file from a URL and returns (filename, content, error).
|
||||
func fetchPolicy(rawURL string) (string, string, error) {
|
||||
parsed, err := url.Parse(rawURL)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("invalid URL: %w", err)
|
||||
}
|
||||
|
||||
if parsed.Scheme != "http" && parsed.Scheme != "https" {
|
||||
return "", "", fmt.Errorf("unsupported URL scheme %q (only http and https are supported)", parsed.Scheme)
|
||||
}
|
||||
|
||||
client := &http.Client{Timeout: policyFetchTimeout}
|
||||
resp, err := client.Get(rawURL)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("HTTP request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return "", "", fmt.Errorf("HTTP %d from %s", resp.StatusCode, rawURL)
|
||||
}
|
||||
|
||||
// Read with size limit
|
||||
limited := io.LimitReader(resp.Body, maxPolicySize+1)
|
||||
body, err := io.ReadAll(limited)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("failed to read response body: %w", err)
|
||||
}
|
||||
if len(body) > maxPolicySize {
|
||||
return "", "", fmt.Errorf("policy file exceeds maximum size of %d bytes", maxPolicySize)
|
||||
}
|
||||
|
||||
// Derive filename from URL path
|
||||
name := path.Base(parsed.Path)
|
||||
if name == "" || name == "." || name == "/" {
|
||||
name = "policy.rego"
|
||||
}
|
||||
if !strings.HasSuffix(name, ".rego") {
|
||||
name += ".rego"
|
||||
}
|
||||
|
||||
return name, string(body), nil
|
||||
}
|
||||
|
||||
// Evaluate runs the compiled policy against a JSON message body.
|
||||
// Returns a list of violation strings (empty = compliant).
|
||||
func (e *Evaluator) Evaluate(ctx context.Context, body []byte) ([]string, error) {
|
||||
var input interface{}
|
||||
if err := json.Unmarshal(body, &input); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse message body as JSON: %w", err)
|
||||
}
|
||||
|
||||
rs, err := e.preparedQuery.Eval(ctx, rego.EvalInput(input))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("rego evaluation failed: %w", err)
|
||||
}
|
||||
|
||||
return extractViolations(rs)
|
||||
}
|
||||
|
||||
// extractViolations pulls string violations from the OPA result set.
|
||||
// The query is expected to return a set of strings.
|
||||
func extractViolations(rs rego.ResultSet) ([]string, error) {
|
||||
if len(rs) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var violations []string
|
||||
for _, result := range rs {
|
||||
for _, expr := range result.Expressions {
|
||||
switch v := expr.Value.(type) {
|
||||
case []interface{}:
|
||||
// Result is a list (from set)
|
||||
for _, item := range v {
|
||||
if s, ok := item.(string); ok {
|
||||
violations = append(violations, s)
|
||||
}
|
||||
}
|
||||
case map[string]interface{}:
|
||||
// OPA sometimes returns sets as maps with string keys
|
||||
for key := range v {
|
||||
violations = append(violations, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return violations, nil
|
||||
}
|
||||
|
||||
// toInterfaceMap converts map[string]string to map[string]interface{} for OPA store.
|
||||
func toInterfaceMap(m map[string]string) map[string]interface{} {
|
||||
result := make(map[string]interface{}, len(m))
|
||||
for k, v := range m {
|
||||
result[k] = v
|
||||
}
|
||||
return result
|
||||
}
|
||||
@@ -257,6 +257,23 @@ func (m *Manager) Step(ctx context.Context, cfg *Config) (definition.Step, error
|
||||
return step, error
|
||||
}
|
||||
|
||||
// PolicyEnforcer returns a PolicyEnforcer instance based on the provided configuration.
|
||||
// It registers a cleanup function for resource management.
|
||||
func (m *Manager) PolicyEnforcer(ctx context.Context, cfg *Config) (definition.PolicyEnforcer, error) {
|
||||
pp, err := provider[definition.PolicyEnforcerProvider](m.plugins, cfg.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load provider for %s: %w", cfg.ID, err)
|
||||
}
|
||||
enforcer, closer, err := pp.New(ctx, cfg.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if closer != nil {
|
||||
m.closers = append(m.closers, closer)
|
||||
}
|
||||
return enforcer, nil
|
||||
}
|
||||
|
||||
// Cache returns a Cache instance based on the provided configuration.
|
||||
// It registers a cleanup function for resource management.
|
||||
func (m *Manager) Cache(ctx context.Context, cfg *Config) (definition.Cache, error) {
|
||||
|
||||
Reference in New Issue
Block a user