operonx.core.ops¶
Op base classes, flow primitives, and graph composition.
ops
¶
Core op types and markers for the Operon workflow engine.
- BaseOp — base class for all workflow ops
- DummyOp — placeholder for START/END markers
- GraphOp — container managing a sub-graph with parallel execution
- BranchOp — conditional routing with precompiled conditions
- FuncOp — wraps a Python function (supports generators for streaming)
- ParserOp — extracts structured data from text (XML/JSON/etc.)
Markers: START, END, PARENT, PENDING.
Decorators: op, graph, if_.
Attributes¶
OpType
module-attribute
¶
OpType = Literal[
"data",
"llm",
"embedding",
"rerank",
"branch",
"for",
"while",
"stream",
"code",
"lambda",
"parser",
"prompt",
"doc-processor",
"milvus",
"mongo",
"s3",
"graph",
"default",
"dummy",
"tool-executor",
"mcp",
]
Supported node types in the workflow graph.
Classes¶
EOF
dataclass
¶
Marker that an op's async generator has exhausted.
Created by Scheduler._pump() after op.run() stops yielding
(i.e. the underlying function returned or its generator was exhausted).
User code never yields EOF — it is emitted implicitly when the op finishes.
Frame
dataclass
¶
One result yielded by an op during execution.
Created by Scheduler._pump() for every (ctx, result) tuple that
op.run() yields. User code never constructs Frame directly — just
return or yield from an @op function.
Interrupt
dataclass
¶
In-band scheduler cancellation event.
Returned/yielded by user op bodies to cancel queued frames + in-flight
tasks at ctx_to_cancel (and its descendants). op and ctx
record the emitter for tracing; ctx_to_cancel is the explicit target
— typically the prior turn's ctx, stored in SCRATCH when long-running
work began.
The scheduler:

- Drops Frame/EOF items at ctx_to_cancel from the queue.
- Cancels in-flight _pump tasks at ctx_to_cancel and descendants (skipping the emitter to avoid self-cancel).
- Clears bookkeeping (ready/seq_active/seq_origins/collect_bufs).
- Forwards a synthetic ("__interrupt__", emitter_ctx, {...}) tuple to output_queue so ExecutionHandle consumers see it.
Best-effort: data already pushed to consumer-owned queues (e.g. a
user-supplied asyncio.Queue) is NOT drained — consumer must handle
that itself (see plan §4.6a).
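The subtree-cancellation rule above can be sketched in isolation. This is an illustrative stand-in, not the real scheduler: it assumes contexts are tuples where descendants extend their ancestor's tuple as a prefix, and only shows the queue-filtering step.

```python
from dataclasses import dataclass


@dataclass
class Frame:
    ctx: tuple
    result: dict


def is_descendant(ctx: tuple, ancestor: tuple) -> bool:
    # A context is at or under the target if the target is a prefix of it.
    return ctx[: len(ancestor)] == ancestor


def drop_cancelled(queue: list, ctx_to_cancel: tuple) -> list:
    # Keep only frames outside the cancelled subtree.
    return [f for f in queue if not is_descendant(f.ctx, ctx_to_cancel)]


queue = [
    Frame(("root", "a"), {"k": 1}),
    Frame(("root", "a", "s0"), {"k": 2}),  # descendant of ("root", "a")
    Frame(("root", "b"), {"k": 3}),
]
kept = drop_cancelled(queue, ("root", "a"))
```

Frames at `("root", "a")` and below are dropped; unrelated contexts survive.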
BaseOp
¶
BaseOp(
id: str = None,
name: str = None,
description: str = "",
inputs: Dict[str, Any] = None,
outputs: Dict[str, Any] = None,
sources: List[str] = None,
targets: List[str] = None,
stream: bool = False,
start: bool = False,
end: bool = False,
contain_generation: bool = False,
verbose: bool = True,
enabled: bool = True,
executor: Optional[str] = None,
bound: Optional[str] = None,
cache: Union[bool, str, None] = None,
delay: float = 0,
)
Bases: ABC
Base class for all ops in a workflow.
An op is the fundamental processing unit. Each op declares typed inputs
and outputs (via Param), and implements a core() method that
contains the execution logic. Ops are wired together inside a GraphOp
using edge operators.
Sections (read top-to-bottom)::
1. INIT __init__, __slots__, param helpers
2. EDGE OPERATORS >>, >>~, >, <, [], ~ — wiring ops in a graph
3. EXECUTE run(), get_inputs/outputs, store_result, _exec_core
4. OBSERVABILITY _log(), _store_metrics()
5. SERIALIZATION serialize(), metadata — for Rust backend & tracing
Example::
from operonx.core import GraphOp, op, START, END, PARENT
@op
def double(x: int):
return {"result": x * 2}
with GraphOp(name="main") as graph:
d = double(x=PARENT["x"])
START >> d >> END
Source code in operonx/core/ops/base.py
Attributes¶
full_name
property
¶
Fully-qualified hierarchical path of this op. Cached after build().
specific_metadata
property
¶
Return subclass-specific metadata. Override in subclasses.
Functions¶
warmup
¶
Called by Operon engine after graph.build() when a ResourceHub is available.
Override in provider ops (LLMOp, EmbeddingOp, etc.) to eagerly initialize backends and eliminate cold-start latency on the first user request.
The default implementation is a no-op — subclasses opt in by overriding.
Source code in operonx/core/ops/base.py
get_inputs
¶
Retrieve input values from state based on connection mappings.
Uses cached cell indices to avoid per-call schema.get_index() lookups. Falls back to standard path on first call to build the cache.
Source code in operonx/core/ops/base.py
get_outputs
¶
Read output values from state.
Reads directly from this op's output variables. Output connections (outputs={...}) are resolved by the schema at build time — they create refs at the destination, not at this op.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| state | MemoryState | Workflow state. | required |
| context_id | str | Context of this op. | required |
Source code in operonx/core/ops/base.py
normalize_trace_io
¶
Produce a trace-time view of this op's I/O.
Called by _extract_trace_io before media extraction. Subclasses
override when their I/O carries media in a non-Media shape (e.g.
LLMOp wraps OpenAI chat-format image_url blocks into Media
instances). The real state value is untouched — this returns copies
used only for trace capture.
Default is identity: most ops never override.
Source code in operonx/core/ops/base.py
store_result
¶
Store result dict into state.
Uses state[op, var, ctx] = value for O(1) index-based storage. Extracts $tags special key for dynamic tagging.
Source code in operonx/core/ops/base.py
save_all_caches
staticmethod
¶
Save all file-backed caches. Returns total entries saved.
Source code in operonx/core/ops/base.py
run
async
¶
run(
state: MemoryState, context_id: Optional[str] = None
) -> AsyncGenerator[tuple[Optional[str], Dict[str, Any]], None]
Execute this op as a uniform async generator.
Every op — whether it uses return or yield — is driven through
the same three-layer model:
- User function (@op): plain return {"k": v} or yield {"k": v}. No awareness of Frame/EOF.
- BaseOp.run() (this method): wraps the user function via _exec_core() into a uniform async generator that yields (context_id, result) tuples. Normal op → one yield. Generator op → N yields, each in its own stream context [i].
- Scheduler._pump(): consumes this generator. Each yield becomes a Frame event on the queue; when the generator exhausts naturally, _pump emits one EOF event. The user never writes Frame or EOF.
Yields:

| Type | Description |
|---|---|
| AsyncGenerator[tuple[Optional[str], Dict[str, Any]], None] | (context_id, result) tuples, one per yield. |
Source code in operonx/core/ops/base.py
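The three-layer model above can be sketched as a self-contained toy. The `Frame`/`EOF` classes and function names here are stand-ins for the real operonx internals, shown only to illustrate how a uniform async generator is pumped into events:

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Frame:
    ctx: str
    result: dict


@dataclass
class EOF:
    ctx: str


async def op_run(ctx: str):
    # Layer 2 stand-in: a uniform async generator, one yield per result.
    for i in range(3):
        yield f"{ctx}[{i}]", {"chunk": i}


async def pump(ctx: str, queue: asyncio.Queue) -> None:
    # Layer 3 stand-in: each yield becomes a Frame; natural exhaustion
    # emits exactly one EOF. User code never constructs either.
    async for sub_ctx, result in op_run(ctx):
        await queue.put(Frame(sub_ctx, result))
    await queue.put(EOF(ctx))


async def main():
    q: asyncio.Queue = asyncio.Queue()
    await pump("root", q)
    return [q.get_nowait() for _ in range(q.qsize())]


events = asyncio.run(main())
```

A streaming op with N yields produces N Frame events followed by a single EOF, which is exactly the contract the scheduler relies on.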
serialize
¶
Serialize this op to a config dict for the Rust backend.
Source code in operonx/core/ops/base.py
DummyOp
¶
Bases: BaseOp
Sentinel op used as START, END, and PARENT markers.
Source code in operonx/core/ops/_edges.py
Functions¶
shared
¶
Declare shared vars on current graph. Only valid on PARENT.
Shared vars persist across all stream contexts within the graph. Normal PARENT vars are copied per stream context.
Usage::
@graph
def pipeline():
PARENT.shared(current_state="REMINDER", history=[])
# PARENT["current_state"] now shared across all stream contexts
Source code in operonx/core/ops/_edges.py
ScratchAccessor
¶
Dict-like accessor for per-call scratch space.
- Inside an op body (ContextVar bound): reads/writes the live scratch dict on the current MemoryState.
- At graph-construction time (ContextVar unbound): __getitem__ returns a ScratchRef marker, post-resolved by BaseOp.get_inputs(); __setitem__ raises — write-outside-run is a programming error.
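The two-mode accessor pattern can be sketched with stdlib `contextvars`. All names below are hypothetical stand-ins for the real implementation; the point is only the bound/unbound dispatch:

```python
from contextvars import ContextVar
from dataclasses import dataclass

_scratch: ContextVar[dict] = ContextVar("scratch")  # no default: unbound at import


@dataclass(frozen=True)
class ScratchRef:
    key: str  # marker resolved later, at input-binding time


class ScratchAccessor:
    def __getitem__(self, key):
        try:
            return _scratch.get()[key]  # inside an op body: live read
        except LookupError:
            return ScratchRef(key)      # construction time: return a marker

    def __setitem__(self, key, value):
        try:
            _scratch.get()[key] = value  # live write
        except LookupError:
            raise RuntimeError("scratch writes are only valid inside run()")


SCRATCH = ScratchAccessor()

# Unbound (graph-construction time): reads yield markers, not values.
ref = SCRATCH["prev_ctx"]

# Bound (as a scheduler would do around an op body): live dict access.
token = _scratch.set({})
SCRATCH["prev_ctx"] = "turn-1"
live = SCRATCH["prev_ctx"]
_scratch.reset(token)
```

The frozen dataclass marker lets downstream code detect and resolve deferred references while keeping live access O(1).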
SoftEdge
¶
Branch
¶
Fluent builder for creating a BranchOp.
Example::
router = (if_(PARENT["score"] >= 90, "excellent")
.if_(PARENT["score"] >= 70, "good")
.else_("fail"))
Initialise the builder.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | Optional[str] | Op name. If None, auto-inferred from the variable name. | None |
Source code in operonx/core/ops/flow/branch_op.py
Functions¶
if_
¶
Add a condition–target case.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| condition | Ref | Ref with comparison (e.g., PARENT["score"] >= 90). | required |
| target | Union[str, BaseOp] | Target op or op name. | required |

Returns:

| Type | Description |
|---|---|
| Branch | self for chaining. |
Source code in operonx/core/ops/flow/branch_op.py
else_
¶
Set default target and build the BranchOp.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| target | Union[str, BaseOp] | Fallback target when no condition matches. | required |

Returns:

| Type | Description |
|---|---|
| BranchOp | The constructed BranchOp. |
Source code in operonx/core/ops/flow/branch_op.py
BranchOp
¶
BranchOp(
cases: Optional[List[Tuple[Ref, str]]] = None,
candidates: Optional[List[str]] = None,
default: Optional[str] = None,
inputs: Dict[str, Any] = None,
outputs: Dict[str, Any] = None,
**kwargs,
)
Bases: BaseOp
Op that evaluates conditions and routes execution to different targets.
Conditions are Ref objects with comparison operators. The first matching
condition determines the target. An optional anchor input overrides all
conditions. Use soft edges (>>~) to connect branch targets to a merge op.
Inputs
- anchor (str, optional): Hard-coded target name that overrides conditions.
- (any): Variables referenced in condition Refs (auto-extracted).

Outputs
- target (str): Name of the selected target op.
- matched (str): Description of which condition matched.
Example::
router = if_(PARENT["score"] >= 90, "excellent").else_("fail")
START >> router >> ~excellent >> merge >> END
router >> ~fail >> merge
Source code in operonx/core/ops/flow/branch_op.py
Attributes¶
Functions¶
get_target
¶
serialize
¶
Serialize branch op with conditions for Rust backend.
Source code in operonx/core/ops/flow/branch_op.py
GraphOp
¶
Bases: BaseOp
Container op that holds and executes a directed graph of child ops.
Lifecycle::
1. DEFINE with GraphOp(name="wf") as g:
a = double(x=PARENT["x"])
b = add(a=a["result"], b=PARENT["y"])
START >> a >> b >> END
Ops auto-register via context manager. Edges via >> operator.
Inputs/outputs auto-discovered from PARENT refs.
2. BUILD g.build() (or auto on first run)
_setup_schema scan PARENT refs → graph inputs/outputs
_setup_endpoints find entry/exit ops from topology
_build() adj list + ready counts + stream ready counts
validate branch targets, cycles, reachability, refs
3. EXECUTE g.run(state, context_id) — async generator
→ run_task_scheduler() drives ops via Frame/EOF events
→ yields (ctx, outputs) per batch or per stream frame
→ loop iteration handled inside scheduler EOF handler
4. EXPORT serialize() config dict for Rust backend
validate() graph structure validation
show() debug display
Source code in operonx/core/ops/graph/graph_op.py
Functions¶
loop
classmethod
¶
loop(
name: Optional[str] = None,
until: Optional[Union[str, Callable]] = None,
max_iterations: int = 100,
**initial_state: Any,
)
Create a GraphOp configured for feedback-loop execution.
Each iteration re-runs the graph's scheduler, carrying forward outputs
as the next iteration's inputs. Stops when until evaluates to True
or max_iterations is reached.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | Optional[str] | Graph name. | None |
| until | Optional[Union[str, Callable]] | Stop condition — a string expression (evaluated against outputs) or a callable. | None |
| max_iterations | int | Safety cap on iterations. | 100 |
| **initial_state | Any | Initial values for loop variables, injected as inputs. | {} |
Example::
with GraphOp.loop(name="counter", until="count >= 5", count=0) as g:
inc = increment(counter=PARENT["count"])
inc["counter"] >> PARENT["count"]
START >> inc >> END
Source code in operonx/core/ops/graph/graph_op.py
get_current_graph
staticmethod
¶
add_op
¶
Add an op to the graph.
Source code in operonx/core/ops/graph/graph_op.py
add_edge
¶
Add an edge between two ops.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| source | str | Source op name. | required |
| target | str | Target op name. | required |
| type | EdgeType | Edge type (normal, lookback, condition). | 'normal' |
| soft | bool | If True, edge does not count toward ready_count. Used for branch outputs when only one branch executes. | False |
Source code in operonx/core/ops/graph/graph_op.py
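The ready-count semantics of soft edges can be sketched in a few lines. This is an illustrative simplification (the real implementation lives in the graph builder): an op's initial ready count is the number of hard predecessors it must wait for, while soft edges deliver data without gating readiness.

```python
def initial_ready_counts(edges: list[tuple[str, str, bool]]) -> dict[str, int]:
    """edges: (source, target, soft) triples -> target ready counts."""
    counts: dict[str, int] = {}
    for _source, target, soft in edges:
        counts.setdefault(target, 0)
        if not soft:
            counts[target] += 1  # only hard edges gate execution
    return counts


counts = initial_ready_counts([
    ("branch", "excellent", False),
    ("excellent", "merge", True),  # soft: only one branch arm ever runs
    ("fail", "merge", True),       # soft: so merge must not wait for both
])
```

If both edges into `merge` were hard, `merge` would wait for two completions that can never both happen when only one branch arm executes.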
build
¶
Build graph: children first, then schema → endpoints → topology → validation.
Source code in operonx/core/ops/graph/graph_op.py
run
async
¶
run(
state: MemoryState, context_id: Optional[tuple] = None
) -> AsyncGenerator[Tuple[tuple, Dict[str, Any]], None]
Execute graph: get inputs → schedule ops → loop if needed → store results.
Source code in operonx/core/ops/graph/graph_op.py
serialize
¶
Serialize full graph to config dict for the Rust backend.
Note: the key "initial_ready_count" is kept as-is for Rust backend
compatibility even though the internal Python attribute was renamed to
_initial_ready during the scheduler rewrite.
Source code in operonx/core/ops/graph/graph_op.py
validate
¶
Run all validations and return result.
show
¶
Display graph structure (debug).
Source code in operonx/core/ops/graph/graph_op.py
FuncOp
¶
FuncOp(
code_fn: Optional[Callable] = None,
return_keys: Optional[List[str]] = None,
inputs: Dict[str, Any] = None,
outputs: Dict[str, Any] = None,
_mappings: Dict[str, Any] = None,
**kwargs,
)
Bases: BaseOp
Op that executes a Python function.
Inputs and outputs are auto-extracted from the function's signature and
return-statement AST. Both sync and async functions are supported.
Prefer the @op decorator over instantiating FuncOp directly.
Inputs
Auto-parsed from the function's parameter list.
Outputs
Auto-parsed from return {"key": ...} via AST, or from
explicit return_keys.
Example::
@op
def add(a: int, b: int):
return {"sum": a + b}
with GraphOp(name="main") as graph:
result = add(a=PARENT["x"], b=PARENT["y"])
START >> result >> END
Source code in operonx/core/ops/transform/func_op.py
Attributes¶
Functions¶
run
async
¶
run(
state: MemoryState, context_id: Optional[str] = None
) -> AsyncGenerator[Tuple[Optional[str], Dict[str, Any]], None]
Execute FuncOp with CodeError wrapping.
Delegates to BaseOp.run() (async generator) and re-raises any
exception as a CodeError with full op context attached.
Source code in operonx/core/ops/transform/func_op.py
ParserOp
¶
ParserOp(
format: ParserType = "xml",
extract: Optional[List[str]] = None,
inputs: Dict[str, Any] = None,
outputs: Dict[str, Any] = None,
**kwargs,
)
Bases: BaseOp
Op that parses text into structured data.
Supports multiple formats (JSON, XML, YAML) and extracts fields using
dot-separated chain paths (e.g. "user.address.city: str"). Commonly
used as the final stage inside a ChainOp pipeline.
Inputs
text (str): Raw text to parse (e.g. LLM output).
Outputs
Dynamically generated from the extract list — one output key
per extracted field.
Example::
parser = ParserOp(
format="json",
extract=["user.name: str", "user.age: int"],
inputs={"text": llm["content"]},
)
Source code in operonx/core/ops/transform/parser_op.py
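The dot-separated chain-path extraction with type hints can be sketched for the JSON case. This is a simplified stand-in, not the real ParserOp internals (which also handle XML/YAML and error cases); `extract_fields` and `_CASTS` are hypothetical names:

```python
import json

_CASTS = {"str": str, "int": int, "float": float, "bool": bool}


def extract_fields(text: str, specs: list[str]) -> dict:
    """Extract fields from JSON text using 'a.b.c: type' chain specs."""
    data = json.loads(text)
    out = {}
    for spec in specs:
        path, _, hint = spec.partition(":")
        value = data
        for part in path.strip().split("."):
            value = value[part]  # walk the dot-separated chain path
        cast = _CASTS.get(hint.strip(), lambda v: v)
        out[path.strip().split(".")[-1]] = cast(value)
    return out


fields = extract_fields(
    '{"user": {"name": "Ada", "age": "36"}}',
    ["user.name: str", "user.age: int"],
)
```

Note how the `: int` hint coerces the string `"36"` from LLM output into a proper integer, which is the main value of declaring types in the extract list.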
Functions¶
shorthand
¶
Decorator for Op.of() classmethods.
Registers the function for auto-naming frame skip via register_skip()
and wraps as classmethod.
Usage::
class MyOp(BaseOp):
@shorthand
def of(cls, my_param=None, **kwargs):
inputs, init_kwargs = split_shorthand_kwargs(kwargs)
return cls(my_param=my_param, inputs=inputs or None, **init_kwargs)
Source code in operonx/core/ops/_shortcuts.py
split_shorthand_kwargs
¶
Split flat kwargs into (inputs, init_kwargs).
Used by shorthand functions (llm_, for_, op, etc.) to separate op constructor kwargs from input mappings.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| kwargs | dict | Flat keyword arguments from shorthand function. | required |
| extra_init_keys | set | Additional op-specific init keys beyond base keys (e.g., {'max_concurrency', 'callback'} for iteration ops). | None |

Returns:

| Type | Description |
|---|---|
| tuple | (inputs, init_kwargs) — inputs holds input mappings; init_kwargs holds op constructor kwargs. |
Example::

    # Provider ops - just base keys
    inputs, init_kwargs = split_shorthand_kwargs(kwargs)

    # Iteration ops - with extra keys
    inputs, init_kwargs = split_shorthand_kwargs(
        kwargs, {'max_concurrency', 'until', 'callback'}
    )
Source code in operonx/core/ops/_shortcuts.py
if_
¶
Start a branch declaration with the first condition.
Example::
router = if_(PARENT["score"] >= 90, "excellent").else_("fail")
Source code in operonx/core/ops/flow/branch_op.py
op
¶
op(
func: Optional[Callable] = None,
*,
executor: Optional[str] = None,
bound: Optional[str] = None,
cache=None,
delay: float = 0,
)
Decorator that turns a plain function into a FuncOp factory.
Can be used bare or with keyword arguments::
@op
def double(x: int):
return {"result": x * 2}
@op(bound="cpu")
def heavy_compute(data: list):
return {"result": process(data)}
@op(bound="io")
async def call_api(url: str):
return {"data": await fetch(url)}
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| bound | Optional[str] | Execution bound hint for the scheduler. | None |
| executor | Optional[str] | Deprecated — use bound instead. | None |