Comprehensive Examples
This page demonstrates Padas Domain Language (PDL) usage through input → PDL → output walkthroughs. JSON shown as expected output is illustrative and non-normative (field order, envelope metadata, and numeric details may differ by deployment). For exact syntax, operator semantics, wildcard rules, window validation, and runtime behavior, use Reference; for a compact operator and command list, use Quick Reference.
Table of Contents
- Example format
- Syntax recap
- Single-event examples
- Parse examples
- Aggregation examples
- End-to-end pipelines
- Real-world scenarios
- Advanced patterns
- Production considerations
- Running examples
Example format
Unless stated otherwise, each worked example uses:
| Block | Content |
|---|---|
| Purpose | What the snippet demonstrates. |
| Input | One JSON event, or an array of events for windowed aggregation. |
| PDL | Query, pipeline stage list, or aggregation clause. |
| Expected output | Representative JSON after the stage (or aggregate emit). |
| Notes | Window boundaries, group_key shape, engine version caveats. |
Grouped aggregate rows follow flat emit shape with group_key (see Reference → Aggregations and Glossary → Aggregation).
Syntax recap
Minimal reminders only—not a substitute for Quick Reference or Reference.
Query filters
status = "active" AND score > 80
(user_id = 123 OR user_id = 456) AND NOT deleted = true
action IN ["login", "logout"]
email ~= ".*@example\\.com$"
tags ?= "urgent"
?= on arrays checks membership (example: tags ?= "urgent"). For string matching on email domains, prefer ~= as above.
Transformations
eval total = price * qty, tier = if(total > 100, "high", "low")
rename user_id AS id
fields id, total, tier
output result type=string
Parsing
parse_kv raw
parse_json envelope
parse_csv line header="ts,user,evt"
Aggregations
count AS n timespan=5m
sum(amount) AS rev timespan=1h group_by tenant_id
avg(latency) AS avg_ms timespan=1m group_by svc where ok = true
Partitioning
partition_by user_id
partition_by tenant_id, user_id
parse_json body | partition_by user_id | count AS c timespan=5m group_by user_id
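Conceptually, partition_by routes events so that all events sharing a key land on the same worker. A minimal sketch of hash-based key routing in Rust (the engine's actual hashing and routing contract is defined in Reference; DefaultHasher and the modulo scheme here are illustrative assumptions):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Route an event to a partition by hashing its key. Events with the
/// same key always map to the same partition within one process run.
fn partition_for(key: &str, partitions: u64) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish() % partitions
}
```

The property that matters for windowed aggregation is determinism: `partition_for("u1", 8)` returns the same partition every time, so per-key state stays local.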
Single-event examples
Purpose
Retain events where status is active and score exceeds 80.
Input
{
"user_id": 123,
"status": "active",
"score": 85,
"ts": 1704067300000
}
PDL
status = "active" AND score > 80
Expected output
{
"user_id": 123,
"status": "active",
"score": 85,
"ts": 1704067300000
}
Notes
A non-matching event is dropped for this branch (no output row).
Purpose
Combine numeric threshold with deterministic string match on email using regex (avoid ?= on strings for domain checks).
Input
{
"user_id": 456,
"email": "user@example.com",
"login_count": 5
}
PDL
login_count > 3 AND email ~= ".*@example\\.com$"
Expected output
{
"user_id": 456,
"email": "user@example.com",
"login_count": 5
}
Notes
Escape . in the pattern as needed for your regex engine.
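The escaping pitfall can be made concrete without a regex engine at all. A sketch of a regex-free domain check (a hypothetical helper, not part of PDL) that behaves like the anchored, escaped pattern:

```rust
/// Regex-free equivalent of the anchored, escaped pattern
/// ".*@example\.com$": true only when the text after the last '@'
/// equals the domain exactly. An unescaped '.' would also accept
/// addresses like "user@exampleXcom".
fn matches_domain(email: &str, domain: &str) -> bool {
    email.rsplit_once('@').map_or(false, |(_, d)| d == domain)
}
```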
Purpose
Array membership with ?=.
Input
{
"event_id": "e1",
"tags": ["prod", "security", "urgent"]
}
PDL
tags ?= "urgent"
Expected output
{
"event_id": "e1",
"tags": ["prod", "security", "urgent"]
}
Purpose
eval adds derived fields; prior keys remain unless trimmed.
Input
{
"first_name": "John",
"last_name": "Doe",
"age": 30
}
PDL
eval full_name = first_name + " " + last_name, age_group = if(age >= 18, "adult", "minor")
Expected output
{
"first_name": "John",
"last_name": "Doe",
"age": 30,
"full_name": "John Doe",
"age_group": "adult"
}
Purpose
rename then project with fields.
Input
{
"user_id": 1,
"user_name": "jdoe",
"user_email": "jdoe@example.com"
}
PDL
rename user_id AS id, user_name AS username, user_email AS email | fields id, username
Expected output
{
"id": 1,
"username": "jdoe"
}
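Rename-then-project semantics can be sketched outside the engine. Assuming events are flat string-keyed maps (an illustration, not the engine's actual data model):

```rust
use std::collections::BTreeMap;

type Event = BTreeMap<String, String>;

/// Apply rename pairs (old -> new), then keep only `keep` fields,
/// mirroring `rename a AS b, ... | fields b, ...`.
fn rename_then_project(mut ev: Event, renames: &[(&str, &str)], keep: &[&str]) -> Event {
    for (old, new) in renames {
        if let Some(v) = ev.remove(*old) {
            ev.insert((*new).to_string(), v);
        }
    }
    ev.into_iter()
        .filter(|(k, _)| keep.contains(&k.as_str()))
        .collect()
}
```

Note the order dependency the example relies on: fields must name the post-rename keys (id, username), not the originals.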
Parse examples
Purpose
parse_csv without header yields generic column names (field1, …).
Input
{
"line": "2024-01-01,user123,login,success"
}
PDL
parse_csv line
Expected output
{
"line": "2024-01-01,user123,login,success",
"field1": "2024-01-01",
"field2": "user123",
"field3": "login",
"field4": "success"
}
Notes
Supply header= when you want stable names (Reference / Quick Reference).
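The generic-name fallback is easy to model. A simplified sketch of header-less CSV naming (no quoting or escaping, which a real CSV parser must handle):

```rust
/// Split a simple comma-separated line into (name, value) pairs,
/// naming columns field1, field2, ... when no header is supplied.
fn parse_csv_generic(line: &str) -> Vec<(String, String)> {
    line.split(',')
        .enumerate()
        .map(|(i, v)| (format!("field{}", i + 1), v.to_string()))
        .collect()
}
```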
Purpose
parse_kv on space-separated key=value tokens.
Input
{
"raw": "user=john action=login ts=1704067300"
}
PDL
parse_kv raw
Expected output
{
"raw": "user=john action=login ts=1704067300",
"user": "john",
"action": "login",
"ts": "1704067300"
}
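A minimal model of the key=value split (whitespace-delimited tokens; the real parse_kv likely handles quoting and custom delimiters, per Reference):

```rust
/// Split whitespace-separated key=value tokens; tokens without '='
/// are skipped. Values stay strings, matching the example output
/// (ts is "1704067300", not a number, until an eval converts it).
fn parse_kv(raw: &str) -> Vec<(String, String)> {
    raw.split_whitespace()
        .filter_map(|tok| tok.split_once('='))
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}
```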
Purpose
parse_json merges object keys from a string field.
Input
{
"payload": "{\"user_id\": 123, \"name\": \"John\", \"active\": true}"
}
PDL
parse_json payload
Expected output
{
"payload": "{\"user_id\": 123, \"name\": \"John\", \"active\": true}",
"user_id": 123,
"name": "John",
"active": true
}
Purpose
parse_regex with named capture groups of the form (?P<name>…).

Input
{
"log_line": "2024-01-20 14:30:25 [ERROR] connection failed host=db-01"
}
PDL
parse_regex log_line "(?P<ts>[^\\s]+ [^\\s]+) \\[(?P<level>[^\\]]+)\\] (?P<msg>.+) host=(?P<host>\\S+)"
Expected output
{
"log_line": "2024-01-20 14:30:25 [ERROR] connection failed host=db-01",
"ts": "2024-01-20 14:30:25",
"level": "ERROR",
"msg": "connection failed",
"host": "db-01"
}
Notes
Tune the pattern to your log format; catastrophic backtracking is a runtime risk on hot paths.
Aggregation examples
Assume all sample rows fall in the same window [window_start, window_end) for illustration.
Purpose
count only, no group_by — single flat row.
Input
[
{"user_id": 123, "ok": true},
{"user_id": 456, "ok": true},
{"user_id": 789, "ok": false}
]
PDL
count AS total_events timespan=5m
Expected output
{
"window_start": 1704067200000,
"window_end": 1704067500000,
"total_events": 3
}
Purpose
Multiple reducers, one window, no group_by.
Input
[
{"score": 85},
{"score": 92},
{"score": 45}
]
PDL
count AS total, avg(score) AS avg_score, max(score) AS highest timespan=5m
Expected output
{
"window_start": 1704067200000,
"window_end": 1704067500000,
"total": 3,
"avg_score": 74.0,
"highest": 92
}
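The three reducers above can be sketched as one pass over a window's rows (f64 scores, matching the illustrative inputs; an empty window is guarded on the average):

```rust
/// count, avg, and max over the scores in one window.
fn count_avg_max(scores: &[f64]) -> (usize, f64, f64) {
    let n = scores.len();
    let sum: f64 = scores.iter().sum();
    let max = scores.iter().cloned().fold(f64::MIN, f64::max);
    let avg = if n == 0 { 0.0 } else { sum / n as f64 };
    (n, avg, max)
}
```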
Purpose
group_by — one row per key; group_key string matches group field value (single dimension).
Input
[
{"user_id": 123, "score": 85, "department": "engineering"},
{"user_id": 456, "score": 92, "department": "engineering"},
{"user_id": 789, "score": 45, "department": "sales"}
]
PDL
count AS dept_count, avg(score) AS dept_avg timespan=5m group_by department
Expected output
[
{
"group_key": "engineering",
"window_start": 1704067200000,
"window_end": 1704067500000,
"dept_count": 2,
"dept_avg": 88.5
},
{
"group_key": "sales",
"window_start": 1704067200000,
"window_end": 1704067500000,
"dept_count": 1,
"dept_avg": 45.0
}
]
Notes
Multi-group emissions are often represented as a JSON array at the API boundary; tasks may fan out one sink record per element (Reference).
Purpose
Composite group_key when group_by lists multiple fields (order preserved; key segments joined with |).
Input
[
{"tenant": "t1", "region": "us", "bytes": 100},
{"tenant": "t1", "region": "us", "bytes": 200},
{"tenant": "t1", "region": "eu", "bytes": 50}
]
PDL
sum(bytes) AS b timespan=5m group_by tenant, region
Expected output
[
{
"group_key": "t1|us",
"window_start": 1704067200000,
"window_end": 1704067500000,
"b": 300
},
{
"group_key": "t1|eu",
"window_start": 1704067200000,
"window_end": 1704067500000,
"b": 50
}
]
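The composite-key construction can be sketched directly: key segments keep group_by order and are joined with | (the separator assumed from the example output):

```rust
use std::collections::BTreeMap;

/// Sum `bytes` per composite key built from the ordered group_by
/// values (tenant, region), joined with '|' as in "t1|us".
fn sum_by_composite_key(rows: &[(&str, &str, u64)]) -> BTreeMap<String, u64> {
    let mut acc = BTreeMap::new();
    for (tenant, region, bytes) in rows {
        let key = format!("{}|{}", tenant, region);
        *acc.entry(key).or_insert(0) += bytes;
    }
    acc
}
```

Because segment order follows the group_by list, `group_by tenant, region` and `group_by region, tenant` produce different keys for the same data.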
Purpose
collect embeds matching raw events in the aggregate row.
Input
[
{"user_id": 123, "score": 85},
{"user_id": 456, "score": 92},
{"user_id": 789, "score": 45}
]
PDL
collect(score > 80) AS high_scores timespan=5m
Expected output
{
"window_start": 1704067200000,
"window_end": 1704067500000,
"high_scores": [
{"user_id": 123, "score": 85},
{"user_id": 456, "score": 92}
]
}
Notes
collect increases memory vs scalar reducers; cap volume in production designs.
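The capping advice can be sketched as a bounded collector. The cap value and the drop-newest policy here are assumptions for illustration; the engine's actual behavior is defined in Reference:

```rust
/// Collect matching events into a window buffer, but stop adding
/// once `cap` items are held (drop-newest; policy is illustrative).
fn bounded_collect<T: Clone>(events: &[T], matches: impl Fn(&T) -> bool, cap: usize) -> Vec<T> {
    let mut out = Vec::new();
    for e in events {
        if out.len() >= cap {
            break;
        }
        if matches(e) {
            out.push(e.clone());
        }
    }
    out
}
```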
End-to-end pipelines
Purpose
parse_kv → eval → query gate.
Input
{
"raw_log": "user=john score=85 status=active ts=1704067300"
}
PDL
parse_kv raw_log | eval score_num = to_number(score) | score_num > 80
Expected output
{
"raw_log": "user=john score=85 status=active ts=1704067300",
"user": "john",
"score": "85",
"status": "active",
"ts": "1704067300",
"score_num": 85
}
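The three stages compose into one predicate over the raw line. A sketch of parse → to_number → gate for this event shape (a hypothetical helper, not engine code):

```rust
/// parse_kv raw | eval n = to_number(score) | n > 80, over one event:
/// split key=value tokens, pull "score", parse it, gate on n > 80.
/// Missing or non-numeric score fails the gate rather than erroring.
fn passes_score_gate(raw: &str) -> bool {
    raw.split_whitespace()
        .filter_map(|t| t.split_once('='))
        .find(|(k, _)| *k == "score")
        .and_then(|(_, v)| v.parse::<i64>().ok())
        .map_or(false, |n| n > 80)
}
```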
Purpose
Filter high scores, then aggregate per department with group_key.
Input
[
{"user_id": 123, "score": 85, "department": "engineering"},
{"user_id": 456, "score": 92, "department": "engineering"},
{"user_id": 789, "score": 45, "department": "sales"}
]
PDL
score > 80 | count AS high_scores, collect(score > 80) AS top_performers timespan=5m group_by department
Expected output
[
{
"group_key": "engineering",
"window_start": 1704067200000,
"window_end": 1704067500000,
"high_scores": 2,
"top_performers": [
{"user_id": 123, "score": 85, "department": "engineering"},
{"user_id": 456, "score": 92, "department": "engineering"}
]
}
]
Notes
The sales group emits a row only if at least one of its events passes score > 80 before the aggregate; otherwise that group is absent from the array.
Real-world scenarios
Purpose
Chained eval on a nested order payload (indices are a fixed-size illustration, not a generic cart fold).
Input
{
"order": {
"id": "ORD-1",
"items": [
{"price": 999.99, "quantity": 1},
{"price": 29.99, "quantity": 2}
],
"discount_code": "SAVE10",
"shipping_address": {"state": "CA"}
}
}
PDL
eval item_total = order.items[0].price * order.items[0].quantity + order.items[1].price * order.items[1].quantity | eval discount_amount = if(order.discount_code = "SAVE10", item_total * 0.1, 0) | eval final_total = item_total - discount_amount | eval tax = if(order.shipping_address.state = "CA", final_total * 0.08, 0) | eval order_total = final_total + tax
Expected output
{
"item_total": 1059.97,
"discount_amount": 105.997,
"final_total": 953.973,
"tax": 76.31784,
"order_total": 1030.29084
}
Notes
Production pipelines usually unnest line items upstream instead of hard-coding indices.
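The unnesting advice amounts to folding over line items instead of indexing them. A sketch with the same figures (the 10% discount and 8% CA tax rates come from the example above):

```rust
/// Order total over a variable-length item list: sum price * qty,
/// apply a 10% discount if eligible, then 8% tax if taxable.
fn order_total(items: &[(f64, u32)], discounted: bool, taxed: bool) -> f64 {
    let item_total: f64 = items.iter().map(|&(p, q)| p * q as f64).sum();
    let discount = if discounted { item_total * 0.1 } else { 0.0 };
    let after_discount = item_total - discount;
    after_discount + if taxed { after_discount * 0.08 } else { 0.0 }
}
```

Unlike the hard-coded eval chain, this handles carts of any size, which is why production pipelines unnest first.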
Purpose
Structured log line → fields → alert label using nested metrics.
Input
{
"log_entry": "2024-01-20 14:30:25 [ERROR] Database connection failed",
"metrics": {"cpu_usage": 85.5, "memory_usage": 78.2}
}
PDL
parse_regex log_entry "(?P<ts>\\S+ \\S+) \\[(?P<level>[^\\]]+)\\] (?P<message>.+)" | eval high_cpu = metrics.cpu_usage > 80 | eval high_memory = metrics.memory_usage > 75 | eval system_stress = high_cpu OR high_memory | eval alert_level = case(level = "ERROR" AND system_stress, "critical", level = "ERROR", "warning", "info")
Expected output
{
"ts": "2024-01-20 14:30:25",
"level": "ERROR",
"message": "Database connection failed",
"high_cpu": true,
"high_memory": true,
"system_stress": true,
"alert_level": "critical"
}
Purpose
Flat clickstream rows: windowed counts per action (valid group_by on scalar fields).
Input
[
{"user_id": 123, "action": "view", "page": "/products", "ts": 1},
{"user_id": 123, "action": "click", "page": "/products", "ts": 2},
{"user_id": 123, "action": "purchase", "page": "/checkout", "ts": 3}
]
PDL
count AS n timespan=1h group_by action
Expected output
[
{"group_key": "view", "window_start": 0, "window_end": 3600000, "n": 1},
{"group_key": "click", "window_start": 0, "window_end": 3600000, "n": 1},
{"group_key": "purchase", "window_start": 0, "window_end": 3600000, "n": 1}
]
Notes
window_start / window_end here are placeholders; real values come from event-time assignment in the engine.
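For a tumbling window, event-time assignment is plain modular arithmetic. A sketch assuming epoch-millisecond timestamps and tumbling (non-sliding) windows:

```rust
/// Tumbling-window assignment for an epoch-ms timestamp:
/// window_start = ts - (ts mod span), window_end = start + span.
fn window_bounds(ts_ms: i64, span_ms: i64) -> (i64, i64) {
    let start = ts_ms - ts_ms.rem_euclid(span_ms);
    (start, start + span_ms)
}
```

This reproduces the sample windows used throughout this page: a timestamp of 1704067300000 with a 5m span lands in [1704067200000, 1704067500000).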
Purpose
API-style flat events: request count and average latency per endpoint in one windowed statement.
Input
[
{"endpoint": "/api/users", "status_code": 200, "latency_ms": 150},
{"endpoint": "/api/users", "status_code": 500, "latency_ms": 5000},
{"endpoint": "/api/orders", "status_code": 201, "latency_ms": 300}
]
PDL
count AS requests, avg(latency_ms) AS avg_latency_ms timespan=1m group_by endpoint
Expected output
[
{
"group_key": "/api/users",
"window_start": 1704067200000,
"window_end": 1704067260000,
"requests": 2,
"avg_latency_ms": 2575.0
},
{
"group_key": "/api/orders",
"window_start": 1704067200000,
"window_end": 1704067260000,
"requests": 1,
"avg_latency_ms": 300.0
}
]
Notes
Add a separate where status_code >= 400 aggregate or per-event eval if you need error-only metrics (Reference).
Advanced patterns
Purpose
Partition key then aggregate with rekey=true for aligned downstream routing.
Input
[
{"user_id": "u1", "amount": 10},
{"user_id": "u1", "amount": 20}
]
PDL
partition_by user_id | sum(amount) AS total timespan=1h group_by user_id rekey=true
Expected output
{
"group_key": "u1",
"window_start": 1704067200000,
"window_end": 1711843200000,
"total": 30
}
Notes
rekey behavior and routing contract: Reference.
Purpose
first / last (processing order) vs earliest / latest (envelope time)—see Reference for definitions.
PDL
count AS n, first(phase) AS first_phase, last(phase) AS last_phase timespan=5m group_by job_id
earliest(message) AS first_msg, latest(message) AS last_msg timespan=5m group_by correlation_id
Notes
Envelope timestamps must be consistent with window assignment for earliest/latest to match operator expectations.
Production considerations
- Filter early with cheap queries before `parse_*` when predicates do not depend on parsed fields.
- Project early with `fields` to drop large blobs before `eval` or aggregations.
- Bound `~=` and `parse_regex` patterns to reduce backtracking cost.
- Size `timespan`, `slide`, and `group_by` cardinality to control state and emit rate; avoid large `collect` windows without caps.
- Guard divisions with `if`; use `coalesce` for optional paths.
- Validate PDL with the same `padas-pdl` / Core version as production.
Running examples
Embed the padas-pdl crate (layout varies by repository).
use padas_pdl::{NewPdlProcessor, PdlInput};
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut processor = NewPdlProcessor::new();

    // Query: filter a single event.
    let query_input = PdlInput::json(json!({"status": "active", "score": 85}));
    let _ = processor.process_query_with_input(r#"status = "active" AND score > 80"#, &query_input)?;

    // Pipeline: parse, derive a number, then gate.
    let pipe_input = PdlInput::json(json!({"raw": "user=u1 score=85 status=active"}));
    let _ = processor.process_pipeline_with_input(
        r#"parse_kv raw | eval n = to_number(score) | n > 80"#,
        &pipe_input,
    )?;
    Ok(())
}
See the examples/ directory in padas-pdl for runnable snippets.