GFQL Policy Hooks#
Policy hooks provide external control over GFQL query execution, enabling security, resource management, and usage tracking.
Quick Start#
from graphistry.compute.gfql.policy import PolicyException

def my_policy(context):
    # Deny remote data loading for specific datasets
    if context.get('is_remote'):
        # For remote operations, current_ast is ASTRemoteGraph
        ast = context.get('current_ast')
        if hasattr(ast, 'dataset_id') and ast.dataset_id == 'forbidden':
            raise PolicyException('preload', 'Access denied', code=403)

# Apply policy to query
g.gfql(query, policy={'preload': my_policy})
Policy Phases#
Policies are invoked at ten distinct phases:
- preload
Before data is loaded (local or remote). Can prevent data access.
- postload
After data is loaded. Can check size/content and deny further processing.
- prelet
Before let() DAG execution starts. Can control entire DAG execution and validate DAG structure.
- postlet
After let() DAG execution completes (even on error). Can track DAG-level performance and enforce DAG-level policies.
- prechain
Before chain operations execute. Can control entire chain execution and validate chain structure.
- postchain
After chain operations complete (even on error). Can track chain-level performance and enforce chain-level policies.
- preletbinding
Before each binding execution in let() DAGs. Can control per-binding execution and validate dependencies.
- postletbinding
After each binding execution (even on error). Can track binding performance and enforce per-binding policies.
- precall
Before method execution (hop, filter, etc.). Can control operations and validate parameters.
- postcall
After method execution. Can validate result size, track execution time, and log performance.
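To see which phases fire for a given query, one option is to register a single recording handler under every phase name. The sketch below is illustrative only: `make_phase_recorder` and `PHASES` are hypothetical names, and the final loop calls two hooks by hand as a stand-in for the engine so the snippet runs without a graph.

```python
from typing import Any, Callable, Dict, List, Tuple

# The ten phase names listed above.
PHASES = [
    'preload', 'postload', 'prelet', 'postlet', 'prechain', 'postchain',
    'preletbinding', 'postletbinding', 'precall', 'postcall',
]

def make_phase_recorder() -> Tuple[Callable[[Dict[str, Any]], None], List[str]]:
    """Return a handler plus the list it appends each observed phase to."""
    seen: List[str] = []

    def record(context: Dict[str, Any]) -> None:
        seen.append(context['phase'])

    return record, seen

record, seen = make_phase_recorder()
policy = {phase: record for phase in PHASES}

# Stand-in for the engine: invoke two hooks manually with minimal contexts.
for phase in ('preload', 'postload'):
    policy[phase]({'phase': phase, 'hook': phase})
```

With a real graph, the same dictionary would be passed as `g.gfql(query, policy=policy)` and `seen` inspected afterwards.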
Context Fields#
The context dictionary passed to policy functions contains:
Always present:
- phase: Current phase ('preload', 'postload', 'prelet', 'postlet', 'prechain', 'postchain', 'precall', 'postcall', 'preletbinding', 'postletbinding')
- hook: Hook name (same as phase, useful for shared handlers)
- _policy_depth: Internal recursion counter
Usually present:
- query: Global/original query AST (None in call context)
- current_ast: Current sub-AST being executed (None in call context for method calls)
- query_type: Type of query ('chain', 'dag', 'single', 'call')
Phase-specific:
- plottable: Graph instance (postload/precall/postcall phases)
- graph_stats: Data statistics as GraphStats TypedDict (postload/precall/postcall phases)
- call_op: Operation name (precall/postcall phases only)
- call_params: Operation parameters (precall/postcall phases only)
- execution_time: Method execution duration in seconds (postcall phase only)
- success: Execution success flag (postcall/postlet/postchain/postletbinding phases)
- error: Error message string (post* phases when success=False)
- error_type: Error type name (post* phases when success=False)
Binding-specific (preletbinding/postletbinding phases only):
- binding_name: Name of the current binding being executed
- binding_index: Execution order of this binding (0-indexed)
- total_bindings: Total number of bindings in the let expression
- binding_dependencies: List of binding names this binding depends on
- binding_ast: The AST object being bound (the value in let({name: ast}))
Hierarchy/Tracing fields (all phases):
- execution_depth: Nesting depth (0=query, 1=let/chain, 2=binding/op, 3=call)
- operation_path: Unique operation identifier like "query.dag.binding:hg.call:hypergraph"
- parent_operation: Parent operation path (for OpenTelemetry span relationships)
Context-specific:
- is_remote: True for remote data operations (ASTRemoteGraph)
- engine: Current engine value when available
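Because only a few fields are always present, handlers generally read the rest with `.get()`. The following is a minimal sketch of that pattern, not part of the library: `describe_context` is a hypothetical helper, and the example dictionary is hand-built rather than produced by a real query.

```python
from typing import Any, Dict

def describe_context(context: Dict[str, Any]) -> str:
    """Build a one-line summary, skipping fields that are absent or None."""
    parts = [f"phase={context['phase']}"]  # 'phase' is always present
    for key in ('query_type', 'call_op', 'binding_name', 'execution_time'):
        value = context.get(key)
        if value is not None:
            parts.append(f"{key}={value}")
    stats = context.get('graph_stats') or {}
    if stats:
        parts.append(f"nodes={stats.get('nodes', 0)}")
    return ' '.join(parts)

# Hand-built postcall-style context for illustration.
example = {
    'phase': 'postcall',
    'hook': 'postcall',
    'query_type': 'call',
    'call_op': 'hop',
    'execution_time': 0.25,
    'graph_stats': {'nodes': 10, 'edges': 20},
}
summary = describe_context(example)
```

A helper like this can be shared by logging and tracing handlers so that each phase is reported the same way.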
GraphStats Type#
The graph_stats field provides typed statistics:
from graphistry.compute.gfql.policy import GraphStats
# GraphStats is a TypedDict with:
# - nodes: int (number of nodes)
# - edges: int (number of edges)
# - node_bytes: int (memory usage)
# - edge_bytes: int (memory usage)
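As a rough illustration of how these four fields might be combined, the sketch below sums the two byte counts and compares them to a budget. `GraphStatsSketch` is a local stand-in that mirrors the fields listed above so the snippet runs without graphistry installed; `total_bytes` and `exceeds_memory_budget` are hypothetical helpers.

```python
from typing import TypedDict

class GraphStatsSketch(TypedDict, total=False):
    """Local stand-in mirroring the GraphStats fields described above."""
    nodes: int
    edges: int
    node_bytes: int
    edge_bytes: int

def total_bytes(stats: GraphStatsSketch) -> int:
    """Combined memory footprint of the node and edge tables."""
    return stats.get('node_bytes', 0) + stats.get('edge_bytes', 0)

def exceeds_memory_budget(stats: GraphStatsSketch, max_bytes: int) -> bool:
    """True when the combined footprint is strictly above the budget."""
    return total_bytes(stats) > max_bytes

sample: GraphStatsSketch = {
    'nodes': 3, 'edges': 2, 'node_bytes': 1024, 'edge_bytes': 512,
}
```

Inside a postload or postcall handler, the same check would be applied to `context.get('graph_stats', {})`.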
Examples#
Limit Data Size
def size_limit_policy(context):
    if context['phase'] == 'postload':
        stats = context.get('graph_stats', {})
        if stats.get('nodes', 0) > 10000:
            raise PolicyException(
                'postload',
                f"Too many nodes: {stats['nodes']}",
                code=413
            )

g.gfql(query, policy={'postload': size_limit_policy})
Control Operation Execution and Performance
def operation_control_policy(context):
    if context['phase'] == 'precall':
        # Validate operation parameters before execution
        op = context.get('call_op', '')
        params = context.get('call_params', {})
        # Deny expensive operations
        if op == 'hop' and params.get('hops', 0) > 3:
            raise PolicyException(
                'precall',
                f"Too many hops: {params['hops']} > 3",
                code=413
            )
    elif context['phase'] == 'postcall':
        # Track execution performance
        exec_time = context.get('execution_time', 0)
        success = context.get('success', False)
        if not success:
            raise PolicyException(
                'postcall',
                'Operation failed',
                code=500
            )
        # Log slow operations
        if exec_time > 5.0:  # 5 seconds
            print(f"Slow operation detected: {exec_time:.2f}s")
        # Validate result size
        stats = context.get('graph_stats', {})
        if stats.get('nodes', 0) > 50000:
            raise PolicyException(
                'postcall',
                f"Result too large: {stats['nodes']} nodes",
                code=413
            )

g.gfql(query, policy={
    'precall': operation_control_policy,
    'postcall': operation_control_policy
})
Control Remote Access
def remote_access_policy(context):
    if context.get('is_remote'):
        # Check JWT token for remote operations
        ast = context['current_ast']
        if hasattr(ast, 'token') and not ast.token:
            raise PolicyException(
                'preload',
                'Authentication required',
                code=401
            )

g.gfql(query, policy={'preload': remote_access_policy})
Per-Binding Control
def binding_policy(context):
    # Control execution of specific bindings
    if context['phase'] == 'preletbinding':
        binding_name = context.get('binding_name')
        deps = context.get('binding_dependencies', [])
        # Deny bindings with too many dependencies
        if len(deps) > 5:
            raise PolicyException(
                'preletbinding',
                f"Binding '{binding_name}' has too many dependencies: {len(deps)}",
                code=413
            )
    elif context['phase'] == 'postletbinding':
        # Track binding performance
        binding_name = context.get('binding_name')
        success = context.get('success', False)
        if not success:
            error = context.get('error', 'Unknown error')
            print(f"Binding '{binding_name}' failed: {error}")

from graphistry.compute.ast import ASTLet, n, call

dag = ASTLet({
    'people': n({'type': 'person'}),
    'orgs': n({'type': 'org'}),
    'connections': call('hypergraph', {})
})
g.gfql(dag, policy={
    'preletbinding': binding_policy,
    'postletbinding': binding_policy
})
Track Usage
def create_usage_tracker():
    stats = {'calls': 0, 'data_loaded': 0, 'execution_times': []}

    def track(context):
        if context['phase'] == 'precall':
            stats['calls'] += 1
        elif context['phase'] == 'postcall':
            # Track execution performance
            exec_time = context.get('execution_time', 0)
            stats['execution_times'].append(exec_time)
        elif context['phase'] == 'postload':
            data = context.get('graph_stats', {})
            stats['data_loaded'] += data.get('nodes', 0)

    return track, stats

tracker, stats = create_usage_tracker()
g.gfql(query, policy={
    'postload': tracker,
    'precall': tracker,
    'postcall': tracker
})
print(f"Usage: {stats}")
Shared Handler
def universal_policy(context):
    hook = context['hook']  # Which hook fired
    if hook == 'preload':
        # Pre-execution checks
        pass
    elif hook == 'postload':
        # Data validation
        pass
    elif hook == 'precall':
        # Operation control and parameter validation
        pass
    elif hook == 'postcall':
        # Performance tracking and result validation
        pass

# Use same handler for all phases
g.gfql(query, policy={
    'preload': universal_policy,
    'postload': universal_policy,
    'precall': universal_policy,
    'postcall': universal_policy
})
Policy Shortcuts#
To reduce boilerplate in common patterns, GFQL policies support shortcuts that expand to multiple hooks automatically. This is especially useful for cross-cutting concerns like telemetry, authentication, and resource management.
Shortcuts Reference
| Shortcut | Expands To | Use Case |
|---|---|---|
| 'pre' | All 5 pre* hooks (preload, prelet, prechain, preletbinding, precall) | OpenTelemetry span creation, authentication, pre-execution validation |
| 'post' | All 5 post* hooks (postload, postlet, postchain, postletbinding, postcall) | OpenTelemetry span cleanup, resource cleanup, post-execution validation |
| 'load' | preload + postload | Query-level hooks for data loading control |
| 'let' | prelet + postlet | DAG-level hooks for let() execution control |
| 'chain' | prechain + postchain | Chain-level hooks for chain operation control |
| 'binding' | preletbinding + postletbinding | Binding-level hooks for per-binding control |
| 'call' | precall + postcall | Operation-level hooks for method call control |
Before/After Comparison
Without shortcuts (10 keys):
# Traditional approach - verbose
policy = {
    'preload': create_span,
    'postload': end_span,
    'prelet': create_span,
    'postlet': end_span,
    'prechain': create_span,
    'postchain': end_span,
    'preletbinding': create_span,
    'postletbinding': end_span,
    'precall': create_span,
    'postcall': end_span
}
With shortcuts (2 keys):
# Shortcuts approach - concise
policy = {
    'pre': create_span,
    'post': end_span
}
Both are functionally equivalent and produce the same behavior.
Composition Behavior
When multiple shortcuts apply to the same hook, their handlers automatically compose:
from graphistry.compute.gfql.policy import expand_policy, debug_policy

def auth_check(ctx):
    """General authentication check"""
    pass

def rate_limit(ctx):
    """Rate limiting for calls"""
    pass

def validate_params(ctx):
    """Specific parameter validation"""
    pass

policy = {
    'pre': auth_check,          # Applies to ALL pre* hooks
    'call': rate_limit,         # Applies to precall + postcall
    'precall': validate_params  # Applies only to precall
}
# At precall, handlers execute in order: auth_check → rate_limit → validate_params
# At postcall, handlers execute in reverse (LIFO): rate_limit → auth_check
Composition Order Rules
Pre hooks execute in forward order: general → scope → specific
Post hooks execute in reverse order (LIFO cleanup): specific → scope → general
This ensures proper setup/cleanup semantics (like try/finally blocks)
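The ordering rules above can be sketched in a few lines of plain Python. This is an illustrative model, not the library's implementation: `compose_for_hook` and `tagger` are hypothetical names, and the three layers are passed in explicitly rather than derived from a policy dictionary.

```python
from typing import Any, Callable, Dict, List, Tuple

Handler = Callable[[Dict[str, Any]], None]

def compose_for_hook(hook: str, general: List[Handler],
                     scope: List[Handler], specific: List[Handler]) -> Handler:
    """Chain handlers general → scope → specific, reversed for post* hooks."""
    ordered = general + scope + specific
    if hook.startswith('post'):
        ordered = list(reversed(ordered))  # LIFO cleanup order

    def composed(context: Dict[str, Any]) -> None:
        for handler in ordered:
            handler(context)  # any handler may raise to deny

    return composed

log: List[Tuple[str, str]] = []

def tagger(name: str) -> Handler:
    """Handler that records (hook, name) so the call order can be inspected."""
    def handler(context: Dict[str, Any]) -> None:
        log.append((context['hook'], name))
    return handler

layers = {
    'general': [tagger('general')],
    'scope': [tagger('scope')],
    'specific': [tagger('specific')],
}
compose_for_hook('precall', **layers)({'hook': 'precall'})
compose_for_hook('postcall', **layers)({'hook': 'postcall'})

pre_order = [name for hook, name in log if hook == 'precall']
post_order = [name for hook, name in log if hook == 'postcall']
```

Reversing the post order means the handler that set up state last is the first to tear it down, which mirrors nested try/finally blocks.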
Multi-Policy Server Pattern
Shortcuts compose naturally for scenarios where multiple orthogonal policies need to be applied:
# Server scenario: telemetry + security + resource limits
policy = {
    'pre': create_otel_span,        # OpenTelemetry tracing
    'post': end_otel_span,          # Span cleanup
    'postload': check_size_limits,  # Resource limits after data load
    'precall': validate_jwt_token   # Security validation before operations
}
# This composes cleanly:
# - All pre* hooks get telemetry spans
# - postload gets both telemetry cleanup + size checking
# - precall gets telemetry + JWT validation
# - Other post* hooks get just telemetry cleanup
Debug Helper
Use debug_policy() to see how shortcuts expand:
from graphistry.compute.gfql.policy import debug_policy

policy = {
    'pre': auth,
    'call': rate_limit,
    'precall': validate
}
# Show expansion and composition order
debug_policy(policy)
Output:
preload [auth (from 'pre')]
prelet [auth (from 'pre')]
prechain [auth (from 'pre')]
preletbinding [auth (from 'pre')]
precall [auth (from 'pre'), rate_limit (from 'call'), validate (from 'precall')]
postcall [rate_limit (from 'call'), auth (from 'pre')] ← reversed
postload [auth (from 'pre')]
postlet [auth (from 'pre')]
postchain [auth (from 'pre')]
postletbinding [auth (from 'pre')]
Backward Compatibility
Full hook names (like 'preload') still work and can be mixed with shortcuts
Shortcuts are entirely optional - use them only when they simplify your code
No performance overhead - expansion happens once per query
OpenTelemetry Example
Using shortcuts, OpenTelemetry span tracing reduces from 10 hook keys to just 2:
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)
span_map = {}  # operation_path → span

def create_span(ctx):
    """Start span in pre* hooks"""
    # Get parent span using parent_operation
    parent_span = span_map.get(ctx.get('parent_operation'))
    # start_span() takes a parent Context (not a span) via `context=`,
    # so wrap the parent span in a Context when there is one
    parent_ctx = (
        trace.set_span_in_context(parent_span)
        if parent_span is not None else None
    )
    # Create span with unique operation_path as name
    span = tracer.start_span(
        ctx['operation_path'],
        context=parent_ctx
    )
    # Add span attributes from context
    span.set_attribute('execution_depth', ctx['execution_depth'])
    span.set_attribute('query_type', ctx.get('query_type', 'unknown'))
    if ctx.get('binding_name'):
        span.set_attribute('binding_name', ctx['binding_name'])
    if ctx.get('call_op'):
        span.set_attribute('call_op', ctx['call_op'])
    # Store span for children and post hook
    span_map[ctx['operation_path']] = span

def end_span(ctx):
    """End span in post* hooks"""
    span = span_map.pop(ctx['operation_path'], None)
    if span is None:
        return
    # Add result attributes
    if ctx.get('graph_stats'):
        stats = ctx['graph_stats']
        span.set_attribute('nodes', stats.get('nodes', 0))
        span.set_attribute('edges', stats.get('edges', 0))
    # Handle errors
    if not ctx.get('success', True):
        span.set_status(
            Status(StatusCode.ERROR, ctx.get('error', 'Unknown error'))
        )
    span.end()

# Apply to all hook phases using shortcuts (2 keys instead of 10!)
policy = {
    'pre': create_span,  # Expands to all 5 pre* hooks
    'post': end_span     # Expands to all 5 post* hooks
}
result = g.gfql(my_query, policy=policy)
This creates a proper span hierarchy matching the query execution tree, with each operation having a unique operation_path and correct parent relationships.
PolicyException#
Deny operations by raising PolicyException:
from graphistry.compute.gfql.policy import PolicyException

raise PolicyException(
    phase='preload',      # Which phase denied
    reason='Forbidden',   # Human-readable reason
    code=403,             # HTTP-like status code
    query_type='chain'    # Optional additional context (see PolicyException Parameters)
)
The exception can be enriched with additional fields for logging/debugging.
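On a server, a denial usually has to be turned into a response for the caller. The sketch below shows one way that might look. It is an assumption-heavy illustration: `PolicyExceptionStandIn` is a local stand-in with `phase`, `reason`, and `code` attributes so the snippet runs without graphistry, and `denial_to_response` and `run_guarded` are hypothetical helpers. Only the `code` attribute is confirmed elsewhere on this page, so the helper reads the others defensively with `getattr`.

```python
from typing import Any, Callable, Dict

class PolicyExceptionStandIn(Exception):
    """Local stand-in; the real class is graphistry's PolicyException."""
    def __init__(self, phase: str, reason: str, code: int = 403, **extra: Any):
        super().__init__(f"{phase}: {reason}")
        self.phase = phase
        self.reason = reason
        self.code = code
        self.extra = extra  # any additional context fields

def denial_to_response(exc: Exception) -> Dict[str, Any]:
    """Map a denial to an HTTP-like payload, tolerating missing attributes."""
    return {
        'status': getattr(exc, 'code', 403),
        'phase': getattr(exc, 'phase', None),
        'message': getattr(exc, 'reason', str(exc)),
    }

def run_guarded(fn: Callable[[], Any]) -> Dict[str, Any]:
    """Run a query callable and convert policy denials into responses."""
    try:
        return {'status': 200, 'result': fn()}
    except PolicyExceptionStandIn as exc:
        return denial_to_response(exc)

def too_big() -> None:
    raise PolicyExceptionStandIn('postload', 'Too many nodes', code=413)

denied = run_guarded(too_big)
allowed = run_guarded(lambda: 'ok')
```

In real use, `fn` would wrap a call such as `g.gfql(query, policy=policy)` and the `except` clause would name the library's `PolicyException`.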
Thread Safety#
Policy execution is thread-safe with built-in recursion prevention. Policies are not invoked recursively when operations trigger internal queries (depth limit of 1).
Remote Data Loading#
Policies can control remote data operations (ASTRemoteGraph). When is_remote is True in the context, the operation involves loading data from a remote source:
def remote_data_policy(context):
    # Check remote operations in preload phase
    if context['phase'] == 'preload' and context.get('is_remote'):
        ast = context.get('current_ast')
        # For ASTRemoteGraph, check dataset_id
        if hasattr(ast, 'dataset_id'):
            if ast.dataset_id in banned_datasets:
                raise PolicyException('preload', 'Dataset blocked')
        # Check for JWT token
        if hasattr(ast, 'token') and not validate_jwt(ast.token):
            raise PolicyException('preload', 'Invalid token', code=401)
    # Check size after remote data loads
    elif context['phase'] == 'postload' and context.get('is_remote'):
        stats = context.get('graph_stats', {})
        if stats.get('nodes', 0) > remote_limit:
            raise PolicyException('postload', 'Remote data too large')
Remote operations trigger both preload and postload hooks, allowing control before and after data transfer.
Query Types#
Policies work with different GFQL query patterns:
Chain queries - Sequential operations:
# query_type will be 'chain'
g.gfql([n(), e(), n()], policy=policy_dict)
DAG queries - Named bindings with dependencies:
# query_type will be 'dag'
g.gfql({'persons': n({'type': 'person'})}, policy=policy_dict)
Call operations - Method invocations:
# query_type will be 'call', precall and postcall phases triggered
from graphistry.compute.ast import call

g.gfql(call('hop', {'hops': 2}), policy={
    'precall': my_precall_policy,
    'postcall': my_postcall_policy
})
Each query type provides appropriate context to the policy for decision making.
Integration with Hub#
The policy system is designed for Graphistry Hub integration:
Hub creates policies based on user tier/permissions
Policies enforce resource limits and feature access
Usage tracking for billing/analytics
JWT token validation for remote operations
# Hub example
def create_tier_policy(tier='free'):
    limits = {
        'free': {'max_nodes': 1000},
        'pro': {'max_nodes': 100000}
    }

    def policy(context):
        if context['phase'] == 'postload':
            stats = context.get('graph_stats', {})
            if stats.get('nodes', 0) > limits[tier]['max_nodes']:
                raise PolicyException(
                    'postload',
                    f'{tier} tier limit exceeded',
                    code=403
                )

    return policy
Advanced Topics#
Policy Composition
Combine multiple policies using composition patterns:
def compose_policies(*policies):
    """Compose multiple policies into one."""
    def composed(context):
        for policy in policies:
            policy(context)  # Each can raise PolicyException
    return composed

# Use composed policy
combined = compose_policies(
    size_limit_policy,
    rate_limit_policy,
    tier_policy
)
g.gfql(query, policy={'postload': combined})
Stateful Policies with Closures
Track state across multiple queries:
def create_rate_limiter(max_per_minute=60):
    from collections import deque
    from time import time
    calls = deque()

    def policy(context):
        if context['phase'] == 'preload':
            now = time()
            # Remove calls older than 1 minute
            while calls and calls[0] < now - 60:
                calls.popleft()
            if len(calls) >= max_per_minute:
                raise PolicyException(
                    'preload',
                    'Rate limit exceeded',
                    code=429
                )
            calls.append(now)

    return policy
Testing Policies
Test policies in isolation:
import pytest

def test_policy():
    # Create mock context
    context = {
        'phase': 'postload',
        'graph_stats': {'nodes': 5000},
        '_policy_depth': 0
    }
    # Test acceptance: size_limit_policy (see Limit Data Size) allows up to 10000 nodes
    size_limit_policy(context)  # Should not raise
    # Test denial
    context['graph_stats']['nodes'] = 50000
    with pytest.raises(PolicyException) as exc:
        size_limit_policy(context)
    assert exc.value.code == 413
Performance Considerations
Policies execute synchronously - keep them lightweight
Use caching for expensive validations
Consider async patterns for external calls (future enhancement)
Recursion prevention adds minimal overhead (depth limit of 1)
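For the caching point, one simple option is to memoize the expensive check with `functools.lru_cache`. The sketch below is illustrative only: `is_dataset_allowed` stands in for a slow lookup such as a database or network call, `lookups` exists purely to make cache misses visible, and `PermissionError` is used in place of `PolicyException` so the snippet runs without graphistry.

```python
from functools import lru_cache
from types import SimpleNamespace
from typing import Any, Dict, List

lookups: List[str] = []  # records every cache miss, for illustration

@lru_cache(maxsize=1024)
def is_dataset_allowed(dataset_id: str) -> bool:
    """Stand-in for an expensive permission lookup; cached per dataset id."""
    lookups.append(dataset_id)
    return dataset_id != 'forbidden'

def cached_preload_policy(context: Dict[str, Any]) -> None:
    """Deny blocked remote datasets, paying for each lookup only once."""
    if context.get('phase') != 'preload' or not context.get('is_remote'):
        return
    dataset_id = getattr(context.get('current_ast'), 'dataset_id', None)
    if dataset_id is not None and not is_dataset_allowed(dataset_id):
        # A real policy would raise PolicyException('preload', ..., code=403)
        raise PermissionError(f"Dataset blocked: {dataset_id}")

# Hand-built context with a stand-in AST object exposing dataset_id.
ctx = {
    'phase': 'preload',
    'is_remote': True,
    'current_ast': SimpleNamespace(dataset_id='sales'),
}
cached_preload_policy(ctx)
cached_preload_policy(ctx)  # second call is served from the cache
```

Cached decisions can go stale, so checks whose answers change over time (for example token validity) may need a bounded lifetime or an explicit `is_dataset_allowed.cache_clear()` call.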
Debugging Policies
Use logging to debug policy decisions:
import logging

logger = logging.getLogger(__name__)
limit = 10000  # Example node threshold (assumed for illustration)

# Named logging_policy to avoid shadowing the imported debug_policy() helper
def logging_policy(context):
    phase = context['phase']
    logger.debug(f"Policy called: phase={phase}")
    if phase == 'postload':
        stats = context.get('graph_stats', {})
        logger.debug(f"Graph stats: {stats}")
        if stats.get('nodes', 0) > limit:
            logger.warning(f"Denying: {stats['nodes']} > {limit}")
            raise PolicyException(...)
    logger.debug(f"Policy accepted in {phase}")
API Reference#
Main Interface
# Using full hook names
g.gfql(query, policy={
    'preload': preload_function,                # Optional
    'postload': postload_function,              # Optional
    'prelet': prelet_function,                  # Optional
    'postlet': postlet_function,                # Optional
    'prechain': prechain_function,              # Optional
    'postchain': postchain_function,            # Optional
    'preletbinding': preletbinding_function,    # Optional
    'postletbinding': postletbinding_function,  # Optional
    'precall': precall_function,                # Optional
    'postcall': postcall_function               # Optional
})

# Or using shortcuts (expands to full hook names)
g.gfql(query, policy={
    'pre': pre_function,      # Expands to all pre* hooks
    'post': post_function,    # Expands to all post* hooks
    'load': load_function,    # Expands to preload + postload
    'let': let_function,      # Expands to prelet + postlet
    'chain': chain_function,  # Expands to prechain + postchain
    'binding': binding_fn,    # Expands to preletbinding + postletbinding
    'call': call_function     # Expands to precall + postcall
})

# Shortcuts can be mixed with full hook names
g.gfql(query, policy={
    'pre': general_handler,
    'postload': specific_size_check  # Full hook name used alongside the 'pre' shortcut
})
Imports
from graphistry.compute.gfql.policy import (
    PolicyException,  # Exception class for denying operations
    PolicyContext,    # TypedDict for context parameter
    GraphStats,       # TypedDict for graph statistics
    PolicyFunction,   # Type alias for policy functions
    PolicyDict,       # Type alias for policy dictionary
    expand_policy,    # Expand shortcuts to full hook names (internal use)
    debug_policy      # Debug helper to visualize expansion
)
PolicyException Parameters
- phase (str): Phase where denial occurred ('preload', 'postload', 'prelet', 'postlet', 'prechain', 'postchain', 'preletbinding', 'postletbinding', 'precall', 'postcall')
- reason (str): Human-readable explanation
- code (int): HTTP-like status code (default: 403)
- query_type (str, optional): Type of query being executed
- data_size (dict, optional): Graph statistics at time of denial
Common HTTP Status Codes
- 401: Unauthorized (authentication required)
- 403: Forbidden (authenticated but not allowed)
- 413: Payload too large (data size limit exceeded)
- 429: Too many requests (rate limit exceeded)
- 503: Service unavailable (resource constraints)