Streaming¶
Operonx is streaming-first. The classic ForOp / MapOp / WhileOp
classes were replaced by two patterns: generator ops and GraphOp.loop.
Per-yield dispatch¶
When a generator op yields, the scheduler treats each yield as an
independent frame and dispatches downstream ops in parallel — not in
a serial loop:
sequenceDiagram
autonumber
participant S as Scheduler
participant G as each_item (gen)
participant D1 as double #1
participant D2 as double #2
participant D3 as double #3
participant E as END
S->>G: dispatch (items=[1,2,3])
par per yield
G-->>S: Frame(value=1)
S->>D1: dispatch (value=1)
and
G-->>S: Frame(value=2)
S->>D2: dispatch (value=2)
and
G-->>S: Frame(value=3)
S->>D3: dispatch (value=3)
end
G-->>S: EOF
par per result
D1-->>S: Frame(result=2)
D2-->>S: Frame(result=4)
D3-->>S: Frame(result=6)
end
S-->>E: collected results
G doesn't wait for D1 to finish before yielding the second item;
the scheduler picks frames off G as fast as G can emit them, and
each downstream double runs concurrently. Concurrency is bounded by
the graph's max_stream_concurrent (per-op semaphore) and the
runtime's tokio / asyncio thread pools.
If you want collected-list semantics — wait for all yields, then run
the next op once — apply Ref.collect() on the consumer's input:
Generator ops¶
Use yield inside an @op to iterate. Downstream ops run in parallel per
yield by default (streaming scheduler).
from operonx.core import GraphOp, op, START, END, PARENT
@op
def each_item(items: list):
for item in items:
yield {"value": item}
@op
def double(value: int):
return {"result": value * 2}
with GraphOp(name="iterate") as graph:
gen = each_item(items=PARENT["numbers"])
step = double(value=gen["value"])
START >> gen >> step >> END
For numbers = [1, 2, 3], each_item yields three frames; double runs
three times, in parallel — not in a serial for-loop.
If you need ordered output, collect downstream into a list op or use the ordered-collect helper (see API reference).
Loops with GraphOp.loop¶
For feedback loops where the iteration depends on the previous frame's state, use the loop builder:
with GraphOp.loop(until="count >= 5", count=0) as loop:
inc = increment(counter=PARENT["count"])
inc["counter"] >> PARENT["count"]
START >> inc >> END
The until= expression is evaluated against the current loop state after
each iteration. inc["counter"] >> PARENT["count"] writes the new value
back to the loop state for the next iteration to read.
Frame consumption¶
Outside the engine, engine.run(...) returns the final state. To consume
frames as they're emitted, use the streaming entry point:
A frame carries the op name, the outputs dict, span context, and timing. This is what tracers consume internally.
Performance notes¶
- Generator ops are the default unit of fan-out. Prefer them over manual asyncio.gather patterns.
- Loops have a small per-iteration overhead from state propagation. For
tight numeric loops, write the loop inside a single op instead of using
GraphOp.loop.