operonx.core.ops

Op base classes, flow primitives, and graph composition.

ops

Core op types and markers for the Operon workflow engine.

  • BaseOp — base class for all workflow ops
  • DummyOp — placeholder for START/END markers
  • GraphOp — container managing a sub-graph with parallel execution
  • BranchOp — conditional routing with precompiled conditions
  • FuncOp — wraps a Python function (supports generators for streaming)
  • ParserOp — extracts structured data from text (XML/JSON/etc.)

Markers: START, END, PARENT, PENDING. Decorators: op, graph, if_.

Attributes

OpType module-attribute

OpType = Literal[
    "data",
    "llm",
    "embedding",
    "rerank",
    "branch",
    "for",
    "while",
    "stream",
    "code",
    "lambda",
    "parser",
    "prompt",
    "doc-processor",
    "milvus",
    "mongo",
    "s3",
    "graph",
    "default",
    "dummy",
    "tool-executor",
    "mcp",
]

Node types supported in the workflow graph.
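Because OpType is a typing.Literal alias, the permitted values can be recovered at runtime with typing.get_args — a small sketch using an abbreviated subset of the alias above:

```python
from typing import Literal, get_args

# Abbreviated stand-in for the full OpType alias shown above.
OpType = Literal["data", "llm", "branch", "graph", "default"]

def check_op_type(t: str) -> str:
    # get_args unpacks the Literal's allowed values into a tuple.
    if t not in get_args(OpType):
        raise ValueError(f"unsupported op type: {t!r}")
    return t
```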

Classes

EOF dataclass

EOF(op: str, ctx: tuple)

Marker that an op's async generator is exhausted.

Created by Scheduler._pump() after op.run() stops yielding (i.e. the underlying function returned or its generator was exhausted). User code never yields EOF — it is emitted implicitly when the op finishes.

Frame dataclass

Frame(op: str, ctx: tuple, result: dict)

One result yielded by an op during execution.

Created by Scheduler._pump() for every (ctx, result) tuple that op.run() yields. User code never constructs Frame directly — just return or yield from an @op function.
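The Frame → EOF lifecycle can be sketched with stand-in dataclasses (a toy pump, not the real Scheduler._pump, which is async and queue-driven):

```python
from dataclasses import dataclass

@dataclass
class Frame:
    op: str
    ctx: tuple
    result: dict

@dataclass
class EOF:
    op: str
    ctx: tuple

def toy_pump(op_name: str, ctx: tuple, gen):
    """Drain a user generator: one Frame per yield, then exactly one EOF."""
    events = [Frame(op_name, ctx, r) for r in gen]
    events.append(EOF(op_name, ctx))
    return events

def stream():
    # A user @op body just yields plain dicts — never Frame or EOF.
    yield {"text": "hel"}
    yield {"text": "lo"}

events = toy_pump("stream_op", ("root",), stream())
# Two Frame events followed by one implicit EOF.
```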

Interrupt dataclass

Interrupt(
    op: str = "", ctx: tuple = (), ctx_to_cancel: tuple = (), reason: str = ""
)

In-band scheduler cancellation event.

Returned/yielded by user op bodies to cancel queued frames + in-flight tasks at ctx_to_cancel (and its descendants). op and ctx record the emitter for tracing; ctx_to_cancel is the explicit target — typically the prior turn's ctx, stored in SCRATCH when long-running work began.

The scheduler
  1. Drops Frame/EOF items at ctx_to_cancel from the queue.
  2. Cancels in-flight _pump tasks at ctx_to_cancel and descendants (skipping the emitter to avoid self-cancel).
  3. Clears bookkeeping (ready/seq_active/seq_origins/collect_bufs).
  4. Forwards a synthetic ("__interrupt__", emitter_ctx, {...}) tuple to output_queue so ExecutionHandle consumers see it.

Best-effort: data already pushed to consumer-owned queues (e.g. a user-supplied asyncio.Queue) is NOT drained — consumer must handle that itself (see plan §4.6a).
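Step 2's "ctx_to_cancel and descendants" reduces to a tuple-prefix check; a toy sketch of the queue sweep in step 1 (stand-in Interrupt, not the real scheduler code):

```python
from dataclasses import dataclass

@dataclass
class Interrupt:
    op: str = ""
    ctx: tuple = ()
    ctx_to_cancel: tuple = ()
    reason: str = ""

def at_or_under(ctx: tuple, target: tuple) -> bool:
    # ctx is the target itself or one of its descendants (prefix match).
    return ctx[: len(target)] == target

def sweep_queue(queue, intr: Interrupt):
    """Toy version of step 1: drop queued items at ctx_to_cancel + descendants."""
    return [item for item in queue if not at_or_under(item[1], intr.ctx_to_cancel)]

queue = [("llm", ("turn1",)), ("llm", ("turn1", "[0]")), ("llm", ("turn2",))]
kept = sweep_queue(
    queue,
    Interrupt(op="user_input", ctx=("turn2",), ctx_to_cancel=("turn1",)),
)
# Only the ("turn2",) item survives; ("turn1",) and its [0] descendant drop.
```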

BaseOp

BaseOp(
    id: str = None,
    name: str = None,
    description: str = "",
    inputs: Dict[str, Any] = None,
    outputs: Dict[str, Any] = None,
    sources: List[str] = None,
    targets: List[str] = None,
    stream: bool = False,
    start: bool = False,
    end: bool = False,
    contain_generation: bool = False,
    verbose: bool = True,
    enabled: bool = True,
    executor: Optional[str] = None,
    bound: Optional[str] = None,
    cache: Union[bool, str, None] = None,
    delay: float = 0,
)

Bases: ABC

Base class for all ops in a workflow.

An op is the fundamental processing unit. Each op declares typed inputs and outputs (via Param), and implements a core() method that contains the execution logic. Ops are wired together inside a GraphOp using edge operators.

Sections (read top-to-bottom)::

1. INIT            __init__, __slots__, param helpers
2. EDGE OPERATORS  >>, >>~, >, <, [], ~ — wiring ops in a graph
3. EXECUTE         run(), get_inputs/outputs, store_result, _exec_core
4. OBSERVABILITY   _log(), _store_metrics()
5. SERIALIZATION   serialize(), metadata — for Rust backend & tracing

Example::

from operonx.core import GraphOp, op, START, END, PARENT

@op
def double(x: int):
    return {"result": x * 2}

with GraphOp(name="main") as graph:
    d = double(x=PARENT["x"])
    START >> d >> END
Source code in operonx/core/ops/base.py
def __init__(
    self,
    id: str = None,
    name: str = None,
    description: str = "",
    inputs: Dict[str, Any] = None,
    outputs: Dict[str, Any] = None,
    sources: List[str] = None,
    targets: List[str] = None,
    stream: bool = False,
    start: bool = False,
    end: bool = False,
    contain_generation: bool = False,
    verbose: bool = True,
    enabled: bool = True,
    executor: Optional[str] = None,
    bound: Optional[str] = None,
    cache: Union[bool, str, None] = None,
    delay: float = 0,
):
    # Backward compat: executor="thread" → bound="cpu"
    if executor == "thread" and bound is None:
        bound = "cpu"
    if bound not in self._VALID_BOUNDS:
        raise ValueError(
            f"bound must be 'sync', 'io', 'cpu', or None (auto-detect), got {bound!r}"
        )
    self.bound = bound  # resolved to concrete value by _set_core()
    self._input_cache = None  # built on first get_inputs() call
    self._metrics_idx = None  # (schema, st_idx, et_idx, dur_idx)
    self._error_idx = None  # (schema, err_idx)
    self.cache = cache
    self.delay = delay
    self.id = id or uuid.uuid4().hex
    if name is None:
        name = auto_name()
    self.name = name or unique_name()
    self._full_name = None  # Cached at build time by GraphOp.build()
    self.description = description

    self.stream = stream
    self.start = start
    self.end = end
    self.verbose = verbose
    self.enabled = enabled

    self.sources: List[str] = sources or []
    self.targets: List[str] = targets or []

    self.core: Optional[Callable] = None
    self.is_gen: bool = False
    self.contain_generation = contain_generation
    # Register with the parent graph
    self.parent = get_current()
    # Use getattr to avoid hasattr's double lookup
    add_op = getattr(self.parent, "add_op", None)
    if add_op is not None:
        add_op(self)

    # Validate op name
    if self.name and not self.name.replace("_", "").replace("-", "").isalnum():
        raise ValueError(
            f"Op name '{self.name}' may only contain alphanumeric characters, underscores, and hyphens"
        )

    # Normalize inputs/outputs to Dict[str, Param]
    self.inputs: Dict[str, Param] = self._normalize_params(inputs)
    self.outputs: Dict[str, Param] = self._normalize_params(outputs)

    # Error if there are overlapping keys between inputs and outputs
    if self.inputs and self.outputs:
        overlapping_keys = set(self.inputs.keys()) & set(self.outputs.keys())
        if overlapping_keys:
            raise ValueError(
                f"Op '{self.name}' has overlapping input/output keys: {overlapping_keys}. "
                "Input and output variable names must be different."
            )
Attributes
full_name property
full_name: str

Fully-qualified hierarchical path of this op. Cached after build().

specific_metadata property
specific_metadata: Dict[str, Any]

Return subclass-specific metadata. Override in subclasses.

metadata property
metadata: Dict[str, Any]

Build a metadata dictionary for this op.

Functions
warmup
warmup() -> None

Called by Operon engine after graph.build() when a ResourceHub is available.

Override in provider ops (LLMOp, EmbeddingOp, etc.) to eagerly initialize backends and eliminate cold-start latency on the first user request.

The default implementation is a no-op — subclasses opt in by overriding.

Source code in operonx/core/ops/base.py
def warmup(self) -> None:
    """Called by Operon engine after graph.build() when a ResourceHub is available.

    Override in provider ops (LLMOp, EmbeddingOp, etc.) to eagerly initialize
    backends and eliminate cold-start latency on the first user request.

    The default implementation is a no-op — subclasses opt in by overriding.
    """
get_inputs
get_inputs(state: MemoryState, context_id: str) -> Dict[str, Any]

Retrieve input values from state based on connection mappings.

Uses cached cell indices to avoid per-call schema.get_index() lookups. Falls back to standard path on first call to build the cache.

Source code in operonx/core/ops/base.py
def get_inputs(self, state: "MemoryState", context_id: str) -> Dict[str, Any]:
    """Retrieve input values from state based on connection mappings.

    Uses cached cell indices to avoid per-call schema.get_index() lookups.
    Falls back to standard path on first call to build the cache.
    """
    # Fast path: use cached indices to skip schema.get_index() dict lookup.
    # Cache is keyed by schema id since indices differ per schema.
    cached = self._input_cache
    if cached is not None and cached[0] is state.schema:
        result = {}
        cells = state._cells
        pull_refs = state.schema._pull_refs
        for var_name, idx, fallback in cached[1]:
            # Inline the hot path of state.__getitem__ without _unpack_key + get_index
            cell = cells[idx]
            if cell.is_shared or context_id in cell:
                value = cell[context_id]
            else:
                pull_ref = pull_refs[idx]
                if pull_ref and not pull_ref.is_output and pull_ref.idx >= 0:
                    source_val = cells[pull_ref.idx][context_id]
                    if source_val is not None or cells[pull_ref.idx].default_value is not None:
                        value = pull_ref._fn(source_val)
                        cell[context_id] = value
                    else:
                        value = cell.default_value
                else:
                    value = cell[context_id]  # hierarchy walk
            if value is not None:
                result[var_name] = value
            elif fallback is not None:
                result[var_name] = fallback
        for var_name in list(result):
            val = result[var_name]
            if isinstance(val, ScratchRef):
                result[var_name] = state._scratch.get(val.key)
        return result

    # First call (or schema changed): build index cache
    entries = []
    result = {}
    full_name = self.full_name
    for var_name, param in self.inputs.items():
        idx = state.schema.get_index(full_name, var_name)
        fallback = None
        if param.value is not None and not isinstance(param.value, Ref):
            fallback = param.value
        elif param.default is not None:
            fallback = param.default
        entries.append((var_name, idx, fallback))

        value = state[full_name, var_name, context_id]
        if value is not None:
            result[var_name] = value
        elif fallback is not None:
            result[var_name] = fallback
    self._input_cache = (state.schema, entries)
    for var_name in list(result):
        val = result[var_name]
        if isinstance(val, ScratchRef):
            result[var_name] = state._scratch.get(val.key)
    _unwrap_media_in_place(result)
    return result
get_outputs
get_outputs(state: MemoryState, context_id: str) -> Dict[str, Any]

Read output values from state.

Reads directly from this op's output variables. Output connections (outputs={...}) are resolved by the schema at build time — they create refs at the destination, not at this op.

Parameters:

Name Type Description Default
state MemoryState

Workflow state.

required
context_id str

Context of this op.

required
Source code in operonx/core/ops/base.py
def get_outputs(self, state: "MemoryState", context_id: str) -> Dict[str, Any]:
    """Read output values from state.

    Reads directly from this op's output variables. Output connections
    (outputs={...}) are resolved by the schema at build time —
    they create refs at the destination, not at this op.

    Args:
        state: Workflow state.
        context_id: Context of this op.
    """
    return {var: state[self.full_name, var, context_id] for var in self.outputs}
normalize_trace_io
normalize_trace_io(inputs: Dict[str, Any], outputs: Dict[str, Any]) -> tuple

Produce a trace-time view of this op's I/O.

Called by _extract_trace_io before media extraction. Subclasses override when their I/O carries media in a non-Media shape (e.g. LLMOp wraps OpenAI chat-format image_url blocks into Media instances). The real state value is untouched — this returns copies used only for trace capture.

Default is identity: most ops never override.

Source code in operonx/core/ops/base.py
def normalize_trace_io(self, inputs: Dict[str, Any], outputs: Dict[str, Any]) -> tuple:
    """Produce a trace-time view of this op's I/O.

    Called by ``_extract_trace_io`` before media extraction. Subclasses
    override when their I/O carries media in a non-``Media`` shape (e.g.
    LLMOp wraps OpenAI chat-format ``image_url`` blocks into ``Media``
    instances). The real state value is untouched — this returns copies
    used only for trace capture.

    Default is identity: most ops never override.
    """
    return inputs, outputs
store_result
store_result(
    state: MemoryState, result: Dict[str, Any], context_id: str
) -> None

Store result dict into state.

Uses state[op, var, ctx] = value for O(1) index-based storage. Extracts $tags special key for dynamic tagging.

Source code in operonx/core/ops/base.py
def store_result(self, state: "MemoryState", result: Dict[str, Any], context_id: str) -> None:
    """Store result dict into state.

    Uses state[op, var, ctx] = value for O(1) index-based storage.
    Extracts $tags special key for dynamic tagging.
    """
    if not result:
        return

    # Extract $tags before storing (don't store as output variable)
    tags = result.pop("$tags", None)
    if tags:
        state.add_tags(tags)

    for key, value in result.items():
        state[self.full_name, key, context_id] = value
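The $tags handling reduces to a pop-before-store; a minimal stand-alone sketch of that behaviour (plain dict/list stand-ins for MemoryState and add_tags):

```python
def toy_store_result(result: dict, state: dict, tags: list) -> None:
    """Mimic store_result: extract the special "$tags" key, store the rest."""
    t = result.pop("$tags", None)
    if t:
        tags.extend(t)  # stand-in for state.add_tags(tags)
    for key, value in result.items():
        state[key] = value

state, tags = {}, []
# A user op body would simply return {"label": ..., "$tags": [...]}.
toy_store_result({"label": "long", "$tags": ["long"]}, state, tags)
# "$tags" is routed to tagging and never stored as an output variable.
```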
save_all_caches staticmethod
save_all_caches() -> int

Save all file-backed caches. Returns total entries saved.

Source code in operonx/core/ops/base.py
@staticmethod
def save_all_caches() -> int:
    """Save all file-backed caches. Returns total entries saved."""
    import json
    import struct
    from pathlib import Path

    total = 0
    for _, (path, store) in BaseOp._cache_stores.items():
        if not path or not store:
            continue
        p = Path(path)
        p.parent.mkdir(parents=True, exist_ok=True)

        buf = struct.pack("<Q", len(store))
        for h, value in store.items():
            json_bytes = json.dumps(value, default=str).encode()
            buf += struct.pack("<QI", h, len(json_bytes))
            buf += json_bytes

        p.write_bytes(buf)
        total += len(store)

    return total
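A hedged counterpart loader for the binary layout above (u64 entry count, then per entry a u64 hash + u32 JSON length header followed by the JSON bytes); the real load path lives elsewhere in base.py and may differ:

```python
import json
import struct

def load_cache_bytes(data: bytes) -> dict:
    """Parse the <Q count header, then per-entry <QI header + JSON payload."""
    (count,) = struct.unpack_from("<Q", data, 0)
    off = 8
    store = {}
    for _ in range(count):
        h, n = struct.unpack_from("<QI", data, off)
        off += struct.calcsize("<QI")  # 12 bytes: u64 hash + u32 length
        store[h] = json.loads(data[off : off + n])
        off += n
    return store

# Round-trip against the writer's format:
payload = json.dumps({"answer": 42}).encode()
blob = struct.pack("<Q", 1) + struct.pack("<QI", 123, len(payload)) + payload
```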
run async
run(
    state: MemoryState, context_id: Optional[str] = None
) -> AsyncGenerator[tuple[Optional[str], Dict[str, Any]], None]

Execute this op as a uniform async generator.

Every op — whether it uses return or yield — is driven through the same three-layer model:

  1. User function (@op): plain return {"k": v} or yield {"k": v}. No awareness of Frame/EOF.
  2. BaseOp.run() (this method): wraps the user function via _exec_core() into a uniform async generator that yields (context_id, result) tuples. Normal op → one yield. Generator op → N yields, each in its own stream context [i].
  3. Scheduler._pump(): consumes this generator. Each yield becomes a Frame event on the queue; when the generator exhausts naturally, _pump emits one EOF event. The user never writes Frame or EOF.

Yields:

Type Description
AsyncGenerator[tuple[Optional[str], Dict[str, Any]], None]

(context_id, result) — one tuple per item produced by the op.
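Layer 2's per-yield stream contexts can be sketched with a toy async generator (assuming tuple contexts and the `[i]` suffix convention described above; not the real run(), which also handles state, tracing, and caching):

```python
import asyncio

async def toy_run(base_ctx: tuple, is_gen: bool, results):
    """Mimic layer 2: one (ctx, result) tuple per item; generator ops
    get a per-yield stream-context suffix."""
    for idx, result in enumerate(results):
        ctx = base_ctx + (f"[{idx}]",) if is_gen else base_ctx
        yield ctx, result

async def collect():
    return [item async for item in toy_run(("root",), True, [{"a": 1}, {"a": 2}])]

frames = asyncio.run(collect())
# Each yield lands in its own stream context: [0], [1], ...
```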

Source code in operonx/core/ops/base.py
async def run(
    self,
    state: "MemoryState",
    context_id: Optional[str] = None,
) -> "AsyncGenerator[tuple[Optional[str], Dict[str, Any]], None]":
    """Execute this op as a uniform async generator.

    Every op — whether it uses ``return`` or ``yield`` — is driven through
    the same three-layer model:

    1. **User function** (``@op``): plain ``return {"k": v}`` or ``yield {"k": v}``.
       No awareness of Frame/EOF.
    2. **``BaseOp.run()``** (this method): wraps the user function via
       ``_exec_core()`` into a uniform async generator that yields
       ``(context_id, result)`` tuples.  Normal op → one yield.
       Generator op → N yields, each in its own stream context ``[i]``.
    3. **``Scheduler._pump()``**: consumes this generator.  Each yield becomes
       a ``Frame`` event on the queue; when the generator exhausts naturally,
       ``_pump`` emits one ``EOF`` event.  The user never writes Frame or EOF.

    Yields:
        ``(context_id, result)`` — one tuple per item produced by the op.
    """
    if not self.enabled:
        return

    _tracing = state.tracing
    start_time = None
    end_time = None
    perf_start = perf_counter()
    duration_ms = 0.0
    _inputs = {}
    _outputs = {}
    error_msg = None

    # New event-stream tracing: emit hooks alongside legacy state-cell writes.
    # Lookup is one ContextVar.get — cached as local to keep emit O(1).
    # NullEmitter when no pipeline bound, so calls cost ~one method dispatch.
    emitter = current_emitter()
    op_var_token = None
    idx = 0
    op_started = False
    op_cancelled = False
    ctx_for_end = context_id if context_id is not None else DEFAULT_CONTEXT

    try:
        if self.delay > 0:
            await asyncio.sleep(self.delay)

        _inputs = self.get_inputs(state, context_id)

        # Record timing AFTER inputs resolved — measures actual processing,
        # not time spent waiting for upstream ops in the scheduler queue.
        start_time = datetime.now(timezone.utc) if _tracing else None
        perf_start = perf_counter()

        # New tracing: OP_START fires after inputs resolve, before exec.
        # Tracks start_time on the emitter for cancel-emit duration calc (Rule 3).
        # Media extraction: walk inputs once, replace Media instances with
        # placeholder strings + accumulate refs for the exporter to upload.
        _trace_inputs, _input_media = self._extract_trace_io(_inputs, root="inputs")
        emitter.op_start(
            self.full_name,
            ctx_for_end,
            _trace_inputs,
            media_refs=_input_media,
        )
        op_started = True
        op_var_token = _current_op_var.set((self.full_name, ctx_for_end))

        # Cache check
        if self.cache is not None:
            _cache_key = self._cache_hash(_inputs)
            _cache_store = self._get_cache_store()
            if _cache_key in _cache_store:
                _outputs = _cache_store[_cache_key]
                self.store_result(state, _outputs, context_id)
                yield context_id, _outputs
                return

        base_ctx = context_id if context_id is not None else DEFAULT_CONTEXT
        _yield_start = perf_counter()
        async for result in self._exec_core(_inputs):
            ctx = base_ctx + (f"[{idx}]",) if self.is_gen else context_id
            # Interrupt is a scheduler control event, not a result dict.
            # Forward it untouched — _pump puts it on the queue as-is.
            if isinstance(result, Interrupt):
                yield ctx, result
                idx += 1
                _yield_start = perf_counter()
                continue
            self.store_result(state, result, ctx)
            _outputs = result
            if self.is_gen and _tracing:
                _yield_end = perf_counter()
                _now = datetime.now(timezone.utc)
                self._store_metrics(
                    state,
                    ctx,
                    start_time=_now,
                    end_time=_now,
                    duration_ms=(_yield_end - _yield_start) * 1000,
                )
            elif not self.is_gen and _tracing:
                # Batch ops: record timing BEFORE yield. The yield
                # suspends this generator until the scheduler resumes
                # it, inflating duration if measured in finally.
                _batch_end = perf_counter()
                end_time = datetime.now(timezone.utc)
                duration_ms = (_batch_end - perf_start) * 1000
            if not self.is_gen and self.cache is not None:
                _cache_store[_cache_key] = result
            # New tracing: per-yield event for generator ops only.
            # Threshold (`@op(emit_yields=N)`) is a T2 task — for now N=1.
            # Strip Media off the yielded dict so raw bytes don't sit in
            # the buffer; the parent observation's outputs (last yielded)
            # gets media uploaded via the OP_END path instead.
            if self.is_gen:
                _trace_yielded, _yield_media = self._extract_trace_io(
                    result,
                    root="outputs",
                )
                emitter.op_yield(
                    self.full_name,
                    ctx,
                    _trace_yielded,
                    idx,
                    media_refs=_yield_media,
                )
            yield ctx, result
            idx += 1
            _yield_start = perf_counter()

    except asyncio.CancelledError:
        # Mark for the OP_END emit below. finally still runs because Python
        # guarantees it on CancelledError. Re-raise so the scheduler sees
        # cancellation propagate through normally (Phase B Interrupt sweep).
        op_cancelled = True
        raise
    except Exception:
        import sys

        error_msg = (
            traceback.format_exc()
            if LOGGER.isEnabledFor(ERROR)
            else f"{type(sys.exc_info()[1]).__name__}: {sys.exc_info()[1]}"
        )
        LOGGER.error(
            format_event(
                "op_error",
                request_id=state.request_id or "unknown",
                name=self.name,
                error=error_msg.rstrip(),
            ),
        )

    finally:
        if self.is_gen or not _tracing:
            # Generators: measure total wall-clock time across all yields.
            # Non-tracing: just compute duration for metrics.
            duration_ms = (perf_counter() - perf_start) * 1000
            end_time = datetime.now(timezone.utc) if _tracing else None
        # else: batch ops already set end_time/duration_ms before yield

        self._store_metrics(
            state,
            context_id,
            start_time=start_time,
            end_time=end_time,
            duration_ms=duration_ms,
        )

        # Write error state via cached index
        err_cached = self._error_idx
        if err_cached is not None and err_cached[0] is state.schema:
            err_idx = err_cached[1]
        else:
            err_idx = state.schema.get_index(self.full_name, "error")
            self._error_idx = (state.schema, err_idx)
        if err_idx >= 0:
            ctx_key = context_id if context_id is not None else DEFAULT_CONTEXT
            if error_msg is not None:
                state._cells[err_idx][ctx_key] = error_msg
            elif "error" not in _outputs:
                state._cells[err_idx][ctx_key] = None

        if _tracing:
            self._log(state.request_id, context_id, _inputs, _outputs, duration_ms)

        # New tracing: OP_END fires regardless of legacy `_tracing` flag
        # — the emitter is NullEmitter when no pipeline is bound, so this
        # is free in the no-tracer path. Status reflects what happened.
        #
        # Cancel-before-start case (op_started=False): the body never ran,
        # no OP_START was emitted, so we skip OP_END too — no orphan event.
        if op_started:
            if op_cancelled:
                status = "cancelled"
            elif error_msg is not None:
                status = "error"
            else:
                status = "ok"
            if status == "ok":
                _trace_outputs, _output_media = self._extract_trace_io(
                    _outputs,
                    root="outputs",
                )
            else:
                _trace_outputs, _output_media = {}, []
            emitter.op_end(
                self.full_name,
                ctx_for_end,
                outputs=_trace_outputs,
                status=status,
                duration_ms=duration_ms,
                yield_count=idx,
                media_refs=_output_media,
            )
        if op_var_token is not None:
            _current_op_var.reset(op_var_token)

        if duration_ms > 100 and LOGGER.isEnabledFor(WARNING):
            LOGGER.warning(
                format_event(
                    "op_slow",
                    request_id=state.request_id or "unknown",
                    full_name=self.full_name,
                    duration_ms=f"{duration_ms:.1f}",
                ),
            )
serialize
serialize() -> dict

Serialize this op to a config dict for the Rust backend.

Source code in operonx/core/ops/base.py
def serialize(self) -> dict:
    """Serialize this op to a config dict for the Rust backend."""
    is_async = inspect.iscoroutinefunction(self.core)
    is_gen = inspect.isgeneratorfunction(self.core) or inspect.isasyncgenfunction(self.core)
    base = {
        "type": self.type,
        "full_name": self.full_name,
        "name": self.name,
        "func_name": getattr(self.core, "_func_name", getattr(self.core, "__name__", None)),
        "python_callable": self.core,
        "is_async": is_async,
        "is_generator": is_gen,
        "enabled": self.enabled,
        "verbose": self.verbose,
        "stream": self.stream,
        "bound": self.bound,
        "inputs": self._serialize_params(self.inputs),
        "outputs": self._serialize_params(self.outputs),
    }
    if self.cache is not None:
        base["cache"] = self.cache
    if self.delay > 0:
        base["delay"] = self.delay
    return base

DummyOp

DummyOp(name: str)

Bases: BaseOp

Sentinel op used as START, END, and PARENT markers.

Source code in operonx/core/ops/_edges.py
def __init__(self, name: str):
    super().__init__(name=name)
Functions
shared
shared(**kwargs)

Declare shared vars on current graph. Only valid on PARENT.

Shared vars persist across all stream contexts within the graph. Normal PARENT vars are copied per stream context.

Usage::

@graph
def pipeline():
    PARENT.shared(current_state="REMINDER", history=[])
    # PARENT["current_state"] now shared across all stream contexts
Source code in operonx/core/ops/_edges.py
def shared(self, **kwargs):
    """Declare shared vars on current graph. Only valid on PARENT.

    Shared vars persist across all stream contexts within the graph.
    Normal PARENT vars are copied per stream context.

    Usage::

        @graph
        def pipeline():
            PARENT.shared(current_state="REMINDER", history=[])
            # PARENT["current_state"] now shared across all stream contexts
    """
    if self.name != "__PARENT__":
        raise TypeError("shared() can only be called on PARENT")
    current_graph = get_current()
    if current_graph is None:
        raise RuntimeError("PARENT.shared() must be called inside a @graph function body")
    if not hasattr(current_graph, "_shared_vars"):
        current_graph._shared_vars = {}
    current_graph._shared_vars.update(kwargs)

ScratchAccessor

Dict-like accessor for per-call scratch space.

  • Inside an op body (ContextVar bound): reads/writes the live scratch dict on the current MemoryState.
  • At graph-construction time (ContextVar unbound): __getitem__ returns a ScratchRef marker, post-resolved by BaseOp.get_inputs(). __setitem__ raises — write-outside-run is a programming error.
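The dual-mode behaviour can be modelled with a ContextVar (a toy, not the real class — names and state shape here are illustrative):

```python
from contextvars import ContextVar

_scratch_state: ContextVar = ContextVar("scratch", default=None)

class ScratchRef:
    """Construction-time marker, resolved later against live scratch."""
    def __init__(self, key: str):
        self.key = key

class ToyScratchAccessor:
    def __getitem__(self, key):
        live = _scratch_state.get()
        if live is None:          # ContextVar unbound: graph-construction time
            return ScratchRef(key)
        return live.get(key)      # bound: live read inside an op body

    def __setitem__(self, key, value):
        live = _scratch_state.get()
        if live is None:
            raise RuntimeError("scratch writes are only valid inside run()")
        live[key] = value

SCRATCH = ToyScratchAccessor()
ref = SCRATCH["prev_ctx"]             # unbound → ScratchRef marker
_scratch_state.set({})                # simulate entering an op body
SCRATCH["prev_ctx"] = ("turn1",)      # bound → live write
```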

SoftEdge

SoftEdge(op: BaseOp)

Marker for soft-edge connection. Use ~op syntax.

Source code in operonx/core/ops/_edges.py
def __init__(self, op: "_BaseOp"):
    self.op = op

Branch

Branch(name: Optional[str] = None, **kwargs)

Fluent builder for creating a BranchOp.

Example::

router = (if_(PARENT["score"] >= 90, "excellent")
          .if_(PARENT["score"] >= 70, "good")
          .else_("fail"))

Initialise the builder.

Parameters:

Name Type Description Default
name Optional[str]

Op name. If None, auto-inferred from the variable name.

None
Source code in operonx/core/ops/flow/branch_op.py
def __init__(self, name: Optional[str] = None, **kwargs):
    """Initialise the builder.

    Args:
        name: Op name. If None, auto-inferred from the variable name.
    """
    self._name = name
    self._cases: List[Tuple[Ref, str]] = []
    self._default: Optional[str] = None
    self._inputs: Dict[str, Any] = {}
    self._kwargs = kwargs
Functions
if_
if_(condition: Ref, target: Union[str, BaseOp]) -> Branch

Add a condition–target case.

Parameters:

Name Type Description Default
condition Ref

Ref with comparison (e.g., PARENT["score"] >= 90).

required
target Union[str, BaseOp]

Target op or op name.

required

Returns:

Type Description
Branch

self for chaining.

Source code in operonx/core/ops/flow/branch_op.py
def if_(self, condition: Ref, target: Union[str, BaseOp]) -> "Branch":
    """Add a condition–target case.

    Args:
        condition: Ref with comparison (e.g., ``PARENT["score"] >= 90``).
        target: Target op or op name.

    Returns:
        self for chaining.
    """
    target_name = target.name if hasattr(target, "name") else target
    self._cases.append((condition, target_name))
    return self
else_
else_(target: Union[str, BaseOp]) -> BranchOp

Set default target and build the BranchOp.

Parameters:

Name Type Description Default
target Union[str, BaseOp]

Fallback target when no condition matches.

required

Returns:

Type Description
BranchOp

The constructed BranchOp.

Source code in operonx/core/ops/flow/branch_op.py
@register_skip
def else_(self, target: Union[str, BaseOp]) -> "BranchOp":
    """Set default target and build the BranchOp.

    Args:
        target: Fallback target when no condition matches.

    Returns:
        The constructed BranchOp.
    """
    self._default = target.name if hasattr(target, "name") else target
    return self._build()
build
build() -> BranchOp

Build the BranchOp without a default target.

Returns:

Type Description
BranchOp

The constructed BranchOp.

Source code in operonx/core/ops/flow/branch_op.py
@register_skip
def build(self) -> "BranchOp":
    """Build the BranchOp without a default target.

    Returns:
        The constructed BranchOp.
    """
    return self._build()

BranchOp

BranchOp(
    cases: Optional[List[Tuple[Ref, str]]] = None,
    candidates: Optional[List[str]] = None,
    default: Optional[str] = None,
    inputs: Dict[str, Any] = None,
    outputs: Dict[str, Any] = None,
    **kwargs,
)

Bases: BaseOp

Op that evaluates conditions and routes execution to different targets.

Conditions are Ref objects with comparison operators. The first matching condition determines the target. An optional anchor input overrides all conditions. Use soft edges (>>~) to connect branch targets to a merge op.

Inputs

anchor (str, optional): Hard-coded target name that overrides conditions.
(any): Variables referenced in condition Refs (auto-extracted).

Outputs

target (str): Name of the selected target op.
matched (str): Description of which condition matched.

Example::

router = if_(PARENT["score"] >= 90, "excellent").else_("fail")
START >> router >> ~excellent >> merge >> END
router >> ~fail >> merge
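Routing is first-match-wins over the compiled conditions, with default as the fallback; a toy evaluator (the real conditions are Ref objects evaluated against state, modelled here as plain callables):

```python
def toy_route(cases, default, score):
    """First matching condition wins; fall back to default."""
    for cond, target in cases:
        if cond(score):
            return target
    return default

cases = [
    (lambda s: s >= 90, "excellent"),
    (lambda s: s >= 70, "good"),
]
first = toy_route(cases, "fail", 95)   # 95 matches the first case, not "good"
second = toy_route(cases, "fail", 50)  # no condition matches → default
```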
Source code in operonx/core/ops/flow/branch_op.py
def __init__(
    self,
    cases: Optional[List[Tuple[Ref, str]]] = None,
    candidates: Optional[List[str]] = None,
    default: Optional[str] = None,
    inputs: Dict[str, Any] = None,
    outputs: Dict[str, Any] = None,
    **kwargs,
):
    # Parse inputs/outputs from cases
    parsed_inputs, parsed_outputs = self._parse_cases(cases or [])

    # Call super().__init__ without inputs/outputs
    super().__init__(**kwargs)

    # Merge parsed schema with user-provided
    self._init_io(parsed_inputs, parsed_outputs, inputs, outputs)

    self.default = default.name if isinstance(default, BaseOp) else default
    self.given_candidates = candidates
    self.cases = cases or []
    self._case_descriptions = [ref.describe() for ref, _ in self.cases]

    self._set_core(self._create_core_function())
Attributes
candidates property
candidates: List[str]

List of possible target op names.

specific_metadata property
specific_metadata: Dict[str, Any]

Return subclass-specific metadata.

Functions
get_target
get_target(
    state: MemoryState, context_id: Optional[str] = None
) -> Optional[str]

Get the routed target from state.

Source code in operonx/core/ops/flow/branch_op.py
def get_target(self, state: "MemoryState", context_id: Optional[str] = None) -> Optional[str]:
    """Get the routed target from state."""
    return state[self.full_name, "target", context_id]
serialize
serialize() -> dict

Serialize branch op with conditions for Rust backend.

Source code in operonx/core/ops/flow/branch_op.py
def serialize(self) -> dict:
    """Serialize branch op with conditions for Rust backend."""
    base = super().serialize()
    base.update(
        {
            "cases": [
                {"condition": ref.serialize(), "target": target} for ref, target in self.cases
            ],
            "default": self.default,
            "candidates": self.given_candidates,
        }
    )
    return base
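The routing rule described above (explicit anchor wins, otherwise first matching condition, otherwise the default) can be sketched in plain Python. `resolve_target` and the lambda conditions below are illustrative stand-ins, not the operonx API:

```python
# Minimal sketch of BranchOp's routing rule (names are illustrative):
# an explicit anchor input overrides everything, otherwise the first
# matching condition wins, otherwise fall back to the default target.
def resolve_target(cases, inputs, default=None):
    anchor = inputs.get("anchor")
    if anchor:
        return anchor, "anchor override"
    for condition, target in cases:
        if condition(inputs):  # conditions are predicates over the inputs
            return target, f"matched -> {target}"
    return default, "default"

cases = [
    (lambda i: i["score"] >= 90, "excellent"),
    (lambda i: i["score"] >= 50, "pass"),
]

print(resolve_target(cases, {"score": 95}))                     # routes to 'excellent'
print(resolve_target(cases, {"score": 30}, default="fail"))     # routes to 'fail'
print(resolve_target(cases, {"score": 30, "anchor": "manual"})) # anchor wins
```

Because only the first matching case fires, case order matters: put the most specific condition first.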

GraphOp

GraphOp(concurrency: int = 64, **kwargs)

Bases: BaseOp

Container op that holds and executes a directed graph of child ops.

Lifecycle::

1. DEFINE        with GraphOp(name="wf") as g:
                     a = double(x=PARENT["x"])
                     b = add(a=a["result"], b=PARENT["y"])
                     START >> a >> b >> END
                 Ops auto-register via context manager. Edges via >> operator.
                 Inputs/outputs auto-discovered from PARENT refs.

2. BUILD         g.build()  (or auto on first run)
                 _setup_schema    scan PARENT refs → graph inputs/outputs
                 _setup_endpoints find entry/exit ops from topology
                 _build()         adj list + ready counts + stream ready counts
                 validate         branch targets, cycles, reachability, refs

3. EXECUTE       g.run(state, context_id)  — async generator
                 → run_task_scheduler()  drives ops via Frame/EOF events
                 → yields (ctx, outputs) per batch or per stream frame
                 → loop iteration handled inside scheduler EOF handler

4. EXPORT        serialize()  config dict for Rust backend
                 validate()   graph structure validation
                 show()       debug display
Source code in operonx/core/ops/graph/graph_op.py
def __init__(self, concurrency: int = 64, **kwargs):
    super().__init__(**kwargs)
    self._token = None
    self._is_building = True
    self._ops: Dict[str, BaseOp] = {}
    self._edges = {}
    self.entries = []
    self.exits = []
    self.prevs = defaultdict(list)
    self.nexts = defaultdict(list)
    self.concurrency = concurrency
    self._loop_config = None
    self._shared_vars = {}  # {var_name: initial_value} — set by PARENT.shared()
    self._adj = {}  # {op_name: [Link(dst, soft), ...]}
    self._initial_ready = {}  # {op_name: ready_count}
    self._stream_initial_ready = {}  # {gen_name: {op_name: ready_count_for_stream_ctx}}
    self._scheduler = None  # set by build()
    self._out_vars: Dict[
        str, dict
    ] = {}  # {op_name: {src_var: dest_var}} — vars mapped to PARENT output
Functions
loop classmethod
loop(
    name: Optional[str] = None,
    until: Optional[Union[str, Callable]] = None,
    max_iterations: int = 100,
    **initial_state: Any,
)

Create a GraphOp configured for feedback-loop execution.

Each iteration re-runs the graph's scheduler, carrying forward outputs as the next iteration's inputs. Stops when until evaluates to True or max_iterations is reached.

Parameters:

Name Type Description Default
name Optional[str]

Graph name.

None
until Optional[Union[str, Callable]]

Stop condition — a string expression (evaluated against outputs) or a callable (outputs_dict) -> bool.

None
max_iterations int

Safety cap on iterations (default 100).

100
**initial_state Any

Initial values for loop variables, injected as inputs.

{}

Example::

with GraphOp.loop(name="counter", until="count >= 5", count=0) as g:
    inc = increment(counter=PARENT["count"])
    inc["counter"] >> PARENT["count"]
    START >> inc >> END
Source code in operonx/core/ops/graph/graph_op.py
@classmethod
def loop(
    cls,
    name: Optional[str] = None,
    until: Optional[Union[str, Callable]] = None,
    max_iterations: int = 100,
    **initial_state: Any,
):
    """Create a GraphOp configured for feedback-loop execution.

    Each iteration re-runs the graph's scheduler, carrying forward outputs
    as the next iteration's inputs. Stops when ``until`` evaluates to True
    or ``max_iterations`` is reached.

    Args:
        name: Graph name.
        until: Stop condition — a string expression (evaluated against outputs)
               or a callable ``(outputs_dict) -> bool``.
        max_iterations: Safety cap on iterations (default 100).
        **initial_state: Initial values for loop variables, injected as inputs.

    Example::

        with GraphOp.loop(name="counter", until="count >= 5", count=0) as g:
            inc = increment(counter=PARENT["count"])
            inc["counter"] >> PARENT["count"]
            START >> inc >> END
    """
    g = cls(name=name, inputs=initial_state or None)
    g._loop_config = LoopConfig(
        until=until,
        max_iterations=max_iterations,
    )
    return g
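The loop semantics can be sketched without the real scheduler: carry outputs forward as the next iteration's inputs, evaluate a string `until` against the outputs (or call a callable with them), and stop at `max_iterations` as a safety cap. `run_once` below is a stand-in for one scheduler pass, not the actual implementation:

```python
# Hedged sketch of the feedback-loop driver. A string `until` is evaluated
# with the current outputs as local variables; a callable receives the
# outputs dict directly.
def run_loop(run_once, initial, until=None, max_iterations=100):
    outputs = dict(initial)
    for _ in range(max_iterations):
        outputs = run_once(outputs)  # carry outputs forward as inputs
        if until is None:
            continue
        done = until(outputs) if callable(until) else eval(until, {}, dict(outputs))
        if done:
            break
    return outputs

def increment(state):
    return {"count": state["count"] + 1}

print(run_loop(increment, {"count": 0}, until="count >= 5"))  # {'count': 5}
```

Note how `max_iterations` bounds the loop even when `until` never becomes true, mirroring the safety cap in `GraphOp.loop`.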
get_current_graph staticmethod
get_current_graph() -> Optional[GraphOp]

Return the current graph from context.

Source code in operonx/core/ops/graph/graph_op.py
@staticmethod
def get_current_graph() -> Optional["GraphOp"]:
    """Return the current graph from context."""
    try:
        return _current_graph.get()
    except LookupError:
        return None
add_op
add_op(op: BaseOp) -> BaseOp

Add an op to the graph.

Source code in operonx/core/ops/graph/graph_op.py
def add_op(self, op: BaseOp) -> BaseOp:
    """Add an op to the graph."""
    if not self._is_building:
        raise RuntimeError("Cannot add op after graph has been built")

    if getattr(op, "_is_operonx_builder", False):
        name = getattr(op, "_name", None) or type(op).__name__
        LOGGER.error(
            "%s '%s' is not built. Call .build() or .else_() before adding to graph.",
            type(op).__name__,
            name,
        )
        raise TypeError(
            f"{type(op).__name__} '{name}' is not built. "
            f"Call .build() or .else_() to create the op."
        )

    if op in [START, END]:
        return op

    if op.name in self._ops:
        LOGGER.warning(
            "Graph [highlight]%s[/highlight]: op [highlight]%s[/highlight] already exists and will be overwritten",
            self.name,
            op.name,
        )

    self._ops[op.name] = op

    if hasattr(op, "start") and op.start:
        if op.name not in self.entries:
            self.entries.append(op.name)

    if hasattr(op, "end") and op.end:
        if op.name not in self.exits:
            self.exits.append(op.name)

    return op
add_edge
add_edge(
    source: str, target: str, type: EdgeType = "normal", soft: bool = False
)

Add an edge between two ops.

Parameters:

Name Type Description Default
source str

Source op name.

required
target str

Target op name.

required
type EdgeType

Edge type (normal, lookback, condition).

'normal'
soft bool

If True, edge does not count toward ready_count. Used for branch outputs when only one branch executes.

False
Source code in operonx/core/ops/graph/graph_op.py
def add_edge(self, source: str, target: str, type: EdgeType = "normal", soft: bool = False):
    """Add an edge between two ops.

    Args:
        source: Source op name.
        target: Target op name.
        type: Edge type (normal, lookback, condition).
        soft: If True, edge does not count toward ready_count.
              Used for branch outputs when only one branch executes.
    """
    if not self._is_building:
        raise RuntimeError("Cannot add edge after graph has been built!")

    if source == START.name:
        if target not in self._ops:
            raise ValueError(f"Target op '{target}' not found")

        target_node = self._ops[target]
        target_node.start = True

        if target not in self.entries:
            self.entries.append(target)

        return

    if target == END.name:
        if source not in self._ops:
            raise ValueError(f"Source op '{source}' not found")

        source_node = self._ops[source]
        source_node.end = True

        if source not in self.exits:
            self.exits.append(source)

        return

    if target == PARENT.name:
        return

    if source not in self._ops:
        raise ValueError(f"Source op '{source}' not found")
    if target not in self._ops:
        raise ValueError(f"Target op '{target}' not found")

    new_edge = EdgeConfig(from_node=source, to_node=target, type=type, soft=soft)
    if (source, target) not in self._edges:
        self._edges[source, target] = new_edge
        self.nexts[source].append(target)
        self.prevs[target].append(source)
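The effect of `soft` on readiness can be sketched in a few lines (this is an illustration of the rule, not the real `_build`): an op's initial ready count is the number of hard incoming edges, so a merge op fed only by branch targets over soft edges starts with a ready count of zero and can fire as soon as any one branch completes:

```python
# Sketch: compute initial ready counts from an edge list, where soft
# edges (branch outputs) do not gate readiness.
def initial_ready_counts(ops, edges):
    # edges: iterable of (src, dst, soft) tuples
    ready = {op: 0 for op in ops}
    for _src, dst, soft in edges:
        if not soft:
            ready[dst] += 1
    return ready

edges = [
    ("router", "excellent", False),
    ("router", "fail", False),
    ("excellent", "merge", True),  # soft: only one branch will fire
    ("fail", "merge", True),
]
print(initial_ready_counts(["router", "excellent", "fail", "merge"], edges))
```

Here `merge` ends up with ready count 0, which is exactly why branch targets connect to it via `>>~` rather than a normal edge.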
build
build()

Build graph: children first, then schema → endpoints → topology → validation.

Source code in operonx/core/ops/graph/graph_op.py
def build(self):
    """Build graph: children first, then schema → endpoints → topology → validation."""
    for child in self._ops.values():
        if hasattr(child, "build"):
            child.build()

    self._setup_schema()
    self._setup_endpoints()

    result = self.validate()
    result.raise_if_errors()

    self._build()

    self._scheduler = Scheduler(self)
    self._is_building = False
    self._cache_full_names()

    # Auto-detect bound from children, with user override.
    # If user explicitly set bound on the graph, respect it.
    # Otherwise: all children sync → graph is sync (inline); any io/cpu → task.
    if self.bound is None:
        if all(getattr(op, "bound", None) == "sync" for op in self._ops.values()):
            self.bound = "sync"
        else:
            self.bound = "io"
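The bound auto-detection rule in the comment above reduces to a small pure function (a sketch; `infer_graph_bound` is not part of the API):

```python
# An explicit user setting wins; otherwise the graph is "sync" only when
# every child op is "sync", and "io" as soon as any child needs a task.
def infer_graph_bound(child_bounds, user_bound=None):
    if user_bound is not None:
        return user_bound
    return "sync" if all(b == "sync" for b in child_bounds) else "io"

print(infer_graph_bound(["sync", "sync"]))  # 'sync'
print(infer_graph_bound(["sync", "io"]))    # 'io'
```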
run async
run(
    state: MemoryState, context_id: Optional[tuple] = None
) -> AsyncGenerator[Tuple[tuple, Dict[str, Any]], None]

Execute graph: get inputs → schedule ops → loop if needed → store results.

Source code in operonx/core/ops/graph/graph_op.py
async def run(
    self,
    state: "MemoryState",
    context_id: Optional[tuple] = None,
) -> AsyncGenerator[Tuple[tuple, Dict[str, Any]], None]:
    """Execute graph: get inputs → schedule ops → loop if needed → store results."""

    if context_id is None:
        context_id = DEFAULT_CONTEXT

    request_id = state.request_id
    start_time = datetime.now(timezone.utc)
    perf_start = perf_counter()
    _inputs = {}
    _outputs = {}
    error_msg = None

    try:
        _inputs = self.get_inputs(state, context_id=context_id)

        if self._is_building:
            self.build()

        _outputs, stream_ctxs = await self._scheduler.run(state, context_id)

        _has_generators = any(op.is_gen for op in self._ops.values())
        if not stream_ctxs and not _has_generators:
            self.store_result(state, _outputs, context_id)
            yield context_id, _outputs
        else:
            for sctx in stream_ctxs:
                item = self.get_outputs(state, context_id=sctx)
                if any(v is not None for v in item.values()):
                    self.store_result(state, item, sctx)
                    yield sctx, item

    except Exception:
        import sys

        error_msg = (
            traceback.format_exc()
            if LOGGER.isEnabledFor(40)
            else f"{type(sys.exc_info()[1]).__name__}: {sys.exc_info()[1]}"
        )
        LOGGER.error(
            "[title]\\[%s][/title] Error in op [highlight]%s[/highlight]:\n%s",
            request_id,
            self.name,
            error_msg.rstrip(),
        )

    finally:
        end_time = datetime.now(timezone.utc)
        duration_ms = (perf_counter() - perf_start) * 1000
        self._log(request_id, context_id, _inputs, _outputs, duration_ms)
        self._store_metrics(
            state,
            context_id,
            start_time=start_time,
            end_time=end_time,
            duration_ms=duration_ms,
        )
        if error_msg is not None:
            state[self.full_name, "error", context_id] = error_msg
serialize
serialize() -> dict

Serialize full graph to config dict for the Rust backend.

Note: the key "initial_ready_count" is kept as-is for Rust backend compatibility even though the internal Python attribute was renamed to _initial_ready during the scheduler rewrite.

Source code in operonx/core/ops/graph/graph_op.py
def serialize(self) -> dict:
    """Serialize full graph to config dict for the Rust backend.

    Note: the key ``"initial_ready_count"`` is kept as-is for Rust backend
    compatibility even though the internal Python attribute was renamed to
    ``_initial_ready`` during the scheduler rewrite.
    """
    base = super().serialize()
    base.update(
        {
            "ops": {name: op.serialize() for name, op in self._ops.items()},
            "edges": [
                {"from": src, "to": dst, "soft": edge.soft}
                for (src, dst), edge in self._edges.items()
            ],
            "entries": list(self.entries),
            "exits": list(self.exits),
            "initial_ready_count": dict(self._initial_ready),
            "compiled_adj": {
                op: [[link.dst, link.soft] for link in links] for op, links in self._adj.items()
            },
            "stream_initial_ready": self._stream_initial_ready,
            "loop_config": {
                "until": self._loop_config.until
                if isinstance(self._loop_config.until, str)
                else None,
                "max_iterations": self._loop_config.max_iterations,
            }
            if self._loop_config
            else None,
            "max_stream_concurrent": self.concurrency,
        }
    )
    return base
validate
validate() -> ValidationResult

Run all validations and return result.

Source code in operonx/core/ops/graph/graph_op.py
def validate(self) -> ValidationResult:
    """Run all validations and return result."""
    return validate_graph(
        self.name,
        self._ops,
        self._edges,
        self.prevs,
        self.nexts,
        self.entries,
        self.exits,
    )
show
show(indent=0)

Display graph structure (debug).

Source code in operonx/core/ops/graph/graph_op.py
def show(self, indent=0):
    """Display graph structure (debug)."""
    prefix = "  " * indent
    LOGGER.debug("%sGraph: %s", prefix, self.name)
    LOGGER.debug("%sOps: %s", prefix, list(self._ops.keys()))
    LOGGER.debug("%sEdges:", prefix)
    for edge in self._edges.values():
        soft_marker = " (soft)" if edge.soft else ""
        LOGGER.debug(
            "%s  %s -> %s: %s%s", prefix, edge.from_node, edge.to_node, edge.type, soft_marker
        )
    LOGGER.debug("%sReady count: %s", prefix, dict(self._initial_ready))

    for child in self._ops.values():
        if isinstance(child, GraphOp):
            child.show(indent + 1)

FuncOp

FuncOp(
    code_fn: Optional[Callable] = None,
    return_keys: Optional[List[str]] = None,
    inputs: Dict[str, Any] = None,
    outputs: Dict[str, Any] = None,
    _mappings: Dict[str, Any] = None,
    **kwargs,
)

Bases: BaseOp

Op that executes a Python function.

Inputs and outputs are auto-extracted from the function's signature and return-statement AST. Both sync and async functions are supported. Prefer the @op decorator over instantiating FuncOp directly.

Inputs

Auto-parsed from the function's parameter list.

Outputs

Auto-parsed from return {"key": ...} via AST, or from explicit return_keys.

Example::

@op
def add(a: int, b: int):
    return {"sum": a + b}

with GraphOp(name="main") as graph:
    result = add(a=PARENT["x"], b=PARENT["y"])
    START >> result >> END
Source code in operonx/core/ops/transform/func_op.py
def __init__(
    self,
    code_fn: Optional[Callable] = None,
    return_keys: Optional[List[str]] = None,
    inputs: Dict[str, Any] = None,
    outputs: Dict[str, Any] = None,
    _mappings: Dict[str, Any] = None,
    **kwargs,
):
    # Parse inputs/outputs from the function signature/AST
    parsed_inputs, parsed_outputs = self._parse_function(code_fn, return_keys)

    # Split _mappings into inputs/outputs using parsed schema
    if _mappings:
        for key, value in _mappings.items():
            if key in parsed_inputs:
                if inputs is None:
                    inputs = {}
                inputs[key] = value
            elif key in parsed_outputs:
                if outputs is None:
                    outputs = {}
                outputs[key] = value
            else:
                raise TypeError(
                    f"'{key}' is not a known input or output of {code_fn.__name__}(). "
                    f"Inputs: {set(parsed_inputs)}, Outputs: {set(parsed_outputs)}"
                )

    # Resolve cache from @op(cache=...) if not set in kwargs
    if "cache" not in kwargs:
        fn_cache = getattr(code_fn, "_op_cache", None)
        if fn_cache is not None:
            kwargs["cache"] = fn_cache

    # Call super().__init__ without passing inputs/outputs
    super().__init__(**kwargs)

    # Merge parsed schema with user-provided (handles {"*": PARENT} wildcard)
    self._init_io(parsed_inputs, parsed_outputs, inputs, outputs)

    self.code_fn = code_fn
    self._set_core(code_fn)

    # Capture the function's source code
    try:
        self.source = inspect.getsource(code_fn) if code_fn else ""
    except (OSError, TypeError):
        self.source = str(code_fn) if code_fn else ""

    # Set description from the docstring if none was provided
    if not self.description and code_fn and code_fn.__doc__:
        self.description = code_fn.__doc__.strip().split("\n")[0]
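The docstring above says outputs are auto-parsed from the return-statement AST. A minimal sketch of that idea (the real `_parse_function` may differ) walks the parsed source and collects the string keys of `return {...}` dict literals:

```python
import ast

def parse_return_keys(source: str):
    """Collect string keys from `return {...}` dict literals in `source`."""
    keys = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Return) and isinstance(node.value, ast.Dict):
            for k in node.value.keys:
                if isinstance(k, ast.Constant) and isinstance(k.value, str):
                    keys.append(k.value)
    return keys

SRC = '''
def add(a: int, b: int):
    return {"sum": a + b, "diff": a - b}
'''
print(parse_return_keys(SRC))  # ['sum', 'diff']
```

This is why `@op` functions that build their return dict dynamically (e.g. `return result`) need an explicit `return_keys` list: there is no dict literal for the AST scan to find.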
Attributes
specific_metadata property
specific_metadata: Dict[str, Any]

Return subclass-specific metadata.

Functions
run async
run(
    state: MemoryState, context_id: Optional[str] = None
) -> AsyncGenerator[Tuple[Optional[str], Dict[str, Any]], None]

Execute FuncOp with CodeError wrapping.

Delegates to BaseOp.run() (async generator) and re-raises any exception as a CodeError with full op context attached.

Source code in operonx/core/ops/transform/func_op.py
async def run(
    self,
    state: "MemoryState",
    context_id: Optional[str] = None,
) -> AsyncGenerator[Tuple[Optional[str], Dict[str, Any]], None]:
    """Execute FuncOp with CodeError wrapping.

    Delegates to ``BaseOp.run()`` (async generator) and re-raises any
    exception as a ``CodeError`` with full op context attached.
    """
    try:
        async for ctx, result in super().run(state, context_id):
            yield ctx, result
    except CodeError:
        raise  # Already wrapped; don't wrap again
    except Exception as e:
        # Fetch inputs to attach context to the error
        _inputs = self.get_inputs(state, context_id)
        raise CodeError(
            message=f"Function '{self.code_fn.__name__ if self.code_fn else 'unknown'}' raised an exception",
            function_name=self.code_fn.__name__ if self.code_fn else "unknown",
            source=self.source,
            inputs=_inputs,
            original_error=e,
        ) from e
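The wrap-and-chain pattern used here generalizes: delegate to an inner async generator, let already-wrapped errors pass through, and re-raise anything else as a richer error with `from e` so the original traceback is preserved. `OpError` below is an illustrative stand-in for `CodeError`:

```python
import asyncio

class OpError(RuntimeError):
    def __init__(self, message, original_error):
        super().__init__(message)
        self.original_error = original_error

async def inner():
    yield 1
    raise ValueError("boom")

async def run():
    try:
        async for item in inner():
            yield item
    except OpError:
        raise  # already wrapped; don't wrap again
    except Exception as e:
        raise OpError(f"op failed: {e}", original_error=e) from e

async def main():
    items = []
    try:
        async for x in run():
            items.append(x)
    except OpError as e:
        return items, type(e.original_error).__name__

print(asyncio.run(main()))  # ([1], 'ValueError')
```

The `except OpError: raise` guard matters in nested graphs: without it, an error crossing several op layers would be wrapped once per layer.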

ParserOp

ParserOp(
    format: ParserType = "xml",
    extract: Optional[List[str]] = None,
    inputs: Dict[str, Any] = None,
    outputs: Dict[str, Any] = None,
    **kwargs,
)

Bases: BaseOp

Op that parses text into structured data.

Supports multiple formats (JSON, XML, YAML) and extracts fields using dot-separated chain paths (e.g. "user.address.city: str"). Commonly used as the final stage inside a ChainOp pipeline.

Inputs

text (str): Raw text to parse (e.g. LLM output).

Outputs

Dynamically generated from the extract list — one output key per extracted field.

Example::

parser = ParserOp(
    format="json",
    extract=["user.name: str", "user.age: int"],
    inputs={"text": llm["content"]},
)
Source code in operonx/core/ops/transform/parser_op.py
def __init__(
    self,
    format: ParserType = "xml",
    extract: Optional[List[str]] = None,
    inputs: Dict[str, Any] = None,
    outputs: Dict[str, Any] = None,
    **kwargs,
):
    if not extract:
        raise TypeError("extract is required")

    # Parse the schema into a structured format
    extract_fields = [ExtractField.from_string(schema_str) for schema_str in extract]

    # Parse inputs/outputs from extract
    parsed_inputs = {
        "text": Param(type=str, required=True),
        "validators": Param(type=dict, required=False),
    }
    parsed_outputs = {field.output_key: Param() for field in extract_fields}

    # Call super().__init__ without passing inputs/outputs
    super().__init__(**kwargs)

    # Merge parsed schema with user-provided
    self._init_io(parsed_inputs, parsed_outputs, inputs, outputs)

    self.format = format or "xml"
    self.extract = extract
    self.extract_fields = extract_fields

    # Serialize parser config as literal inputs so Rust can read them
    mode_param = Param(type=str, required=False)
    mode_param.value = self.format
    self.inputs["mode"] = mode_param

    schema_param = Param(type=list, required=False)
    schema_param.value = self.extract
    self.inputs["schema"] = schema_param

    self.backend = self._create_parser()
    self._set_core(self._process)
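The dot-separated chain paths with a type tag (e.g. `"user.age: int"`) can be sketched in plain Python. This is an illustration of the extraction idea only; the real parser backend handles multiple formats and validators:

```python
import json

_CASTS = {"str": str, "int": int, "float": float, "bool": bool}

def extract_field(data: dict, spec: str):
    """Extract a value by dot path, with an optional ': type' coercion tag."""
    path, _, type_name = spec.partition(":")
    value = data
    for part in path.strip().split("."):
        value = value[part]  # walk nested dicts one key at a time
    cast = _CASTS.get(type_name.strip())
    return cast(value) if cast else value

doc = json.loads('{"user": {"name": "Ana", "age": "41"}}')
print(extract_field(doc, "user.name: str"))  # 'Ana'
print(extract_field(doc, "user.age: int"))   # 41
```

Note the coercion step: LLM output frequently yields numbers as strings, so the `: int` tag turns `"41"` into `41` before it reaches downstream ops.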
Attributes
specific_metadata property
specific_metadata: Dict[str, Any]

Return subclass-specific metadata.

Functions

shorthand

shorthand(fn)

Decorator for Op.of() classmethods.

Registers the function for auto-naming frame skip via register_skip() and wraps as classmethod.

Usage::

class MyOp(BaseOp):
    @shorthand
    def of(cls, my_param=None, **kwargs):
        inputs, init_kwargs = split_shorthand_kwargs(kwargs)
        return cls(my_param=my_param, inputs=inputs or None, **init_kwargs)
Source code in operonx/core/ops/_shortcuts.py
def shorthand(fn):
    """Decorator for ``Op.of()`` classmethods.

    Registers the function for auto-naming frame skip via ``register_skip()``
    and wraps as ``classmethod``.

    Usage::

        class MyOp(BaseOp):
            @shorthand
            def of(cls, my_param=None, **kwargs):
                inputs, init_kwargs = split_shorthand_kwargs(kwargs)
                return cls(my_param=my_param, inputs=inputs or None, **init_kwargs)
    """
    register_skip(fn)
    return classmethod(fn)

split_shorthand_kwargs

split_shorthand_kwargs(kwargs: dict, extra_init_keys: set = None) -> tuple

Split flat kwargs into (inputs, init_kwargs).

Used by shorthand functions (llm_, for_, op, etc.) to separate op constructor kwargs from input mappings.

Parameters:

Name Type Description Default
kwargs dict

Flat keyword arguments from shorthand function.

required
extra_init_keys set

Additional op-specific init keys beyond base keys (e.g., {'max_concurrency', 'callback'} for iteration ops).

None

Returns:

Type Description
tuple

(inputs, init_kwargs) tuple where:
• inputs: Dict of input variable mappings
• init_kwargs: Dict of op constructor arguments
Example::

    # Provider ops - just base keys
    inputs, init_kwargs = split_shorthand_kwargs(kwargs)

    # Iteration ops - with extra keys
    inputs, init_kwargs = split_shorthand_kwargs(
        kwargs, {'max_concurrency', 'until', 'callback'}
    )

Source code in operonx/core/ops/_shortcuts.py
def split_shorthand_kwargs(kwargs: dict, extra_init_keys: set = None) -> tuple:
    """Split flat kwargs into (inputs, init_kwargs).

    Used by shorthand functions (llm_, for_, op, etc.) to separate
    op constructor kwargs from input mappings.

    Args:
        kwargs: Flat keyword arguments from shorthand function.
        extra_init_keys: Additional op-specific init keys beyond base keys
            (e.g., {'max_concurrency', 'callback'} for iteration ops).

    Returns:
        (inputs, init_kwargs) tuple where:
        - inputs: Dict of input variable mappings
        - init_kwargs: Dict of op constructor arguments

    Example:
        # Provider ops - just base keys
        inputs, init_kwargs = split_shorthand_kwargs(kwargs)

        # Iteration ops - with extra keys
        inputs, init_kwargs = split_shorthand_kwargs(
            kwargs,
            {'max_concurrency', 'until', 'callback'}
        )
    """
    init_keys = _BASE_INIT_KEYS | (extra_init_keys or set())

    inputs = {}
    init_kwargs = {}

    for key, value in kwargs.items():
        if key in init_keys and not isinstance(value, Ref):
            init_kwargs[key] = value
        else:
            if key in init_keys:
                LOGGER.warning(
                    "Keyword '%s' is a reserved op parameter (one of %s) but received a Ref value. "
                    "It will be treated as an input mapping instead of an op constructor arg. "
                    "Consider renaming this parameter to avoid ambiguity.",
                    key,
                    sorted(init_keys),
                )
            inputs[key] = value

    return inputs, init_kwargs
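The split rule can be exercised standalone. The reimplementation below mirrors the core logic from the source above (minus the warning path); `Ref` and the `_BASE_INIT_KEYS` subset are stand-ins for the operonx originals:

```python
class Ref:  # stand-in for operonx's Ref
    pass

_BASE_INIT_KEYS = {"name", "cache", "delay"}  # illustrative subset

def split_shorthand_kwargs(kwargs, extra_init_keys=None):
    init_keys = _BASE_INIT_KEYS | (extra_init_keys or set())
    inputs, init_kwargs = {}, {}
    for key, value in kwargs.items():
        # Reserved keys go to the constructor unless the value is a Ref,
        # in which case they fall through to input mappings.
        if key in init_keys and not isinstance(value, Ref):
            init_kwargs[key] = value
        else:
            inputs[key] = value
    return inputs, init_kwargs

inputs, init = split_shorthand_kwargs({"name": "adder", "a": 1, "b": Ref()})
print(inputs, init)  # 'a' and the Ref-valued 'b' are inputs; 'name' is an init kwarg
```

The Ref check is what lets a shorthand call like `my_op(name=PARENT["name"])` still treat `name` as data, at the cost of the ambiguity the warning in the real implementation flags.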

if_

if_(condition: Ref, target: Union[str, BaseOp]) -> Branch

Start a branch declaration with the first condition.

Example::

router = if_(PARENT["score"] >= 90, "excellent").else_("fail")
Source code in operonx/core/ops/flow/branch_op.py
def if_(condition: Ref, target: Union[str, BaseOp]) -> Branch:
    """Start a branch declaration with the first condition.

    Example::

        router = if_(PARENT["score"] >= 90, "excellent").else_("fail")
    """
    return Branch().if_(condition, target)

op

op(
    func: Optional[Callable] = None,
    *,
    executor: Optional[str] = None,
    bound: Optional[str] = None,
    cache=None,
    delay: float = 0,
)

Decorator that turns a plain function into a FuncOp factory.

Can be used bare or with keyword arguments::

@op
def double(x: int):
    return {"result": x * 2}

@op(bound="cpu")
def heavy_compute(data: list):
    return {"result": process(data)}

@op(bound="io")
async def call_api(url: str):
    return {"data": await fetch(url)}

Parameters:

Name Type Description Default
bound Optional[str]

Execution bound hint for the scheduler.
None (default) auto-detects: async → "io", sync → "sync".
"sync" — inline dispatch, no asyncio task (fastest).
"io" — asyncio task, for network/disk I/O.
"cpu" — asyncio.to_thread(), for heavy compute (C extensions release GIL).

None
executor Optional[str]

Deprecated — use bound="cpu" instead of executor="thread".

None
Source code in operonx/core/ops/transform/func_op.py
def op(
    func: Optional[Callable] = None,
    *,
    executor: Optional[str] = None,
    bound: Optional[str] = None,
    cache=None,
    delay: float = 0,
):
    """Decorator that turns a plain function into a FuncOp factory.

    Can be used bare or with keyword arguments::

        @op
        def double(x: int):
            return {"result": x * 2}

        @op(bound="cpu")
        def heavy_compute(data: list):
            return {"result": process(data)}

        @op(bound="io")
        async def call_api(url: str):
            return {"data": await fetch(url)}

    Args:
        bound: Execution bound hint for the scheduler.
            ``None`` (default) auto-detects: async → ``"io"``, sync → ``"sync"``.
            ``"sync"``  — inline dispatch, no asyncio task (fastest).
            ``"io"``    — asyncio task, for network/disk I/O.
            ``"cpu"``   — asyncio.to_thread(), for heavy compute (C extensions release GIL).
        executor: Deprecated — use ``bound="cpu"`` instead of ``executor="thread"``.
    """
    # Backward compat: executor="thread" → bound="cpu"
    if executor == "thread" and bound is None:
        bound = "cpu"

    def decorator(fn):
        module = fn.__module__ or ""
        if module in ("__main__", "") or "." not in module:
            fn._func_name = fn.__name__
        else:
            fn._func_name = f"{module}.{fn.__name__}"
        if bound is not None:
            fn._op_bound = bound
        if cache is not None:
            fn._op_cache = cache
        sig = inspect.signature(fn)
        collisions = set(sig.parameters.keys()) & _BASE_INIT_KEYS
        if collisions:
            LOGGER.warning(
                "@op function '%s' has parameter(s) %s that collide with reserved op keywords %s. "
                "When called via shorthand (e.g. %s(name=PARENT['name'])), these may be misinterpreted "
                "as op constructor args instead of function inputs. Consider renaming them.",
                fn.__name__,
                sorted(collisions),
                sorted(_BASE_INIT_KEYS),
                fn.__name__,
            )

        @wraps(fn)
        def wrapper(**kwargs):
            mappings, init_kwargs = split_shorthand_kwargs(kwargs, {"return_keys"})
            op_bound = init_kwargs.pop("bound", bound)
            # Backward compat: call-time executor="thread" → bound="cpu"
            op_executor = init_kwargs.pop("executor", None)
            if op_executor == "thread" and op_bound is None:
                op_bound = "cpu"
            op_delay = init_kwargs.pop("delay", delay)
            return FuncOp(
                code_fn=fn,
                bound=op_bound,
                delay=op_delay,
                _mappings=mappings or None,
                **init_kwargs,
            )

        register_skip(wrapper)
        wrapper.__wrapped__ = fn
        return wrapper

    if func is not None:
        # @op without parentheses
        return decorator(func)
    # @op(bound="cpu") with parentheses
    return decorator
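The bare-vs-parenthesized dual mode at the end of `op` is a standard decorator pattern worth isolating. The sketch below keeps only that skeleton plus the bound auto-detection rule from the docstring (async → "io", sync → "sync"); the wrapper body is a trivial stand-in for the real FuncOp construction:

```python
import inspect
from functools import wraps

def op(func=None, *, bound=None):
    def decorator(fn):
        # Auto-detect per the docstring: async -> "io", sync -> "sync".
        fn._op_bound = bound or ("io" if inspect.iscoroutinefunction(fn) else "sync")
        @wraps(fn)  # copies __dict__, so _op_bound is visible on the wrapper too
        def wrapper(**kwargs):
            return fn(**kwargs)
        return wrapper
    if func is not None:
        return decorator(func)  # @op without parentheses: func is the function
    return decorator            # @op(bound="cpu"): return the decorator itself

@op
def double(x: int):
    return {"result": x * 2}

@op(bound="cpu")
def heavy(data: list):
    return {"result": sum(data)}

@op
async def fetch(url: str):
    return {"data": url}

print(double(x=3), double._op_bound)  # {'result': 6} sync
print(heavy._op_bound, fetch._op_bound)  # cpu io
```

The single `func=None` positional parameter is the whole trick: it is bound only in the bare `@op` form, so its presence tells the factory which mode it is in.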