operonx.core

Engine, op decorators, graph composition, state markers, and middleware. This page is the primary public surface — everything you need to build and run a workflow without touching providers or telemetry.

Engine

Operon

Operon(
    graph: Union[GraphOp, Callable[..., GraphOp]],
    *,
    params: Optional[Dict[str, Any]] = None,
    tracer: Optional[Union[Tracer, List[Tracer]]] = None,
)

Workflow execution engine.

Operon takes a GraphOp and provides execution capabilities:

- Builds and validates the graph structure
- Creates state schema for data flow
- Executes workflows with fresh state per run
- Integrates with tracers for observability

Attributes:

- graph: The GraphOp to execute.
- name: Workflow name (from the graph).
- schema (StateSchema): State schema for the workflow.

Example
# Define graph
with GraphOp(name="chatbot") as graph:
    prompt = PromptOp(name="prompt", ...)
    llm = LLMOp(name="llm", ...)
    START >> prompt >> llm >> END

# Create engine (builds automatically)
engine = Operon(graph)

# Run multiple times with fresh state
result = await engine.run(inputs={"query": "Hello!"})
print(result["response"])      # workflow output
print(result["$state"])        # MemoryState for debugging

# Or use callable syntax
result = await engine({"query": "Goodbye!"})

Initialize Operon engine with a GraphOp or a graph factory.

Pure orchestrator — does not load .env or resources.yaml. Call operonx.bootstrap() (or ResourceHub.from_yaml() directly) before constructing the engine if your graph uses provider ops. Pure-compute graphs need no setup.

Parameters:

- graph (Union[GraphOp, Callable[..., GraphOp]], required): A GraphOp workflow, or a callable that returns one. When a callable is passed, it is invoked with **params immediately — call operonx.bootstrap() first if the factory needs the hub.
- params (Optional[Dict[str, Any]], default None): Keyword arguments passed to the graph factory. Ignored when graph is already a GraphOp. Defaults to {}.
- tracer (Optional[Union[Tracer, List[Tracer]]], default None): Default tracer(s) for all run() calls. Can be overridden per-run.

Raises:

- RuntimeError: If a provider op needs the hub but none has been installed. The message points at operonx.bootstrap().

Source code in operonx/core/engine.py
def __init__(
    self,
    graph: Union[GraphOp, Callable[..., GraphOp]],
    *,
    params: Optional[Dict[str, Any]] = None,
    tracer: Optional[Union["Tracer", List["Tracer"]]] = None,
):
    """Initialize Operon engine with a GraphOp or a graph factory.

    Pure orchestrator — does **not** load ``.env`` or ``resources.yaml``.
    Call :func:`operonx.bootstrap` (or :meth:`ResourceHub.from_yaml` directly)
    before constructing the engine if your graph uses provider ops.
    Pure-compute graphs need no setup.

    Args:
        graph: A GraphOp workflow, or a callable that returns one.
               When a callable is passed, it is invoked with ``**params``
               immediately — call :func:`operonx.bootstrap` first if the
               factory needs the hub.
        params: Keyword arguments passed to the graph factory. Ignored
                when *graph* is already a GraphOp. Defaults to ``{}``.
        tracer: Default tracer(s) for all run() calls. Can be overridden per-run.

    Raises:
        RuntimeError: If a provider op needs the hub but none has been
            installed. The message points at ``operonx.bootstrap()``.
    """
    if callable(graph) and not isinstance(graph, GraphOp):
        graph = graph(**(params or {}))

    self.graph = graph
    self.name = graph.name
    self._tracer = tracer

    # Build graph and create schema immediately
    self.graph.build()
    self._schema = StateSchema(self.graph)
    self._middleware: List[Middleware] = []

    # Eagerly init backends if a hub is already configured
    self._warmup_ops()

    LOGGER.debug(
        "Operon engine initialized for workflow [highlight]%s[/highlight]",
        self.name,
    )

Attributes

schema property

schema: StateSchema

Access the workflow state schema.

Functions

use

use(middleware: Middleware) -> Operon

Add middleware to the engine. Returns self for chaining.

Parameters:

- middleware (Middleware, required): A Middleware instance to add.

Returns:

- Operon: self, for fluent chaining: engine.use(m1).use(m2)

Source code in operonx/core/engine.py
def use(self, middleware: Middleware) -> "Operon":
    """Add middleware to the engine. Returns self for chaining.

    Args:
        middleware: A Middleware instance to add.

    Returns:
        self, for fluent chaining: ``engine.use(m1).use(m2)``
    """
    self._middleware.append(middleware)
    return self
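The fluent-chaining pattern behind use() can be sketched in isolation. This is a minimal, hypothetical stand-in (MiniEngine is not the real class), showing why returning self lets registration calls chain:

```python
from typing import Callable, List


class MiniEngine:
    """Toy stand-in for an engine with chainable middleware registration."""

    def __init__(self) -> None:
        self._middleware: List[Callable[[dict], dict]] = []

    def use(self, mw: Callable[[dict], dict]) -> "MiniEngine":
        # Append and return self so calls can be chained fluently.
        self._middleware.append(mw)
        return self

    def run(self, inputs: dict) -> dict:
        # Each middleware transforms the inputs in registration order.
        for mw in self._middleware:
            inputs = mw(inputs)
        return inputs


engine = MiniEngine().use(lambda d: {**d, "a": 1}).use(lambda d: {**d, "b": 2})
result = engine.run({"query": "hi"})
```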

start

start(
    inputs: Dict[str, Any],
    *,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    request_id: Optional[str] = None,
    tracer: Optional[Union[Tracer, List[Tracer]]] = None,
    scratch: Optional[Dict[str, Any]] = None,
) -> ExecutionHandle

Start workflow execution and return a streaming handle immediately.

Does not block — the graph runs in the background. Use the handle to stream frames, await specific outputs, or collect the final result.

Tracer flush happens automatically when the scheduler completes — no explicit finalize step needed.

Parameters:

- inputs (Dict[str, Any], required): Input data for the workflow.
- user_id (Optional[str], default None): User identifier (auto-generated if not provided).
- session_id (Optional[str], default None): Session identifier (auto-generated if not provided).
- request_id (Optional[str], default None): Request identifier (auto-generated if not provided).
- tracer (Optional[Union[Tracer, List[Tracer]]], default None): Tracer(s) for this execution; overrides the engine default.
- scratch (Optional[Dict[str, Any]], default None): Initial values for the per-call scratch space. Applied synchronously before the scheduler task is created, so the seed is race-free. Equivalent to writing handle.scratch[k] = v before the first await after start(), but guaranteed to be visible to entry ops.

Returns:

- ExecutionHandle: Async-iterable; supports await handle["op", "var"] and await handle.collect().

Source code in operonx/core/engine.py
def start(
    self,
    inputs: Dict[str, Any],
    *,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    request_id: Optional[str] = None,
    tracer: Optional[Union["Tracer", List["Tracer"]]] = None,
    scratch: Optional[Dict[str, Any]] = None,
) -> "ExecutionHandle":
    """Start workflow execution and return a streaming handle immediately.

    Does not block — the graph runs in the background. Use the handle to
    stream frames, await specific outputs, or collect the final result.

    Tracer flush happens automatically when the scheduler completes — no
    explicit finalize step needed.

    Args:
        inputs: Input data for the workflow
        user_id: Optional user identifier (auto-generated if not provided)
        session_id: Optional session identifier (auto-generated if not provided)
        request_id: Optional request identifier (auto-generated if not provided)
        tracer: Optional tracer(s) — overrides engine default for this execution.
        scratch: Optional initial values for per-call scratch space. Applied
            synchronously before the scheduler task is created — race-free.
            Equivalent to writing ``handle.scratch[k] = v`` before the first
            ``await`` after ``start()``, but guaranteed to be visible to
            entry ops.

    Returns:
        ExecutionHandle — async-iterable, supports ``await handle["op","var"]``
        and ``await handle.collect()``
    """
    user_id = user_id or str(uuid.uuid4())
    session_id = session_id or str(uuid.uuid4())
    request_id = request_id or str(uuid.uuid4())

    # Resolve tracer: per-call overrides engine default. The only
    # tracer shape supported now is ``TracePipeline`` (or a subclass —
    # ``LangfuseTracer`` is one). Legacy Tracer subclasses were retired
    # in T2.13.
    effective = tracer if tracer is not None else self._tracer
    tracers_raw = (
        effective if isinstance(effective, list) else ([effective] if effective else [])
    )

    from operonx.core.tracing.pipeline import TracePipeline as _TracePipeline

    pipelines = [t for t in tracers_raw if isinstance(t, _TracePipeline)]
    invalid = [t for t in tracers_raw if not isinstance(t, _TracePipeline)]
    if invalid:
        raise TypeError(
            f"Engine accepts only TracePipeline instances as `tracer=`; "
            f"got {[type(t).__name__ for t in invalid]}. The legacy Tracer "
            "API was removed in T2.13 — use ``LangfuseTracer`` (a "
            "TracePipeline subclass) or build a ``TracePipeline`` directly."
        )

    state = self._schema.create_state(
        inputs=inputs,
        user_id=user_id,
        session_id=session_id,
        request_id=request_id,
    )
    # Legacy ``state.tracing`` flag retained for back-compat with code
    # that reads it (e.g. inside op.run() for per-op metric writes).
    # No longer load-bearing for trace dispatch.
    state.tracing = False

    # Seed scratch synchronously before the scheduler task is created.
    if scratch:
        state._scratch.update(scratch)

    # Bind emitter for the event-stream pipeline. NullEmitter when no
    # pipeline configured — keeps op code's ``current_emitter().emit_*``
    # calls cheap (one method-table dispatch).
    from operonx.core.tracing.emitter import (
        NullEmitter as _NullEmitter,
    )
    from operonx.core.tracing.emitter import (
        _current_emitter_var,
    )

    if pipelines:
        # First pipeline drives the emitter; secondary pipelines (rare)
        # could subscribe via _push fanout if needed in the future.
        primary = pipelines[0]
        primary_state_tags = list(state.tags) if getattr(state, "tags", None) else []
        new_emitter = primary.emitter(
            request_id=request_id,
            workflow_name=self.name,
            user_id=user_id,
            session_id=session_id,
            tags=primary_state_tags,
        )
    else:
        primary = None
        new_emitter = _NullEmitter()

    LOGGER.info(format_event("workflow_start", request_id=request_id, graph_name=self.name))

    graph_name = self.name
    queue: asyncio.Queue = asyncio.Queue()

    async def _run() -> None:
        emitter_token = _current_emitter_var.set(new_emitter)
        try:
            await self.graph._scheduler.run(state, ("main",), output_queue=queue)
        except Exception as e:
            queue.put_nowait(e)
        except asyncio.CancelledError:
            queue.put_nowait(None)
            raise
        finally:
            # Pipeline final flush — drains buffered events through
            # processors + exporters. Awaited inline so engine.run()
            # returns only after every exporter has finished posting.
            if primary is not None:
                try:
                    await primary.flush(partial=False)
                except Exception:
                    LOGGER.exception("trace pipeline final flush failed")
            _current_emitter_var.reset(emitter_token)
            LOGGER.info(
                format_event("workflow_done", request_id=request_id, graph_name=graph_name)
            )

    scheduler_task = asyncio.create_task(_run())
    return ExecutionHandle(queue, scheduler_task, state)
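The handle mechanics — a background scheduler task feeding an asyncio.Queue that the caller drains, with exceptions forwarded through the queue — can be sketched with stdlib asyncio alone. Names here (MiniHandle, start) are illustrative, not the real ExecutionHandle API:

```python
import asyncio


class MiniHandle:
    """Illustrative handle: drain frames from a queue fed by a background task."""

    def __init__(self, queue: asyncio.Queue, task: asyncio.Task) -> None:
        self._queue = queue
        self._task = task

    async def collect(self) -> list:
        frames = []
        while True:
            item = await self._queue.get()
            if item is None:  # sentinel: scheduler finished
                break
            if isinstance(item, Exception):  # errors are forwarded via the queue
                raise item
            frames.append(item)
        await self._task  # propagate any late failure
        return frames


def start(n: int) -> MiniHandle:
    queue: asyncio.Queue = asyncio.Queue()

    async def _run() -> None:
        for i in range(n):  # emit n frames, then the completion sentinel
            queue.put_nowait(i)
        queue.put_nowait(None)

    return MiniHandle(queue, asyncio.ensure_future(_run()))


async def main() -> list:
    handle = start(3)  # returns immediately; work happens in the background
    return await handle.collect()


frames = asyncio.run(main())
```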

run async

run(
    inputs: Dict[str, Any],
    *,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    request_id: Optional[str] = None,
    tracer: Optional[Union[Tracer, List[Tracer]]] = None,
    scratch: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]

Execute the workflow with given inputs.

Each call creates a fresh state, so the same engine can be used for multiple independent executions. Equivalent to:

handle = engine.start(inputs, tracer=tracer, ...)
result = await handle.collect(unwrap=True)

Tracer flush happens automatically inside start() when the scheduler completes.

Parameters:

- inputs (Dict[str, Any], required): Input data for the workflow.
- user_id (Optional[str], default None): User identifier (auto-generated if not provided).
- session_id (Optional[str], default None): Session identifier (auto-generated if not provided).
- request_id (Optional[str], default None): Request identifier (auto-generated if not provided).
- tracer (Optional[Union[Tracer, List[Tracer]]], default None): Tracer or list of tracers for observability. Overrides the default tracer set in Operon(..., tracer=...).
- scratch (Optional[Dict[str, Any]], default None): Initial values for the per-call scratch space.

Returns:

- Dict[str, Any]: Workflow outputs plus a "$state" key holding the MemoryState for debugging/tracing access.

Source code in operonx/core/engine.py
async def run(
    self,
    inputs: Dict[str, Any],
    *,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    request_id: Optional[str] = None,
    tracer: Optional[Union["Tracer", List["Tracer"]]] = None,
    scratch: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Execute the workflow with given inputs.

    Each call creates a fresh state, so the same engine can be
    used for multiple independent executions.  Equivalent to::

        handle = engine.start(inputs, tracer=tracer, ...)
        result = await handle.collect(unwrap=True)

    Tracer flush happens automatically inside ``start()`` when the
    scheduler completes.

    Args:
        inputs: Input data for the workflow
        user_id: Optional user identifier (auto-generated if not provided)
        session_id: Optional session identifier (auto-generated if not provided)
        request_id: Optional request identifier (auto-generated if not provided)
        tracer: Optional tracer or list of tracers for observability.
                Overrides the default tracer set in ``Operon(..., tracer=...)``.
        scratch: Optional initial values for per-call scratch space.

    Returns:
        Dictionary containing workflow outputs plus "$state" key
        with the MemoryState for debugging/tracing access.
    """
    user_id = user_id or str(uuid.uuid4())
    session_id = session_id or str(uuid.uuid4())
    request_id = request_id or str(uuid.uuid4())

    context: Dict[str, Any] = {
        "user_id": user_id,
        "session_id": session_id,
        "request_id": request_id,
    }

    # Apply before_run middleware (in order)
    for mw in self._middleware:
        inputs = await mw.before_run(self.graph, inputs, context)

    handle = self.start(
        inputs,
        user_id=user_id,
        session_id=session_id,
        request_id=request_id,
        tracer=tracer,
        scratch=scratch,
    )

    try:
        result = await handle.collect(unwrap=True)
    except Exception as e:
        for mw in reversed(self._middleware):
            await mw.on_error(self.graph, inputs, e, context)
        raise

    result["$state"] = handle.state

    # Apply after_run middleware (in reverse order)
    for mw in reversed(self._middleware):
        result = await mw.after_run(self.graph, inputs, result, context)

    return result
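The middleware ordering in run() — before_run hooks in registration order, after_run hooks in reverse — is the classic onion pattern. A minimal standalone sketch (TagMW and run_with_middleware are illustrative names, not the real Middleware API):

```python
import asyncio
from typing import Any, Dict, List

calls: List[str] = []


class TagMW:
    """Records call order to show before-in-order / after-in-reverse."""

    def __init__(self, tag: str) -> None:
        self.tag = tag

    async def before_run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        calls.append(f"before:{self.tag}")
        return inputs

    async def after_run(self, result: Dict[str, Any]) -> Dict[str, Any]:
        calls.append(f"after:{self.tag}")
        return result


async def run_with_middleware(
    middleware: List[TagMW], inputs: Dict[str, Any]
) -> Dict[str, Any]:
    for mw in middleware:  # before hooks: registration order
        inputs = await mw.before_run(inputs)
    result = {"ok": True, **inputs}  # stand-in for the actual workflow
    for mw in reversed(middleware):  # after hooks: reverse order (onion)
        result = await mw.after_run(result)
    return result


out = asyncio.run(run_with_middleware([TagMW("m1"), TagMW("m2")], {"x": 1}))
```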

serve

serve(
    *,
    path: str = "/",
    host: str = "0.0.0.0",
    port: int = 8000,
    stream: Optional[bool] = None,
    websocket: bool = False,
    backend: str = "python",
    **kwargs: Any,
) -> None

Serve this workflow as an HTTP API.

Convenience wrapper around operonx.serve.OperonApp. Requires operonx-serve to be installed.

Parameters:

- path (str, default "/"): URL path for the endpoint.
- host (str, default "0.0.0.0"): Bind address.
- port (int, default 8000): Bind port.
- stream (Optional[bool], default None): Enable the SSE streaming endpoint; None auto-detects.
- websocket (bool, default False): Enable the WebSocket endpoint.
- backend (str, default "python"): "python" (FastAPI/uvicorn) or "rust" (Axum).
- **kwargs (Any): Extra arguments forwarded to OperonApp.serve().
Source code in operonx/core/engine.py
def serve(
    self,
    *,
    path: str = "/",
    host: str = "0.0.0.0",
    port: int = 8000,
    stream: Optional[bool] = None,
    websocket: bool = False,
    backend: str = "python",
    **kwargs: Any,
) -> None:
    """Serve this workflow as an HTTP API.

    Convenience wrapper around ``operonx.serve.OperonApp``. Requires operonx-serve
    to be installed.

    Args:
        path: URL path for the endpoint (default: "/").
        host: Bind address.
        port: Bind port.
        stream: Enable SSE streaming endpoint. None = auto-detect.
        websocket: Enable WebSocket endpoint.
        backend: "python" (FastAPI/uvicorn) or "rust" (Axum).
        **kwargs: Extra arguments forwarded to ``OperonApp.serve()``.
    """
    try:
        from operonx.serve import OperonApp
    except ImportError:
        raise ImportError(
            "operonx-serve is required for engine.serve(). Install it with: pip install operonx-serve"
        ) from None

    app = OperonApp(tracer=self._tracer)
    app.endpoint(path, graph=self.graph, stream=stream, websocket=websocket)
    app.serve(host=host, port=port, backend=backend, **kwargs)

batch async

batch(
    inputs_list: List[Dict[str, Any]], *, concurrency: int = 10, **kwargs: Any
) -> List[Dict[str, Any]]

Run the workflow concurrently on multiple inputs.

Parameters:

- inputs_list (List[Dict[str, Any]], required): List of input dicts to process.
- concurrency (int, default 10): Max concurrent executions.
- **kwargs (Any): Extra arguments forwarded to run().

Returns:

- List[Dict[str, Any]]: Result dicts in the same order as the inputs.

Source code in operonx/core/engine.py
async def batch(
    self,
    inputs_list: List[Dict[str, Any]],
    *,
    concurrency: int = 10,
    **kwargs: Any,
) -> List[Dict[str, Any]]:
    """Run the workflow concurrently on multiple inputs.

    Args:
        inputs_list: List of input dicts to process.
        concurrency: Max concurrent executions (default: 10).
        **kwargs: Extra arguments forwarded to ``run()``.

    Returns:
        List of result dicts in the same order as inputs.
    """
    sem = asyncio.Semaphore(concurrency)

    async def _run(inp: Dict[str, Any]) -> Dict[str, Any]:
        async with sem:
            return await self.run(inp, **kwargs)

    return list(await asyncio.gather(*[_run(inp) for inp in inputs_list]))
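batch() is the classic semaphore-bounded gather. The core works standalone; here fake_run is a stub standing in for engine.run():

```python
import asyncio
from typing import Any, Dict, List


async def fake_run(inputs: Dict[str, Any]) -> Dict[str, Any]:
    # Stand-in for engine.run(): doubles the input value.
    await asyncio.sleep(0)
    return {"result": inputs["x"] * 2}


async def batch(
    inputs_list: List[Dict[str, Any]], concurrency: int = 10
) -> List[Dict[str, Any]]:
    sem = asyncio.Semaphore(concurrency)  # caps in-flight executions

    async def _one(inp: Dict[str, Any]) -> Dict[str, Any]:
        async with sem:
            return await fake_run(inp)

    # gather preserves input order regardless of completion order
    return list(await asyncio.gather(*[_one(i) for i in inputs_list]))


results = asyncio.run(batch([{"x": 1}, {"x": 2}, {"x": 3}], concurrency=2))
```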

cli

cli() -> None

Interactive CLI mode — read JSON from stdin, print result to stdout.

Source code in operonx/core/engine.py
def cli(self) -> None:
    """Interactive CLI mode — read JSON from stdin, print result to stdout."""
    inputs = json.load(sys.stdin)
    result = asyncio.run(self.run(inputs))
    # Filter internal keys for clean output
    output = {k: v for k, v in result.items() if not k.startswith("$")}
    json.dump(output, sys.stdout, indent=2)
    sys.stdout.write("\n")

input_schema

input_schema() -> Dict[str, Any]

Return JSON Schema describing the workflow's expected inputs.

Source code in operonx/core/engine.py
def input_schema(self) -> Dict[str, Any]:
    """Return JSON Schema describing the workflow's expected inputs."""
    return self._params_to_schema(self.graph.inputs or {}, f"{self.name}_input")

output_schema

output_schema() -> Dict[str, Any]

Return JSON Schema describing the workflow's outputs.

Source code in operonx/core/engine.py
def output_schema(self) -> Dict[str, Any]:
    """Return JSON Schema describing the workflow's outputs."""
    return self._params_to_schema(self.graph.outputs or {}, f"{self.name}_output")
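The internals of _params_to_schema are not shown on this page; as a hypothetical sketch, a minimal params-dict to JSON Schema mapping might look like this (the type map and output shape are assumptions, not the real implementation):

```python
from typing import Any, Dict

# Assumed mapping from Python default-value types to JSON Schema type names.
_TYPE_MAP = {str: "string", int: "integer", float: "number", bool: "boolean"}


def params_to_schema(params: Dict[str, Any], title: str) -> Dict[str, Any]:
    properties = {
        name: {"type": _TYPE_MAP.get(type(default), "string"), "default": default}
        for name, default in params.items()
    }
    return {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": title,
        "type": "object",
        "properties": properties,
    }


schema = params_to_schema({"query": "", "top_k": 3}, "chatbot_input")
```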

show

show() -> None

Display workflow structure for debugging.

Source code in operonx/core/engine.py
def show(self) -> None:
    """Display workflow structure for debugging."""
    print(f"\n=== Operon Engine: {self.name} ===")
    self.graph.show()
    print()
    self._schema.show()

Decorators

The two decorators that turn ordinary Python into Operonx ops:

op

op(
    func: Optional[Callable] = None,
    *,
    executor: Optional[str] = None,
    bound: Optional[str] = None,
    cache=None,
    delay: float = 0,
)

Decorator that turns a plain function into a FuncOp factory.

Can be used bare or with keyword arguments:

@op
def double(x: int):
    return {"result": x * 2}

@op(bound="cpu")
def heavy_compute(data: list):
    return {"result": process(data)}

@op(bound="io")
async def call_api(url: str):
    return {"data": await fetch(url)}

Parameters:

- bound (Optional[str], default None): Execution bound hint for the scheduler. None auto-detects: async functions get "io", sync functions get "sync". "sync" means inline dispatch with no asyncio task (fastest); "io" means an asyncio task, for network/disk I/O; "cpu" means asyncio.to_thread(), for heavy compute (C extensions release the GIL).
- executor (Optional[str], default None): Deprecated; use bound="cpu" instead of executor="thread".
Source code in operonx/core/ops/transform/func_op.py
def op(
    func: Optional[Callable] = None,
    *,
    executor: Optional[str] = None,
    bound: Optional[str] = None,
    cache=None,
    delay: float = 0,
):
    """Decorator that turns a plain function into a FuncOp factory.

    Can be used bare or with keyword arguments::

        @op
        def double(x: int):
            return {"result": x * 2}

        @op(bound="cpu")
        def heavy_compute(data: list):
            return {"result": process(data)}

        @op(bound="io")
        async def call_api(url: str):
            return {"data": await fetch(url)}

    Args:
        bound: Execution bound hint for the scheduler.
            ``None`` (default) auto-detects: async → ``"io"``, sync → ``"sync"``.
            ``"sync"``  — inline dispatch, no asyncio task (fastest).
            ``"io"``    — asyncio task, for network/disk I/O.
            ``"cpu"``   — asyncio.to_thread(), for heavy compute (C extensions release GIL).
        executor: Deprecated — use ``bound="cpu"`` instead of ``executor="thread"``.
    """
    # Backward compat: executor="thread" → bound="cpu"
    if executor == "thread" and bound is None:
        bound = "cpu"

    def decorator(fn):
        module = fn.__module__ or ""
        if module in ("__main__", "") or "." not in module:
            fn._func_name = fn.__name__
        else:
            fn._func_name = f"{module}.{fn.__name__}"
        if bound is not None:
            fn._op_bound = bound
        if cache is not None:
            fn._op_cache = cache
        sig = inspect.signature(fn)
        collisions = set(sig.parameters.keys()) & _BASE_INIT_KEYS
        if collisions:
            LOGGER.warning(
                "@op function '%s' has parameter(s) %s that collide with reserved op keywords %s. "
                "When called via shorthand (e.g. %s(name=PARENT['name'])), these may be misinterpreted "
                "as op constructor args instead of function inputs. Consider renaming them.",
                fn.__name__,
                sorted(collisions),
                sorted(_BASE_INIT_KEYS),
                fn.__name__,
            )

        @wraps(fn)
        def wrapper(**kwargs):
            mappings, init_kwargs = split_shorthand_kwargs(kwargs, {"return_keys"})
            op_bound = init_kwargs.pop("bound", bound)
            # Backward compat: call-time executor="thread" → bound="cpu"
            op_executor = init_kwargs.pop("executor", None)
            if op_executor == "thread" and op_bound is None:
                op_bound = "cpu"
            op_delay = init_kwargs.pop("delay", delay)
            return FuncOp(
                code_fn=fn,
                bound=op_bound,
                delay=op_delay,
                _mappings=mappings or None,
                **init_kwargs,
            )

        register_skip(wrapper)
        wrapper.__wrapped__ = fn
        return wrapper

    if func is not None:
        # @op without parentheses
        return decorator(func)
    # @op(bound="cpu") with parentheses
    return decorator
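The bound auto-detection rule (async maps to "io", sync to "sync", explicit hint wins) reduces to a few lines of inspect. A standalone sketch — detect_bound is an illustrative name, not part of the library:

```python
import inspect
from typing import Callable, Optional


def detect_bound(fn: Callable, bound: Optional[str] = None) -> str:
    # Explicit hint wins; otherwise async functions default to "io"
    # and plain functions to "sync" (inline dispatch, no asyncio task).
    if bound is not None:
        return bound
    return "io" if inspect.iscoroutinefunction(fn) else "sync"


async def call_api(url: str):  # would be dispatched as an asyncio task
    ...


def double(x: int):  # would be dispatched inline, no task overhead
    ...
```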

graph

GraphOp — container op that manages a graph of child ops.

Classes

GraphOp

GraphOp(concurrency: int = 64, **kwargs)

Bases: BaseOp

Container op that holds and executes a directed graph of child ops.

Lifecycle:

1. DEFINE        with GraphOp(name="wf") as g:
                     a = double(x=PARENT["x"])
                     b = add(a=a["result"], b=PARENT["y"])
                     START >> a >> b >> END
                 Ops auto-register via context manager. Edges via >> operator.
                 Inputs/outputs auto-discovered from PARENT refs.

2. BUILD         g.build()  (or auto on first run)
                 _setup_schema    scan PARENT refs → graph inputs/outputs
                 _setup_endpoints find entry/exit ops from topology
                 _build()         adj list + ready counts + stream ready counts
                 validate         branch targets, cycles, reachability, refs

3. EXECUTE       g.run(state, context_id)  — async generator
                 → run_task_scheduler()  drives ops via Frame/EOF events
                 → yields (ctx, outputs) per batch or per stream frame
                 → loop iteration handled inside scheduler EOF handler

4. EXPORT        serialize()  config dict for Rust backend
                 validate()   graph structure validation
                 show()       debug display
Source code in operonx/core/ops/graph/graph_op.py
def __init__(self, concurrency: int = 64, **kwargs):
    super().__init__(**kwargs)
    self._token = None
    self._is_building = True
    self._ops: Dict[str, BaseOp] = {}
    self._edges = {}
    self.entries = []
    self.exits = []
    self.prevs = defaultdict(list)
    self.nexts = defaultdict(list)
    self.concurrency = concurrency
    self._loop_config = None
    self._shared_vars = {}  # {var_name: initial_value} — set by PARENT.shared()
    self._adj = {}  # {op_name: [Link(dst, soft), ...]}
    self._initial_ready = {}  # {op_name: ready_count}
    self._stream_initial_ready = {}  # {gen_name: {op_name: ready_count_for_stream_ctx}}
    self._scheduler = None  # set by build()
    self._out_vars: Dict[
        str, dict
    ] = {}  # {op_name: {src_var: dest_var}} — vars mapped to PARENT output
Functions
loop classmethod
loop(
    name: Optional[str] = None,
    until: Optional[Union[str, Callable]] = None,
    max_iterations: int = 100,
    **initial_state: Any,
)

Create a GraphOp configured for feedback-loop execution.

Each iteration re-runs the graph's scheduler, carrying forward outputs as the next iteration's inputs. Stops when until evaluates to True or max_iterations is reached.

Parameters:

- name (Optional[str], default None): Graph name.
- until (Optional[Union[str, Callable]], default None): Stop condition — a string expression (evaluated against outputs) or a callable (outputs_dict) -> bool.
- max_iterations (int, default 100): Safety cap on iterations.
- **initial_state (Any): Initial values for loop variables, injected as inputs.

Example:

with GraphOp.loop(name="counter", until="count >= 5", count=0) as g:
    inc = increment(counter=PARENT["count"])
    inc["counter"] >> PARENT["count"]
    START >> inc >> END
Source code in operonx/core/ops/graph/graph_op.py
@classmethod
def loop(
    cls,
    name: Optional[str] = None,
    until: Optional[Union[str, Callable]] = None,
    max_iterations: int = 100,
    **initial_state: Any,
):
    """Create a GraphOp configured for feedback-loop execution.

    Each iteration re-runs the graph's scheduler, carrying forward outputs
    as the next iteration's inputs. Stops when ``until`` evaluates to True
    or ``max_iterations`` is reached.

    Args:
        name: Graph name.
        until: Stop condition — a string expression (evaluated against outputs)
               or a callable ``(outputs_dict) -> bool``.
        max_iterations: Safety cap on iterations (default 100).
        **initial_state: Initial values for loop variables, injected as inputs.

    Example::

        with GraphOp.loop(name="counter", until="count >= 5", count=0) as g:
            inc = increment(counter=PARENT["count"])
            inc["counter"] >> PARENT["count"]
            START >> inc >> END
    """
    g = cls(name=name, inputs=initial_state or None)
    g._loop_config = LoopConfig(
        until=until,
        max_iterations=max_iterations,
    )
    return g
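The loop contract — re-run a step, carry outputs forward, stop on until or at max_iterations — can be sketched standalone. should_stop and run_loop are illustrative names; in particular, the eval-based string evaluation is an assumption, not the library's real strategy:

```python
from typing import Any, Callable, Dict, Union


def should_stop(
    until: Union[str, Callable[[Dict[str, Any]], bool]],
    outputs: Dict[str, Any],
) -> bool:
    # Callable condition: call it with the outputs dict.
    if callable(until):
        return bool(until(outputs))
    # String condition: evaluate the expression with outputs as the namespace
    # (illustrative only — the real evaluation strategy is not shown here).
    return bool(eval(until, {"__builtins__": {}}, outputs))


def run_loop(step, until, state: Dict[str, Any], max_iterations: int = 100) -> Dict[str, Any]:
    for _ in range(max_iterations):  # safety cap
        state = step(state)  # carry outputs forward as the next inputs
        if should_stop(until, state):
            break
    return state


final = run_loop(lambda s: {"count": s["count"] + 1}, "count >= 5", {"count": 0})
```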
get_current_graph staticmethod
get_current_graph() -> Optional[GraphOp]

Return the current graph from context.

Source code in operonx/core/ops/graph/graph_op.py
@staticmethod
def get_current_graph() -> Optional["GraphOp"]:
    """Return the current graph from context."""
    try:
        return _current_graph.get()
    except LookupError:
        return None
add_op
add_op(op: BaseOp) -> BaseOp

Add an op to the graph.

Source code in operonx/core/ops/graph/graph_op.py
def add_op(self, op: BaseOp) -> BaseOp:
    """Add an op to the graph."""
    if not self._is_building:
        raise RuntimeError("Cannot add op after graph has been built")

    if getattr(op, "_is_operonx_builder", False):
        name = getattr(op, "_name", None) or type(op).__name__
        LOGGER.error(
            "%s '%s' is not built. Call .build() or .else_() before adding to graph.",
            type(op).__name__,
            name,
        )
        raise TypeError(
            f"{type(op).__name__} '{name}' is not built. "
            f"Call .build() or .else_() to create the op."
        )

    if op in [START, END]:
        return op

    if op.name in self._ops:
        LOGGER.warning(
            "Graph [highlight]%s[/highlight]: op [highlight]%s[/highlight] already exists and will be overwritten",
            self.name,
            op.name,
        )

    self._ops[op.name] = op

    if hasattr(op, "start") and op.start:
        if op.name not in self.entries:
            self.entries.append(op.name)

    if hasattr(op, "end") and op.end:
        if op.name not in self.exits:
            self.exits.append(op.name)

    return op
add_edge
add_edge(
    source: str, target: str, type: EdgeType = "normal", soft: bool = False
)

Add an edge between two ops.

Parameters:

- source (str, required): Source op name.
- target (str, required): Target op name.
- type (EdgeType, default "normal"): Edge type (normal, lookback, condition).
- soft (bool, default False): If True, the edge does not count toward ready_count. Used for branch outputs when only one branch executes.
Source code in operonx/core/ops/graph/graph_op.py
def add_edge(self, source: str, target: str, type: EdgeType = "normal", soft: bool = False):
    """Add an edge between two ops.

    Args:
        source: Source op name.
        target: Target op name.
        type: Edge type (normal, lookback, condition).
        soft: If True, edge does not count toward ready_count.
              Used for branch outputs when only one branch executes.
    """
    if not self._is_building:
        raise RuntimeError("Cannot add edge after graph has been built!")

    if source == START.name:
        if target not in self._ops:
            raise ValueError(f"Target op '{target}' not found")

        target_node = self._ops[target]
        target_node.start = True

        if target not in self.entries:
            self.entries.append(target)

        return

    if target == END.name:
        if source not in self._ops:
            raise ValueError(f"Source op '{source}' not found")

        source_node = self._ops[source]
        source_node.end = True

        if source not in self.exits:
            self.exits.append(source)

        return

    if target == PARENT.name:
        return

    if source not in self._ops:
        raise ValueError(f"Source op '{source}' not found")
    if target not in self._ops:
        raise ValueError(f"Target op '{target}' not found")

    new_edge = EdgeConfig(from_node=source, to_node=target, type=type, soft=soft)
    if (source, target) not in self._edges:
        self._edges[source, target] = new_edge
        self.nexts[source].append(target)
        self.prevs[target].append(source)
build
build()

Build graph: children first, then schema → endpoints → topology → validation.

Source code in operonx/core/ops/graph/graph_op.py
def build(self):
    """Build graph: children first, then schema → endpoints → topology → validation."""
    for child in self._ops.values():
        if hasattr(child, "build"):
            child.build()

    self._setup_schema()
    self._setup_endpoints()

    result = self.validate()
    result.raise_if_errors()

    self._build()

    self._scheduler = Scheduler(self)
    self._is_building = False
    self._cache_full_names()

    # Auto-detect bound from children, with user override.
    # If user explicitly set bound on the graph, respect it.
    # Otherwise: all children sync → graph is sync (inline); any io/cpu → task.
    if self.bound is None:
        if all(getattr(op, "bound", None) == "sync" for op in self._ops.values()):
            self.bound = "sync"
        else:
            self.bound = "io"
run async
run(
    state: MemoryState, context_id: Optional[tuple] = None
) -> AsyncGenerator[Tuple[tuple, Dict[str, Any]], None]

Execute graph: get inputs → schedule ops → loop if needed → store results.

Source code in operonx/core/ops/graph/graph_op.py
async def run(
    self,
    state: "MemoryState",
    context_id: Optional[tuple] = None,
) -> AsyncGenerator[Tuple[tuple, Dict[str, Any]], None]:
    """Execute graph: get inputs → schedule ops → loop if needed → store results."""

    if context_id is None:
        context_id = DEFAULT_CONTEXT

    request_id = state.request_id
    start_time = datetime.now(timezone.utc)
    perf_start = perf_counter()
    _inputs = {}
    _outputs = {}
    error_msg = None

    try:
        _inputs = self.get_inputs(state, context_id=context_id)

        if self._is_building:
            self.build()

        _outputs, stream_ctxs = await self._scheduler.run(state, context_id)

        _has_generators = any(op.is_gen for op in self._ops.values())
        if not stream_ctxs and not _has_generators:
            self.store_result(state, _outputs, context_id)
            yield context_id, _outputs
        else:
            for sctx in stream_ctxs:
                item = self.get_outputs(state, context_id=sctx)
                if any(v is not None for v in item.values()):
                    self.store_result(state, item, sctx)
                    yield sctx, item

    except Exception:
        import sys

        error_msg = (
            traceback.format_exc()
            if LOGGER.isEnabledFor(40)
            else f"{type(sys.exc_info()[1]).__name__}: {sys.exc_info()[1]}"
        )
        LOGGER.error(
            "[title]\\[%s][/title] Error in op [highlight]%s[/highlight]:\n%s",
            request_id,
            self.name,
            error_msg.rstrip(),
        )

    finally:
        end_time = datetime.now(timezone.utc)
        duration_ms = (perf_counter() - perf_start) * 1000
        self._log(request_id, context_id, _inputs, _outputs, duration_ms)
        self._store_metrics(
            state,
            context_id,
            start_time=start_time,
            end_time=end_time,
            duration_ms=duration_ms,
        )
        if error_msg is not None:
            state[self.full_name, "error", context_id] = error_msg
serialize
serialize() -> dict

Serialize full graph to config dict for the Rust backend.

Note: the key "initial_ready_count" is kept as-is for Rust backend compatibility even though the internal Python attribute was renamed to _initial_ready during the scheduler rewrite.

Source code in operonx/core/ops/graph/graph_op.py
def serialize(self) -> dict:
    """Serialize full graph to config dict for the Rust backend.

    Note: the key ``"initial_ready_count"`` is kept as-is for Rust backend
    compatibility even though the internal Python attribute was renamed to
    ``_initial_ready`` during the scheduler rewrite.
    """
    base = super().serialize()
    base.update(
        {
            "ops": {name: op.serialize() for name, op in self._ops.items()},
            "edges": [
                {"from": src, "to": dst, "soft": edge.soft}
                for (src, dst), edge in self._edges.items()
            ],
            "entries": list(self.entries),
            "exits": list(self.exits),
            "initial_ready_count": dict(self._initial_ready),
            "compiled_adj": {
                op: [[link.dst, link.soft] for link in links] for op, links in self._adj.items()
            },
            "stream_initial_ready": self._stream_initial_ready,
            "loop_config": {
                "until": self._loop_config.until
                if isinstance(self._loop_config.until, str)
                else None,
                "max_iterations": self._loop_config.max_iterations,
            }
            if self._loop_config
            else None,
            "max_stream_concurrent": self.concurrency,
        }
    )
    return base
validate
validate() -> ValidationResult

Run all validations and return result.

Source code in operonx/core/ops/graph/graph_op.py
def validate(self) -> ValidationResult:
    """Run all validations and return result."""
    return validate_graph(
        self.name,
        self._ops,
        self._edges,
        self.prevs,
        self.nexts,
        self.entries,
        self.exits,
    )
show
show(indent=0)

Display graph structure (debug).

Source code in operonx/core/ops/graph/graph_op.py
def show(self, indent=0):
    """Display graph structure (debug)."""
    prefix = "  " * indent
    LOGGER.debug("%sGraph: %s", prefix, self.name)
    LOGGER.debug("%sOps: %s", prefix, list(self._ops.keys()))
    LOGGER.debug("%sEdges:", prefix)
    for edge in self._edges.values():
        soft_marker = " (soft)" if edge.soft else ""
        LOGGER.debug(
            "%s  %s -> %s: %s%s", prefix, edge.from_node, edge.to_node, edge.type, soft_marker
        )
    LOGGER.debug("%sReady count: %s", prefix, dict(self._initial_ready))

    for child in self._ops.values():
        if isinstance(child, GraphOp):
            child.show(indent + 1)

GraphValidationError

GraphValidationError(result: ValidationResult)

Bases: Exception

Exception raised when graph validation fails.

Source code in operonx/core/ops/graph/validation.py
def __init__(self, result: ValidationResult):
    self.result = result
    super().__init__(
        f"Graph '{result.graph_name}' validation failed with "
        f"{len(result.errors)} error(s). See logs above for details."
    )

ValidationIssue dataclass

ValidationIssue(
    level: ValidationLevel,
    category: str,
    message: str,
    op_name: Optional[str] = None,
    target_name: Optional[str] = None,
    available_nodes: List[str] = list(),
    suggestions: List[str] = list(),
)

A single validation issue found in the graph.

ValidationLevel

Bases: Enum

Severity level for validation issues.

ValidationResult dataclass

ValidationResult(graph_name: str, issues: List[ValidationIssue] = list())

Result of graph validation.

Functions
raise_if_errors
raise_if_errors()

Raise exception if there are any errors.

Source code in operonx/core/ops/graph/validation.py
def raise_if_errors(self):
    """Raise exception if there are any errors."""
    if self.has_errors:
        LOGGER.error(
            "Graph [highlight]%s[/highlight] validation found %d error(s):",
            self.graph_name,
            len(self.errors),
        )
        for issue in self.errors:
            LOGGER.error(
                "  [%s] %s: %s | Location: %s -> '%s' | Available nodes: %s",
                issue.level.value.upper(),
                issue.category,
                issue.message,
                issue.op_name,
                issue.target_name,
                issue.available_nodes,
            )
        raise GraphValidationError(self)

Op types

The base classes that compose into a workflow. Most users only touch GraphOp directly (via with GraphOp(...) as g:) — the others are constructed by decorators or factory helpers.

GraphOp

GraphOp(concurrency: int = 64, **kwargs)

Bases: BaseOp

Container op that holds and executes a directed graph of child ops.

Lifecycle::

1. DEFINE        with GraphOp(name="wf") as g:
                     a = double(x=PARENT["x"])
                     b = add(a=a["result"], b=PARENT["y"])
                     START >> a >> b >> END
                 Ops auto-register via context manager. Edges via >> operator.
                 Inputs/outputs auto-discovered from PARENT refs.

2. BUILD         g.build()  (or auto on first run)
                 _setup_schema    scan PARENT refs → graph inputs/outputs
                 _setup_endpoints find entry/exit ops from topology
                 _build()         adj list + ready counts + stream ready counts
                 validate         branch targets, cycles, reachability, refs

3. EXECUTE       g.run(state, context_id)  — async generator
                 → run_task_scheduler()  drives ops via Frame/EOF events
                 → yields (ctx, outputs) per batch or per stream frame
                 → loop iteration handled inside scheduler EOF handler

4. EXPORT        serialize()  config dict for Rust backend
                 validate()   graph structure validation
                 show()       debug display
Source code in operonx/core/ops/graph/graph_op.py
def __init__(self, concurrency: int = 64, **kwargs):
    super().__init__(**kwargs)
    self._token = None
    self._is_building = True
    self._ops: Dict[str, BaseOp] = {}
    self._edges = {}
    self.entries = []
    self.exits = []
    self.prevs = defaultdict(list)
    self.nexts = defaultdict(list)
    self.concurrency = concurrency
    self._loop_config = None
    self._shared_vars = {}  # {var_name: initial_value} — set by PARENT.shared()
    self._adj = {}  # {op_name: [Link(dst, soft), ...]}
    self._initial_ready = {}  # {op_name: ready_count}
    self._stream_initial_ready = {}  # {gen_name: {op_name: ready_count_for_stream_ctx}}
    self._scheduler = None  # set by build()
    self._out_vars: Dict[
        str, dict
    ] = {}  # {op_name: {src_var: dest_var}} — vars mapped to PARENT output

Functions

loop classmethod

loop(
    name: Optional[str] = None,
    until: Optional[Union[str, Callable]] = None,
    max_iterations: int = 100,
    **initial_state: Any,
)

Create a GraphOp configured for feedback-loop execution.

Each iteration re-runs the graph's scheduler, carrying forward outputs as the next iteration's inputs. Stops when until evaluates to True or max_iterations is reached.

Parameters:

Name Type Description Default
name Optional[str]

Graph name.

None
until Optional[Union[str, Callable]]

Stop condition — a string expression (evaluated against outputs) or a callable (outputs_dict) -> bool.

None
max_iterations int

Safety cap on iterations (default 100).

100
**initial_state Any

Initial values for loop variables, injected as inputs.

{}

Example::

with GraphOp.loop(name="counter", until="count >= 5", count=0) as g:
    inc = increment(counter=PARENT["count"])
    inc["counter"] >> PARENT["count"]
    START >> inc >> END
Source code in operonx/core/ops/graph/graph_op.py
@classmethod
def loop(
    cls,
    name: Optional[str] = None,
    until: Optional[Union[str, Callable]] = None,
    max_iterations: int = 100,
    **initial_state: Any,
):
    """Create a GraphOp configured for feedback-loop execution.

    Each iteration re-runs the graph's scheduler, carrying forward outputs
    as the next iteration's inputs. Stops when ``until`` evaluates to True
    or ``max_iterations`` is reached.

    Args:
        name: Graph name.
        until: Stop condition — a string expression (evaluated against outputs)
               or a callable ``(outputs_dict) -> bool``.
        max_iterations: Safety cap on iterations (default 100).
        **initial_state: Initial values for loop variables, injected as inputs.

    Example::

        with GraphOp.loop(name="counter", until="count >= 5", count=0) as g:
            inc = increment(counter=PARENT["count"])
            inc["counter"] >> PARENT["count"]
            START >> inc >> END
    """
    g = cls(name=name, inputs=initial_state or None)
    g._loop_config = LoopConfig(
        until=until,
        max_iterations=max_iterations,
    )
    return g
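
The feedback-loop contract above can be sketched in plain Python. This is an illustration only (a hypothetical `run_loop` helper, not the real scheduler): each iteration's outputs become the next iteration's inputs, and the loop stops when the condition holds or the iteration cap is hit.

```python
from typing import Any, Callable, Dict

def run_loop(
    step: Callable[[Dict[str, Any]], Dict[str, Any]],
    until: Callable[[Dict[str, Any]], bool],
    max_iterations: int = 100,
    **initial_state: Any,
) -> Dict[str, Any]:
    state = dict(initial_state)
    for _ in range(max_iterations):
        if until(state):        # stop condition evaluated against outputs
            break
        state = step(state)     # outputs carried forward as next inputs
    return state

# Mirrors the GraphOp.loop(name="counter", until="count >= 5", count=0) example.
result = run_loop(
    lambda s: {"count": s["count"] + 1},
    until=lambda s: s["count"] >= 5,
    count=0,
)
print(result)  # {'count': 5}
```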

get_current_graph staticmethod

get_current_graph() -> Optional[GraphOp]

Return the current graph from context.

Source code in operonx/core/ops/graph/graph_op.py
@staticmethod
def get_current_graph() -> Optional["GraphOp"]:
    """Return the current graph from context."""
    try:
        return _current_graph.get()
    except LookupError:
        return None

add_op

add_op(op: BaseOp) -> BaseOp

Add an op to the graph.

Source code in operonx/core/ops/graph/graph_op.py
def add_op(self, op: BaseOp) -> BaseOp:
    """Add an op to the graph."""
    if not self._is_building:
        raise RuntimeError("Cannot add op after graph has been built")

    if getattr(op, "_is_operonx_builder", False):
        name = getattr(op, "_name", None) or type(op).__name__
        LOGGER.error(
            "%s '%s' is not built. Call .build() or .else_() before adding to graph.",
            type(op).__name__,
            name,
        )
        raise TypeError(
            f"{type(op).__name__} '{name}' is not built. "
            f"Call .build() or .else_() to create the op."
        )

    if op in [START, END]:
        return op

    if op.name in self._ops:
        LOGGER.warning(
            "Graph [highlight]%s[/highlight]: op [highlight]%s[/highlight] already exists and will be overwritten",
            self.name,
            op.name,
        )

    self._ops[op.name] = op

    if hasattr(op, "start") and op.start:
        if op.name not in self.entries:
            self.entries.append(op.name)

    if hasattr(op, "end") and op.end:
        if op.name not in self.exits:
            self.exits.append(op.name)

    return op

add_edge

add_edge(
    source: str, target: str, type: EdgeType = "normal", soft: bool = False
)

Add an edge between two ops.

Parameters:

Name Type Description Default
source str

Source op name.

required
target str

Target op name.

required
type EdgeType

Edge type (normal, lookback, condition).

'normal'
soft bool

If True, edge does not count toward ready_count. Used for branch outputs when only one branch executes.

False
Source code in operonx/core/ops/graph/graph_op.py
def add_edge(self, source: str, target: str, type: EdgeType = "normal", soft: bool = False):
    """Add an edge between two ops.

    Args:
        source: Source op name.
        target: Target op name.
        type: Edge type (normal, lookback, condition).
        soft: If True, edge does not count toward ready_count.
              Used for branch outputs when only one branch executes.
    """
    if not self._is_building:
        raise RuntimeError("Cannot add edge after graph has been built!")

    if source == START.name:
        if target not in self._ops:
            raise ValueError(f"Target op '{target}' not found")

        target_node = self._ops[target]
        target_node.start = True

        if target not in self.entries:
            self.entries.append(target)

        return

    if target == END.name:
        if source not in self._ops:
            raise ValueError(f"Source op '{source}' not found")

        source_node = self._ops[source]
        source_node.end = True

        if source not in self.exits:
            self.exits.append(source)

        return

    if target == PARENT.name:
        return

    if source not in self._ops:
        raise ValueError(f"Source op '{source}' not found")
    if target not in self._ops:
        raise ValueError(f"Target op '{target}' not found")

    new_edge = EdgeConfig(from_node=source, to_node=target, type=type, soft=soft)
    if (source, target) not in self._edges:
        self._edges[source, target] = new_edge
        self.nexts[source].append(target)
        self.prevs[target].append(source)
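
The soft-edge semantics can be illustrated with a small standalone helper (hypothetical, mirroring only the ready-count bookkeeping, not the scheduler itself): a node's initial ready count tallies incoming hard edges, so a merge op fed by two mutually exclusive branch targets through soft edges does not wait for both.

```python
from collections import defaultdict

def initial_ready_counts(edges):
    """edges: list of (source, target, soft) tuples; soft edges don't count."""
    counts = defaultdict(int)
    for _src, dst, soft in edges:
        if not soft:
            counts[dst] += 1
    return dict(counts)

edges = [
    ("router", "excellent", False),
    ("router", "fail", False),
    ("excellent", "merge", True),   # soft: only one branch will execute
    ("fail", "merge", True),
]
print(initial_ready_counts(edges))  # {'excellent': 1, 'fail': 1}
```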

build

build()

Build graph: children first, then schema → endpoints → topology → validation.

Source code in operonx/core/ops/graph/graph_op.py
def build(self):
    """Build graph: children first, then schema → endpoints → topology → validation."""
    for child in self._ops.values():
        if hasattr(child, "build"):
            child.build()

    self._setup_schema()
    self._setup_endpoints()

    result = self.validate()
    result.raise_if_errors()

    self._build()

    self._scheduler = Scheduler(self)
    self._is_building = False
    self._cache_full_names()

    # Auto-detect bound from children, with user override.
    # If user explicitly set bound on the graph, respect it.
    # Otherwise: all children sync → graph is sync (inline); any io/cpu → task.
    if self.bound is None:
        if all(getattr(op, "bound", None) == "sync" for op in self._ops.values()):
            self.bound = "sync"
        else:
            self.bound = "io"

run async

run(
    state: MemoryState, context_id: Optional[tuple] = None
) -> AsyncGenerator[Tuple[tuple, Dict[str, Any]], None]

Execute graph: get inputs → schedule ops → loop if needed → store results.

Source code in operonx/core/ops/graph/graph_op.py
async def run(
    self,
    state: "MemoryState",
    context_id: Optional[tuple] = None,
) -> AsyncGenerator[Tuple[tuple, Dict[str, Any]], None]:
    """Execute graph: get inputs → schedule ops → loop if needed → store results."""

    if context_id is None:
        context_id = DEFAULT_CONTEXT

    request_id = state.request_id
    start_time = datetime.now(timezone.utc)
    perf_start = perf_counter()
    _inputs = {}
    _outputs = {}
    error_msg = None

    try:
        _inputs = self.get_inputs(state, context_id=context_id)

        if self._is_building:
            self.build()

        _outputs, stream_ctxs = await self._scheduler.run(state, context_id)

        _has_generators = any(op.is_gen for op in self._ops.values())
        if not stream_ctxs and not _has_generators:
            self.store_result(state, _outputs, context_id)
            yield context_id, _outputs
        else:
            for sctx in stream_ctxs:
                item = self.get_outputs(state, context_id=sctx)
                if any(v is not None for v in item.values()):
                    self.store_result(state, item, sctx)
                    yield sctx, item

    except Exception:
        import sys

        error_msg = (
            traceback.format_exc()
            if LOGGER.isEnabledFor(40)
            else f"{type(sys.exc_info()[1]).__name__}: {sys.exc_info()[1]}"
        )
        LOGGER.error(
            "[title]\\[%s][/title] Error in op [highlight]%s[/highlight]:\n%s",
            request_id,
            self.name,
            error_msg.rstrip(),
        )

    finally:
        end_time = datetime.now(timezone.utc)
        duration_ms = (perf_counter() - perf_start) * 1000
        self._log(request_id, context_id, _inputs, _outputs, duration_ms)
        self._store_metrics(
            state,
            context_id,
            start_time=start_time,
            end_time=end_time,
            duration_ms=duration_ms,
        )
        if error_msg is not None:
            state[self.full_name, "error", context_id] = error_msg
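
The async-generator shape of `run` (yielding one `(context_id, outputs)` tuple per batch or stream frame) can be sketched with a toy generator; the names below are illustrative, not the engine's internals.

```python
import asyncio
from typing import Any, AsyncGenerator, Dict, Tuple

async def toy_run(frames) -> AsyncGenerator[Tuple[tuple, Dict[str, Any]], None]:
    # Yield one (context_id, outputs) pair per frame, as graph.run does.
    for i, outputs in enumerate(frames):
        yield ("stream", i), outputs

async def main():
    collected = []
    async for ctx, outputs in toy_run([{"token": "a"}, {"token": "b"}]):
        collected.append((ctx, outputs))
    return collected

frames = asyncio.run(main())
print(frames)
```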

serialize

serialize() -> dict

Serialize full graph to config dict for the Rust backend.

Note: the key "initial_ready_count" is kept as-is for Rust backend compatibility even though the internal Python attribute was renamed to _initial_ready during the scheduler rewrite.

Source code in operonx/core/ops/graph/graph_op.py
def serialize(self) -> dict:
    """Serialize full graph to config dict for the Rust backend.

    Note: the key ``"initial_ready_count"`` is kept as-is for Rust backend
    compatibility even though the internal Python attribute was renamed to
    ``_initial_ready`` during the scheduler rewrite.
    """
    base = super().serialize()
    base.update(
        {
            "ops": {name: op.serialize() for name, op in self._ops.items()},
            "edges": [
                {"from": src, "to": dst, "soft": edge.soft}
                for (src, dst), edge in self._edges.items()
            ],
            "entries": list(self.entries),
            "exits": list(self.exits),
            "initial_ready_count": dict(self._initial_ready),
            "compiled_adj": {
                op: [[link.dst, link.soft] for link in links] for op, links in self._adj.items()
            },
            "stream_initial_ready": self._stream_initial_ready,
            "loop_config": {
                "until": self._loop_config.until
                if isinstance(self._loop_config.until, str)
                else None,
                "max_iterations": self._loop_config.max_iterations,
            }
            if self._loop_config
            else None,
            "max_stream_concurrent": self.concurrency,
        }
    )
    return base

validate

validate() -> ValidationResult

Run all validations and return result.

Source code in operonx/core/ops/graph/graph_op.py
def validate(self) -> ValidationResult:
    """Run all validations and return result."""
    return validate_graph(
        self.name,
        self._ops,
        self._edges,
        self.prevs,
        self.nexts,
        self.entries,
        self.exits,
    )

show

show(indent=0)

Display graph structure (debug).

Source code in operonx/core/ops/graph/graph_op.py
def show(self, indent=0):
    """Display graph structure (debug)."""
    prefix = "  " * indent
    LOGGER.debug("%sGraph: %s", prefix, self.name)
    LOGGER.debug("%sOps: %s", prefix, list(self._ops.keys()))
    LOGGER.debug("%sEdges:", prefix)
    for edge in self._edges.values():
        soft_marker = " (soft)" if edge.soft else ""
        LOGGER.debug(
            "%s  %s -> %s: %s%s", prefix, edge.from_node, edge.to_node, edge.type, soft_marker
        )
    LOGGER.debug("%sReady count: %s", prefix, dict(self._initial_ready))

    for child in self._ops.values():
        if isinstance(child, GraphOp):
            child.show(indent + 1)

BranchOp

BranchOp(
    cases: Optional[List[Tuple[Ref, str]]] = None,
    candidates: Optional[List[str]] = None,
    default: Optional[str] = None,
    inputs: Dict[str, Any] = None,
    outputs: Dict[str, Any] = None,
    **kwargs,
)

Bases: BaseOp

Op that evaluates conditions and routes execution to different targets.

Conditions are Ref objects with comparison operators. The first matching condition determines the target. An optional anchor input overrides all conditions. Use soft edges (>>~) to connect branch targets to a merge op.

Inputs

anchor (str, optional): Hard-coded target name that overrides conditions. (any): Variables referenced in condition Refs (auto-extracted).

Outputs

target (str): Name of the selected target op. matched (str): Description of which condition matched.

Example::

router = if_(PARENT["score"] >= 90, "excellent").else_("fail")
START >> router >> ~excellent >> merge >> END
router >> ~fail >> merge
Source code in operonx/core/ops/flow/branch_op.py
def __init__(
    self,
    cases: Optional[List[Tuple[Ref, str]]] = None,
    candidates: Optional[List[str]] = None,
    default: Optional[str] = None,
    inputs: Dict[str, Any] = None,
    outputs: Dict[str, Any] = None,
    **kwargs,
):
    # Parse inputs/outputs from cases
    parsed_inputs, parsed_outputs = self._parse_cases(cases or [])

    # Call super().__init__ without inputs/outputs
    super().__init__(**kwargs)

    # Merge parsed schema with user-provided
    self._init_io(parsed_inputs, parsed_outputs, inputs, outputs)

    self.default = default.name if isinstance(default, BaseOp) else default
    self.given_candidates = candidates
    self.cases = cases or []
    self._case_descriptions = [ref.describe() for ref, _ in self.cases]

    self._set_core(self._create_core_function())

Attributes

candidates property

candidates: List[str]

List of possible target op names.

specific_metadata property

specific_metadata: Dict[str, Any]

Return subclass-specific metadata.

Functions

get_target

get_target(
    state: MemoryState, context_id: Optional[str] = None
) -> Optional[str]

Get the routed target from state.

Source code in operonx/core/ops/flow/branch_op.py
def get_target(self, state: "MemoryState", context_id: Optional[str] = None) -> Optional[str]:
    """Get the routed target from state."""
    return state[self.full_name, "target", context_id]

serialize

serialize() -> dict

Serialize branch op with conditions for Rust backend.

Source code in operonx/core/ops/flow/branch_op.py
def serialize(self) -> dict:
    """Serialize branch op with conditions for Rust backend."""
    base = super().serialize()
    base.update(
        {
            "cases": [
                {"condition": ref.serialize(), "target": target} for ref, target in self.cases
            ],
            "default": self.default,
            "candidates": self.given_candidates,
        }
    )
    return base
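
The routing rule documented above (anchor override, first matching condition wins, default fallback) can be sketched in plain Python. `route` is a hypothetical helper for illustration, not the BranchOp core.

```python
def route(inputs, cases, default, anchor=None):
    """cases: list of (predicate, target_name) pairs, checked in order."""
    if anchor is not None:
        return anchor, "anchor override"       # hard-coded target wins
    for predicate, target in cases:
        if predicate(inputs):
            return target, f"matched -> {target}"
    return default, "default"                  # no condition matched

cases = [
    (lambda s: s["score"] >= 90, "excellent"),
    (lambda s: s["score"] >= 70, "good"),
]
print(route({"score": 95}, cases, default="fail"))  # ('excellent', 'matched -> excellent')
print(route({"score": 50}, cases, default="fail"))  # ('fail', 'default')
```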

FuncOp

FuncOp(
    code_fn: Optional[Callable] = None,
    return_keys: Optional[List[str]] = None,
    inputs: Dict[str, Any] = None,
    outputs: Dict[str, Any] = None,
    _mappings: Dict[str, Any] = None,
    **kwargs,
)

Bases: BaseOp

Op that executes a Python function.

Inputs and outputs are auto-extracted from the function's signature and return-statement AST. Both sync and async functions are supported. Prefer the @op decorator over instantiating FuncOp directly.

Inputs

Auto-parsed from the function's parameter list.

Outputs

Auto-parsed from return {"key": ...} via AST, or from explicit return_keys.

Example::

@op
def add(a: int, b: int):
    return {"sum": a + b}

with GraphOp(name="main") as graph:
    result = add(a=PARENT["x"], b=PARENT["y"])
    START >> result >> END
Source code in operonx/core/ops/transform/func_op.py
def __init__(
    self,
    code_fn: Optional[Callable] = None,
    return_keys: Optional[List[str]] = None,
    inputs: Dict[str, Any] = None,
    outputs: Dict[str, Any] = None,
    _mappings: Dict[str, Any] = None,
    **kwargs,
):
    # Parse inputs/outputs from function signature/AST
    parsed_inputs, parsed_outputs = self._parse_function(code_fn, return_keys)

    # Split _mappings into inputs/outputs using parsed schema
    if _mappings:
        for key, value in _mappings.items():
            if key in parsed_inputs:
                if inputs is None:
                    inputs = {}
                inputs[key] = value
            elif key in parsed_outputs:
                if outputs is None:
                    outputs = {}
                outputs[key] = value
            else:
                raise TypeError(
                    f"'{key}' is not a known input or output of {code_fn.__name__}(). "
                    f"Inputs: {set(parsed_inputs)}, Outputs: {set(parsed_outputs)}"
                )

    # Resolve cache from @op(cache=...) if not set in kwargs
    if "cache" not in kwargs:
        fn_cache = getattr(code_fn, "_op_cache", None)
        if fn_cache is not None:
            kwargs["cache"] = fn_cache

    # Call super().__init__ without passing inputs/outputs
    super().__init__(**kwargs)

    # Merge parsed schema with user-provided (handles {"*": PARENT} wildcard)
    self._init_io(parsed_inputs, parsed_outputs, inputs, outputs)

    self.code_fn = code_fn
    self._set_core(code_fn)

    # Capture source code
    try:
        self.source = inspect.getsource(code_fn) if code_fn else ""
    except (OSError, TypeError):
        self.source = str(code_fn) if code_fn else ""

    # Set description from docstring if not already set
    if not self.description and code_fn and code_fn.__doc__:
        self.description = code_fn.__doc__.strip().split("\n")[0]
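
The AST-based output discovery mentioned above (finding `return {"key": ...}` statements) can be sketched with the standard `ast` module. `return_keys` is an illustrative stand-in, not the actual FuncOp parser.

```python
import ast

def return_keys(source: str):
    """Collect string keys from `return {"key": ...}` statements."""
    keys = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Return) and isinstance(node.value, ast.Dict):
            for k in node.value.keys:
                if isinstance(k, ast.Constant) and isinstance(k.value, str):
                    keys.append(k.value)
    return keys

SRC = '''
def add(a: int, b: int):
    return {"sum": a + b}
'''
print(return_keys(SRC))  # ['sum']
```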

Attributes

specific_metadata property

specific_metadata: Dict[str, Any]

Return subclass-specific metadata.

Functions

run async

run(
    state: MemoryState, context_id: Optional[str] = None
) -> AsyncGenerator[Tuple[Optional[str], Dict[str, Any]], None]

Execute FuncOp with CodeError wrapping.

Delegates to BaseOp.run() (async generator) and re-raises any exception as a CodeError with full op context attached.

Source code in operonx/core/ops/transform/func_op.py
async def run(
    self,
    state: "MemoryState",
    context_id: Optional[str] = None,
) -> AsyncGenerator[Tuple[Optional[str], Dict[str, Any]], None]:
    """Execute FuncOp with CodeError wrapping.

    Delegates to ``BaseOp.run()`` (async generator) and re-raises any
    exception as a ``CodeError`` with full op context attached.
    """
    try:
        async for ctx, result in super().run(state, context_id):
            yield ctx, result
    except CodeError:
        raise  # Already wrapped; don't wrap again
    except Exception as e:
        # Fetch inputs to provide error context
        _inputs = self.get_inputs(state, context_id)
        raise CodeError(
            message=f"Function '{self.code_fn.__name__ if self.code_fn else 'unknown'}' raised an exception",
            function_name=self.code_fn.__name__ if self.code_fn else "unknown",
            source=self.source,
            inputs=_inputs,
            original_error=e,
        ) from e

ParserOp

ParserOp(
    format: ParserType = "xml",
    extract: Optional[List[str]] = None,
    inputs: Dict[str, Any] = None,
    outputs: Dict[str, Any] = None,
    **kwargs,
)

Bases: BaseOp

Op that parses text into structured data.

Supports multiple formats (JSON, XML, YAML) and extracts fields using dot-separated chain paths (e.g. "user.address.city: str"). Commonly used as the final stage inside a ChainOp pipeline.

Inputs

text (str): Raw text to parse (e.g. LLM output).

Outputs

Dynamically generated from the extract list — one output key per extracted field.

Example::

parser = ParserOp(
    format="json",
    extract=["user.name: str", "user.age: int"],
    inputs={"text": llm["content"]},
)
Source code in operonx/core/ops/transform/parser_op.py
def __init__(
    self,
    format: ParserType = "xml",
    extract: Optional[List[str]] = None,
    inputs: Dict[str, Any] = None,
    outputs: Dict[str, Any] = None,
    **kwargs,
):
    if not extract:
        raise TypeError("extract is required")

    # Parse schema strings into structured fields
    extract_fields = [ExtractField.from_string(schema_str) for schema_str in extract]

    # Parse inputs/outputs from extract
    parsed_inputs = {
        "text": Param(type=str, required=True),
        "validators": Param(type=dict, required=False),
    }
    parsed_outputs = {field.output_key: Param() for field in extract_fields}

    # Call super().__init__ without passing inputs/outputs
    super().__init__(**kwargs)

    # Merge parsed schema with user-provided
    self._init_io(parsed_inputs, parsed_outputs, inputs, outputs)

    self.format = format or "xml"
    self.extract = extract
    self.extract_fields = extract_fields

    # Serialize parser config as literal inputs so Rust can read them
    mode_param = Param(type=str, required=False)
    mode_param.value = self.format
    self.inputs["mode"] = mode_param

    schema_param = Param(type=list, required=False)
    schema_param.value = self.extract
    self.inputs["schema"] = schema_param

    self.backend = self._create_parser()
    self._set_core(self._process)
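
The dot-separated chain-path syntax (`"user.address.city: str"`) can be illustrated with a minimal standalone extractor over parsed JSON. This is a sketch under the assumption that the output key is the last path segment; it is not the real ParserOp backend.

```python
import json

def extract_path(data, path: str):
    """Walk a dot-separated path like 'user.name: str' and cast the leaf."""
    key_path, _, type_name = path.partition(": ")
    value = data
    for part in key_path.split("."):
        value = value[part]
    caster = {"str": str, "int": int, "float": float}.get(type_name.strip(), lambda v: v)
    return key_path.split(".")[-1], caster(value)

data = json.loads('{"user": {"name": "Ada", "age": "36"}}')
print(extract_path(data, "user.name: str"))  # ('name', 'Ada')
print(extract_path(data, "user.age: int"))   # ('age', 36)
```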

Attributes

specific_metadata property

specific_metadata: Dict[str, Any]

Return subclass-specific metadata.

Branch helpers

Branch

Branch(name: Optional[str] = None, **kwargs)

Fluent builder for creating a BranchOp.

Example::

router = (if_(PARENT["score"] >= 90, "excellent")
          .if_(PARENT["score"] >= 70, "good")
          .else_("fail"))

Initialise the builder.

Parameters:

Name Type Description Default
name Optional[str]

Op name. If None, auto-inferred from the variable name.

None
Source code in operonx/core/ops/flow/branch_op.py
def __init__(self, name: Optional[str] = None, **kwargs):
    """Initialise the builder.

    Args:
        name: Op name. If None, auto-inferred from the variable name.
    """
    self._name = name
    self._cases: List[Tuple[Ref, str]] = []
    self._default: Optional[str] = None
    self._inputs: Dict[str, Any] = {}
    self._kwargs = kwargs

Functions

if_

if_(condition: Ref, target: Union[str, BaseOp]) -> Branch

Add a condition–target case.

Parameters:

Name Type Description Default
condition Ref

Ref with comparison (e.g., PARENT["score"] >= 90).

required
target Union[str, BaseOp]

Target op or op name.

required

Returns:

Type Description
Branch

self for chaining.

Source code in operonx/core/ops/flow/branch_op.py
def if_(self, condition: Ref, target: Union[str, BaseOp]) -> "Branch":
    """Add a condition–target case.

    Args:
        condition: Ref with comparison (e.g., ``PARENT["score"] >= 90``).
        target: Target op or op name.

    Returns:
        self for chaining.
    """
    target_name = target.name if hasattr(target, "name") else target
    self._cases.append((condition, target_name))
    return self

else_

else_(target: Union[str, BaseOp]) -> BranchOp

Set default target and build the BranchOp.

Parameters:

Name Type Description Default
target Union[str, BaseOp]

Fallback target when no condition matches.

required

Returns:

Type Description
BranchOp

The constructed BranchOp.

Source code in operonx/core/ops/flow/branch_op.py
@register_skip
def else_(self, target: Union[str, BaseOp]) -> "BranchOp":
    """Set default target and build the BranchOp.

    Args:
        target: Fallback target when no condition matches.

    Returns:
        The constructed BranchOp.
    """
    self._default = target.name if hasattr(target, "name") else target
    return self._build()

build

build() -> BranchOp

Build the BranchOp without a default target.

Returns:

Type Description
BranchOp

The constructed BranchOp.

Source code in operonx/core/ops/flow/branch_op.py
@register_skip
def build(self) -> "BranchOp":
    """Build the BranchOp without a default target.

    Returns:
        The constructed BranchOp.
    """
    return self._build()

if_

if_(condition: Ref, target: Union[str, BaseOp]) -> Branch

Start a branch declaration with the first condition.

Example::

router = if_(PARENT["score"] >= 90, "excellent").else_("fail")
Source code in operonx/core/ops/flow/branch_op.py
def if_(condition: Ref, target: Union[str, BaseOp]) -> Branch:
    """Start a branch declaration with the first condition.

    Example::

        router = if_(PARENT["score"] >= 90, "excellent").else_("fail")
    """
    return Branch().if_(condition, target)
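
The fluent chaining above — each `if_` returns `self`, and `else_`/`build` terminate the chain — can be sketched standalone. `Cond` and `RoutingTable` below are illustrative stand-ins, not operonx's `Ref` and `BranchOp`:

```python
from typing import List, Optional, Tuple


class Cond:
    """Stand-in for a Ref comparison (illustrative only)."""
    def __init__(self, expr: str):
        self.expr = expr


class RoutingTable:
    """Stand-in for BranchOp: just records the compiled cases."""
    def __init__(self, cases: List[Tuple[Cond, str]], default: Optional[str]):
        self.cases = cases
        self.default = default


class MiniBranch:
    """Mirrors Branch's fluent surface: if_ chains, else_/build terminate."""
    def __init__(self) -> None:
        self._cases: List[Tuple[Cond, str]] = []
        self._default: Optional[str] = None

    def if_(self, condition: Cond, target: str) -> "MiniBranch":
        self._cases.append((condition, target))
        return self  # returning self is what enables chaining

    def else_(self, target: str) -> RoutingTable:
        self._default = target
        return RoutingTable(self._cases, self._default)

    def build(self) -> RoutingTable:
        # No default: unmatched inputs are the caller's problem.
        return RoutingTable(self._cases, self._default)


router = (MiniBranch()
          .if_(Cond("score >= 90"), "excellent")
          .if_(Cond("score >= 70"), "good")
          .else_("fail"))
print([target for _, target in router.cases], router.default)
```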

State markers

Constants used inside with GraphOp(...) blocks to wire edges and references. None of these are real instances you'd construct — they're sentinels the graph builder recognises.

Marker Meaning
START Entry node. Every graph's first hard edge goes from START.
END Exit node. op >> END auto-forwards op's outputs as the graph result.
PARENT Reference root for inputs from engine.run(inputs={...}) or the parent graph in nested contexts. Used as PARENT["key"].
PENDING Sentinel returned by ops that absorb input without producing output.

Middleware

Hook into engine lifecycle events — see Tracing for built-in tracers and middleware patterns.

Middleware

Base class for engine middleware.

Subclass and override any of the hooks to add behavior. All hooks are async and called in order (before_run) or reverse order (after_run, on_error).
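
As a concrete illustration, here is a timing middleware sketched against a stub base class that mirrors the documented hook signatures (the stub stands in for the real `Middleware`, with `GraphOp` elided to `Any`; the real class lives in `operonx.core.middleware`):

```python
import asyncio
import time
from typing import Any, Dict


class Middleware:
    """Stub mirroring the documented hooks; the real base class is operonx's."""
    async def before_run(self, graph: Any, inputs: Dict[str, Any],
                         context: Dict[str, Any]) -> Dict[str, Any]:
        return inputs

    async def after_run(self, graph: Any, inputs: Dict[str, Any],
                        result: Dict[str, Any],
                        context: Dict[str, Any]) -> Dict[str, Any]:
        return result

    async def on_error(self, graph: Any, inputs: Dict[str, Any],
                       error: Exception, context: Dict[str, Any]) -> None:
        raise error


class TimingMiddleware(Middleware):
    """Stamps a start time into the context, then records elapsed ms in the result."""
    async def before_run(self, graph, inputs, context):
        context["_t0"] = time.perf_counter()
        return inputs

    async def after_run(self, graph, inputs, result, context):
        result["elapsed_ms"] = (time.perf_counter() - context["_t0"]) * 1000
        return result


async def demo() -> Dict[str, Any]:
    # Simulate the engine driving the hooks around a run.
    mw = TimingMiddleware()
    ctx: Dict[str, Any] = {}
    inputs = await mw.before_run(None, {"query": "hi"}, ctx)
    return await mw.after_run(None, inputs, {"response": "hello"}, ctx)

print(asyncio.run(demo()))
```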

Functions

before_run async

before_run(
    graph: GraphOp, inputs: Dict[str, Any], context: Dict[str, Any]
) -> Dict[str, Any]

Called before graph execution. Can modify inputs.

Parameters:

Name Type Description Default
graph GraphOp

The GraphOp being executed

required
inputs Dict[str, Any]

The input dict (modify and return)

required
context Dict[str, Any]

Execution context (user_id, session_id, request_id, etc.)

required

Returns:

Type Description
Dict[str, Any]

The (possibly modified) inputs dict.

Source code in operonx/core/middleware.py
async def before_run(
    self, graph: GraphOp, inputs: Dict[str, Any], context: Dict[str, Any]
) -> Dict[str, Any]:
    """Called before graph execution. Can modify inputs.

    Args:
        graph: The GraphOp being executed
        inputs: The input dict (modify and return)
        context: Execution context (user_id, session_id, request_id, etc.)

    Returns:
        The (possibly modified) inputs dict.
    """
    return inputs

after_run async

after_run(
    graph: GraphOp,
    inputs: Dict[str, Any],
    result: Dict[str, Any],
    context: Dict[str, Any],
) -> Dict[str, Any]

Called after graph execution. Can modify result.

Parameters:

Name Type Description Default
graph GraphOp

The GraphOp that was executed

required
inputs Dict[str, Any]

The original inputs

required
result Dict[str, Any]

The result dict (modify and return)

required
context Dict[str, Any]

Execution context

required

Returns:

Type Description
Dict[str, Any]

The (possibly modified) result dict.

Source code in operonx/core/middleware.py
async def after_run(
    self,
    graph: GraphOp,
    inputs: Dict[str, Any],
    result: Dict[str, Any],
    context: Dict[str, Any],
) -> Dict[str, Any]:
    """Called after graph execution. Can modify result.

    Args:
        graph: The GraphOp that was executed
        inputs: The original inputs
        result: The result dict (modify and return)
        context: Execution context

    Returns:
        The (possibly modified) result dict.
    """
    return result

on_error async

on_error(
    graph: GraphOp,
    inputs: Dict[str, Any],
    error: Exception,
    context: Dict[str, Any],
) -> None

Called when graph execution fails.

Default behavior re-raises the error. Override to add logging, alerting, or error transformation.

Parameters:

Name Type Description Default
graph GraphOp

The GraphOp that failed

required
inputs Dict[str, Any]

The original inputs

required
error Exception

The exception that occurred

required
context Dict[str, Any]

Execution context

required
Source code in operonx/core/middleware.py
async def on_error(
    self, graph: GraphOp, inputs: Dict[str, Any], error: Exception, context: Dict[str, Any]
) -> None:
    """Called when graph execution fails.

    Default behavior re-raises the error. Override to add
    logging, alerting, or error transformation.

    Args:
        graph: The GraphOp that failed
        inputs: The original inputs
        error: The exception that occurred
        context: Execution context
    """
    raise error

Top-level convenience

bootstrap

bootstrap(
    *, resources: Optional[Union[str, Path]] = None, env: bool = True
) -> Optional[ResourceHub]

One-line setup for .env and :class:ResourceHub.

  • When env is True (default), load ./.env from CWD using python-dotenv (non-override; existing env wins). The path is recorded in BOOTSTRAP_ENV_PATHS for later diagnostic messages.
  • When resources is a path, install the hub via :meth:ResourceHub.from_yaml.
  • When resources is None, call :meth:ResourceHub.auto — which checks ./resources.yaml and warns on miss.
  • Idempotent: if a hub is already installed, return it unchanged.

Returns the installed hub, or None if no resources.yaml was found and none was provided. Pure-compute graphs that don't need a hub can ignore the return value.

Source code in operonx/__init__.py
def bootstrap(
    *,
    resources: Optional[Union[str, Path]] = None,
    env: bool = True,
) -> Optional[ResourceHub]:
    """One-line setup for ``.env`` and :class:`ResourceHub`.

    - When ``env`` is ``True`` (default), load ``./.env`` from CWD using
      ``python-dotenv`` (non-override; existing env wins). The path is
      recorded in ``BOOTSTRAP_ENV_PATHS`` for later diagnostic messages.
    - When ``resources`` is a path, install the hub via
      :meth:`ResourceHub.from_yaml`.
    - When ``resources`` is ``None``, call :meth:`ResourceHub.auto` —
      which checks ``./resources.yaml`` and warns on miss.
    - Idempotent: if a hub is already installed, return it unchanged.

    Returns the installed hub, or ``None`` if no ``resources.yaml`` was
    found and none was provided. Pure-compute graphs that don't need a
    hub can ignore the return value.
    """
    if env:
        _load_env_into_bootstrap()

    if ResourceHub._instance is not None:
        return ResourceHub._instance

    if resources is not None:
        hub = ResourceHub.from_yaml(resources)
        ResourceHub.set_instance(hub)
        return hub

    return ResourceHub.auto()
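
The idempotency guarantee above follows a standard install-once singleton pattern, sketched here standalone — `FakeHub` and `fake_bootstrap` are illustrative stand-ins, not the real `ResourceHub` or `bootstrap`:

```python
from typing import Optional


class FakeHub:
    """Illustrative stand-in for ResourceHub (not the operonx class)."""
    _instance: Optional["FakeHub"] = None

    def __init__(self, source: str):
        self.source = source


def fake_bootstrap(resources: Optional[str] = None) -> Optional[FakeHub]:
    # Idempotent: an already-installed hub wins over any new arguments.
    if FakeHub._instance is not None:
        return FakeHub._instance
    if resources is not None:
        FakeHub._instance = FakeHub(source=resources)
        return FakeHub._instance
    # Auto-discovery elided; the real bootstrap falls back to ResourceHub.auto().
    return None


first = fake_bootstrap("resources.yaml")
second = fake_bootstrap("other.yaml")  # ignored: a hub is already installed
print(first is second, first.source)
```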

Provider-neutral types

The v0.7 LLMOp converter layer will translate provider-specific types to/from these at the provider boundary.

ChatMessage

Bases: TypedDict

A single message in a chat conversation.

Provider-neutral shape; backends translate to / from this at the LLMOp boundary.

Required fields

role: One of "system", "user", "assistant", "tool".
content: The message body. str for plain text; providers may accept richer structured shapes (tool calls, multi-modal parts) via opt-in fields below.

Optional fields

name: Speaker identifier (for tool replies and named system prompts).
tool_call_id: When role == "tool", the id of the tool call this message responds to.
tool_calls: When role == "assistant", the list of tool calls the model is requesting. Shape is provider-specific; converter layers normalise this in v0.7.

ChatRole module-attribute

ChatRole = Literal['system', 'user', 'assistant', 'tool']
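
A standalone sketch of the documented shape — this simplified `TypedDict` marks every field optional via `total=False` (the real type keeps `role` and `content` required) and should not be taken as the operonx definition:

```python
from typing import List, Literal, TypedDict

ChatRole = Literal["system", "user", "assistant", "tool"]


class ChatMessage(TypedDict, total=False):
    """Simplified sketch: total=False makes all fields optional here,
    whereas the documented type requires role and content."""
    role: ChatRole
    content: str
    name: str           # speaker identifier
    tool_call_id: str   # set when role == "tool"
    tool_calls: list    # provider-specific shape, normalised in v0.7


history: List[ChatMessage] = [
    {"role": "system", "content": "You are terse."},
    {"role": "user", "content": "Hi", "name": "alice"},
]
print([m["role"] for m in history])
```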