operonx.core¶
Engine, op decorators, graph composition, state markers, and middleware. This page is the primary public surface — everything you need to build and run a workflow without touching providers or telemetry.
Engine¶
Operon
¶
```python
Operon(
    graph: Union[GraphOp, Callable[..., GraphOp]],
    *,
    params: Optional[Dict[str, Any]] = None,
    tracer: Optional[Union[Tracer, List[Tracer]]] = None,
)
```
Workflow execution engine.
Operon takes a GraphOp and provides execution capabilities:

- Builds and validates the graph structure
- Creates the state schema for data flow
- Executes workflows with fresh state per run
- Integrates with tracers for observability
Attributes:

| Name | Type | Description |
|---|---|---|
| graph | | The GraphOp to execute |
| name | | Workflow name (from graph) |
| schema | StateSchema | State schema for the workflow |
Example:

```python
# Define graph
with GraphOp(name="chatbot") as graph:
    prompt = PromptOp(name="prompt", ...)
    llm = LLMOp(name="llm", ...)
    START >> prompt >> llm >> END

# Create engine (builds automatically)
engine = Operon(graph)

# Run multiple times with fresh state
result = await engine.run(inputs={"query": "Hello!"})
print(result["response"])  # workflow output
print(result["$state"])    # MemoryState for debugging

# Or use callable syntax
result = await engine({"query": "Goodbye!"})
```
Initialize the Operon engine with a GraphOp or a graph factory.
Pure orchestrator — does not load .env or resources.yaml. Call operonx.bootstrap() (or ResourceHub.from_yaml directly) before constructing the engine if your graph uses provider ops. Pure-compute graphs need no setup.
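For provider-backed graphs, a minimal sketch of the required ordering (assumes a resources.yaml in the working directory and a graph built elsewhere):

```python
import operonx
from operonx import Operon

operonx.bootstrap()      # install the ResourceHub first
engine = Operon(graph)   # then construct the engine
```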
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| graph | Union[GraphOp, Callable[..., GraphOp]] | A GraphOp workflow, or a callable that returns one. When a callable is passed, it is invoked with params as keyword arguments. | required |
| params | Optional[Dict[str, Any]] | Keyword arguments passed to the graph factory. Ignored when graph is already a GraphOp. | None |
| tracer | Optional[Union[Tracer, List[Tracer]]] | Default tracer(s) for all run() calls. Can be overridden per-run. | None |
Raises:

| Type | Description |
|---|---|
| RuntimeError | If a provider op needs the hub but none has been installed. The message points at operonx.bootstrap(). |
Source code in operonx/core/engine.py
Attributes¶
Functions¶
use
¶
Add middleware to the engine. Returns self for chaining.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| middleware | Middleware | A Middleware instance to add. | required |

Returns:

| Type | Description |
|---|---|
| Operon | self, for fluent chaining. |
Source code in operonx/core/engine.py
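A sketch of fluent registration (the middleware classes here are hypothetical stand-ins; see the Middleware section below for the base class):

```python
engine = Operon(graph).use(TimingMiddleware()).use(AuditMiddleware())
```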
start
¶
```python
start(
    inputs: Dict[str, Any],
    *,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    request_id: Optional[str] = None,
    tracer: Optional[Union[Tracer, List[Tracer]]] = None,
    scratch: Optional[Dict[str, Any]] = None,
) -> ExecutionHandle
```
Start workflow execution and return a streaming handle immediately.
Does not block — the graph runs in the background. Use the handle to stream frames, await specific outputs, or collect the final result.
Tracer flush happens automatically when the scheduler completes — no explicit finalize step needed.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| inputs | Dict[str, Any] | Input data for the workflow | required |
| user_id | Optional[str] | Optional user identifier (auto-generated if not provided) | None |
| session_id | Optional[str] | Optional session identifier (auto-generated if not provided) | None |
| request_id | Optional[str] | Optional request identifier (auto-generated if not provided) | None |
| tracer | Optional[Union[Tracer, List[Tracer]]] | Optional tracer(s) — overrides the engine default for this execution. | None |
| scratch | Optional[Dict[str, Any]] | Optional initial values for per-call scratch space. Applied synchronously before the scheduler task is created — race-free. | None |

Returns:

| Type | Description |
|---|---|
| ExecutionHandle | Async-iterable handle; supports streaming frames, awaiting specific outputs, and collecting the final result. |
Source code in operonx/core/engine.py
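A minimal sketch of the two access patterns (the frame shape is illustrative; collect(unwrap=True) is the same call run() makes internally):

```python
# Pattern 1: stream frames as the graph produces them
handle = engine.start({"query": "Hello!"})
async for frame in handle:
    print(frame)

# Pattern 2: just await the final result (what run() does internally)
handle = engine.start({"query": "Hello!"})
result = await handle.collect(unwrap=True)
```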
run
async
¶
```python
run(
    inputs: Dict[str, Any],
    *,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    request_id: Optional[str] = None,
    tracer: Optional[Union[Tracer, List[Tracer]]] = None,
    scratch: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]
```
Execute the workflow with given inputs.
Each call creates a fresh state, so the same engine can be used for multiple independent executions. Equivalent to:

```python
handle = engine.start(inputs, tracer=tracer, ...)
result = await handle.collect(unwrap=True)
```

Tracer flush happens automatically inside start() when the scheduler completes.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| inputs | Dict[str, Any] | Input data for the workflow | required |
| user_id | Optional[str] | Optional user identifier (auto-generated if not provided) | None |
| session_id | Optional[str] | Optional session identifier (auto-generated if not provided) | None |
| request_id | Optional[str] | Optional request identifier (auto-generated if not provided) | None |
| tracer | Optional[Union[Tracer, List[Tracer]]] | Optional tracer or list of tracers for observability. Overrides the default tracer set on the engine. | None |
| scratch | Optional[Dict[str, Any]] | Optional initial values for per-call scratch space. | None |

Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary containing the workflow outputs plus a "$state" key with the MemoryState for debugging/tracing access. |
Source code in operonx/core/engine.py
serve
¶
```python
serve(
    *,
    path: str = "/",
    host: str = "0.0.0.0",
    port: int = 8000,
    stream: Optional[bool] = None,
    websocket: bool = False,
    backend: str = "python",
    **kwargs: Any,
) -> None
```
Serve this workflow as an HTTP API.
Convenience wrapper around operonx.serve.OperonApp. Requires operonx-serve
to be installed.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| path | str | URL path for the endpoint. | '/' |
| host | str | Bind address. | '0.0.0.0' |
| port | int | Bind port. | 8000 |
| stream | Optional[bool] | Enable SSE streaming endpoint. None = auto-detect. | None |
| websocket | bool | Enable WebSocket endpoint. | False |
| backend | str | "python" (FastAPI/uvicorn) or "rust" (Axum). | 'python' |
| **kwargs | Any | Extra arguments forwarded to OperonApp. | {} |
Source code in operonx/core/engine.py
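A one-line sketch (assumes operonx-serve is installed):

```python
engine.serve(path="/chat", port=8080, stream=True)
```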
batch
async
¶
```python
batch(
    inputs_list: List[Dict[str, Any]], *, concurrency: int = 10, **kwargs: Any
) -> List[Dict[str, Any]]
```
Run the workflow concurrently on multiple inputs.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| inputs_list | List[Dict[str, Any]] | List of input dicts to process. | required |
| concurrency | int | Max concurrent executions. | 10 |
| **kwargs | Any | Extra arguments forwarded to each run() call. | {} |

Returns:

| Type | Description |
|---|---|
| List[Dict[str, Any]] | List of result dicts in the same order as inputs. |
Source code in operonx/core/engine.py
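For example (sketch, reusing the chatbot engine from above):

```python
queries = ["Hello!", "Goodbye!", "What's new?"]
results = await engine.batch(
    [{"query": q} for q in queries],
    concurrency=5,
)
```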
cli
¶
Interactive CLI mode — read JSON from stdin, print result to stdout.
Source code in operonx/core/engine.py
input_schema
¶
output_schema
¶
Decorators¶
The two decorators that turn ordinary Python into Operonx ops:
op
¶
```python
op(
    func: Optional[Callable] = None,
    *,
    executor: Optional[str] = None,
    bound: Optional[str] = None,
    cache=None,
    delay: float = 0,
)
```
Decorator that turns a plain function into a FuncOp factory.
Can be used bare or with keyword arguments:

```python
@op
def double(x: int):
    return {"result": x * 2}

@op(bound="cpu")
def heavy_compute(data: list):
    return {"result": process(data)}

@op(bound="io")
async def call_api(url: str):
    return {"data": await fetch(url)}
```
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| bound | Optional[str] | Execution bound hint for the scheduler (e.g. "cpu" or "io"). | None |
| executor | Optional[str] | Deprecated — use bound instead. | None |
Source code in operonx/core/ops/transform/func_op.py
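The decorated function becomes a factory that is called with input wiring inside a graph. A minimal sketch using the double op above (the "math" graph name and inputs are illustrative):

```python
with GraphOp(name="math") as g:
    d = double(x=PARENT["x"])   # call the factory with Refs or literals
    START >> d >> END

engine = Operon(g)
result = await engine.run({"x": 21})   # result["result"] == 42
```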
graph
¶
GraphOp — container op that manages a graph of child ops.
Classes¶
GraphOp
¶
Bases: BaseOp
Container op that holds and executes a directed graph of child ops.
Lifecycle:

```text
1. DEFINE   with GraphOp(name="wf") as g:
                a = double(x=PARENT["x"])
                b = add(a=a["result"], b=PARENT["y"])
                START >> a >> b >> END
            Ops auto-register via context manager. Edges via >> operator.
            Inputs/outputs auto-discovered from PARENT refs.

2. BUILD    g.build() (or auto on first run)
            _setup_schema     scan PARENT refs → graph inputs/outputs
            _setup_endpoints  find entry/exit ops from topology
            _build()          adj list + ready counts + stream ready counts
            validate          branch targets, cycles, reachability, refs

3. EXECUTE  g.run(state, context_id) — async generator
            → run_task_scheduler() drives ops via Frame/EOF events
            → yields (ctx, outputs) per batch or per stream frame
            → loop iteration handled inside scheduler EOF handler

4. EXPORT   serialize()  config dict for Rust backend
            validate()   graph structure validation
            show()       debug display
```
Source code in operonx/core/ops/graph/graph_op.py
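The define/build/validate steps in code (a sketch; double is the @op example from above):

```python
with GraphOp(name="wf") as g:
    a = double(x=PARENT["x"])
    START >> a >> END

g.build()                       # explicit build (otherwise auto on first run)
g.validate().raise_if_errors()  # surface structural problems early
g.show()                        # debug display
```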
Functions¶
loop
classmethod
¶
```python
loop(
    name: Optional[str] = None,
    until: Optional[Union[str, Callable]] = None,
    max_iterations: int = 100,
    **initial_state: Any,
)
```
Create a GraphOp configured for feedback-loop execution.
Each iteration re-runs the graph's scheduler, carrying forward outputs as the next iteration's inputs. Stops when until evaluates to True or max_iterations is reached.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | Optional[str] | Graph name. | None |
| until | Optional[Union[str, Callable]] | Stop condition — a string expression (evaluated against outputs) or a callable evaluated against the outputs. | None |
| max_iterations | int | Safety cap on iterations. | 100 |
| **initial_state | Any | Initial values for loop variables, injected as inputs. | {} |
Example:

```python
with GraphOp.loop(name="counter", until="count >= 5", count=0) as g:
    inc = increment(counter=PARENT["count"])
    inc["counter"] >> PARENT["count"]
    START >> inc >> END
```
Source code in operonx/core/ops/graph/graph_op.py
get_current_graph
staticmethod
¶
add_op
¶
Add an op to the graph.
Source code in operonx/core/ops/graph/graph_op.py
add_edge
¶
Add an edge between two ops.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| source | str | Source op name. | required |
| target | str | Target op name. | required |
| type | EdgeType | Edge type (normal, lookback, condition). | 'normal' |
| soft | bool | If True, the edge does not count toward ready_count. Used for branch outputs when only one branch executes. | False |
Source code in operonx/core/ops/graph/graph_op.py
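Inside a with GraphOp(...) block the >> operator is the usual way to create edges; add_edge is the explicit, name-based equivalent. A sketch:

```python
g.add_edge("prompt", "llm")   # explicit, by op name
prompt >> llm                 # operator form inside the graph block
```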
build
¶
Build graph: children first, then schema → endpoints → topology → validation.
Source code in operonx/core/ops/graph/graph_op.py
run
async
¶
```python
run(
    state: MemoryState, context_id: Optional[tuple] = None
) -> AsyncGenerator[Tuple[tuple, Dict[str, Any]], None]
```
Execute graph: get inputs → schedule ops → loop if needed → store results.
Source code in operonx/core/ops/graph/graph_op.py
serialize
¶
Serialize full graph to config dict for the Rust backend.
Note: the key "initial_ready_count" is kept as-is for Rust backend
compatibility even though the internal Python attribute was renamed to
_initial_ready during the scheduler rewrite.
Source code in operonx/core/ops/graph/graph_op.py
validate
¶
Run all validations and return result.
show
¶
Display graph structure (debug).
Source code in operonx/core/ops/graph/graph_op.py
GraphValidationError
¶
Bases: Exception
Exception raised when graph validation fails.
Source code in operonx/core/ops/graph/validation.py
ValidationIssue
dataclass
¶
```python
ValidationIssue(
    level: ValidationLevel,
    category: str,
    message: str,
    op_name: Optional[str] = None,
    target_name: Optional[str] = None,
    available_nodes: List[str] = list(),
    suggestions: List[str] = list(),
)
```
A single validation issue found in the graph.
ValidationLevel
¶
Bases: Enum
Severity level for validation issues.
ValidationResult
dataclass
¶
Result of graph validation.
Functions¶
raise_if_errors
¶
Raise exception if there are any errors.
Source code in operonx/core/ops/graph/validation.py
Op types¶
The base classes that compose into a workflow. Most users only touch
GraphOp directly (via with GraphOp(...) as g:) — the others are
constructed by decorators or factory helpers.
GraphOp
¶
Bases: BaseOp
Container op that holds and executes a directed graph of child ops. The full reference (lifecycle, loop, get_current_graph, add_op, add_edge, build, run, serialize, validate, show) appears above under graph.
BranchOp
¶
```python
BranchOp(
    cases: Optional[List[Tuple[Ref, str]]] = None,
    candidates: Optional[List[str]] = None,
    default: Optional[str] = None,
    inputs: Dict[str, Any] = None,
    outputs: Dict[str, Any] = None,
    **kwargs,
)
```
Bases: BaseOp
Op that evaluates conditions and routes execution to different targets.
Conditions are Ref objects with comparison operators. The first matching condition determines the target. An optional anchor input overrides all conditions. Use soft edges (>>~) to connect branch targets to a merge op.
Inputs

- anchor (str, optional): Hard-coded target name that overrides conditions.
- (any): Variables referenced in condition Refs (auto-extracted).

Outputs

- target (str): Name of the selected target op.
- matched (str): Description of which condition matched.

Example:

```python
router = if_(PARENT["score"] >= 90, "excellent").else_("fail")
START >> router >> ~excellent >> merge >> END
router >> ~fail >> merge
```
Source code in operonx/core/ops/flow/branch_op.py
Attributes¶
Functions¶
get_target
¶
serialize
¶
Serialize branch op with conditions for Rust backend.
Source code in operonx/core/ops/flow/branch_op.py
FuncOp
¶
```python
FuncOp(
    code_fn: Optional[Callable] = None,
    return_keys: Optional[List[str]] = None,
    inputs: Dict[str, Any] = None,
    outputs: Dict[str, Any] = None,
    _mappings: Dict[str, Any] = None,
    **kwargs,
)
```
Bases: BaseOp
Op that executes a Python function.
Inputs and outputs are auto-extracted from the function's signature and return-statement AST. Both sync and async functions are supported. Prefer the @op decorator over instantiating FuncOp directly.
Inputs

Auto-parsed from the function's parameter list.

Outputs

Auto-parsed from return {"key": ...} via AST, or from explicit return_keys.

Example:

```python
@op
def add(a: int, b: int):
    return {"sum": a + b}

with GraphOp(name="main") as graph:
    result = add(a=PARENT["x"], b=PARENT["y"])
    START >> result >> END
```
Source code in operonx/core/ops/transform/func_op.py
Attributes¶
Functions¶
run
async
¶
```python
run(
    state: MemoryState, context_id: Optional[str] = None
) -> AsyncGenerator[Tuple[Optional[str], Dict[str, Any]], None]
```
Execute FuncOp with CodeError wrapping.
Delegates to BaseOp.run() (async generator) and re-raises any
exception as a CodeError with full op context attached.
Source code in operonx/core/ops/transform/func_op.py
ParserOp
¶
```python
ParserOp(
    format: ParserType = "xml",
    extract: Optional[List[str]] = None,
    inputs: Dict[str, Any] = None,
    outputs: Dict[str, Any] = None,
    **kwargs,
)
```
Bases: BaseOp
Op that parses text into structured data.
Supports multiple formats (JSON, XML, YAML) and extracts fields using dot-separated chain paths (e.g. "user.address.city: str"). Commonly used as the final stage inside a ChainOp pipeline.
Inputs

text (str): Raw text to parse (e.g. LLM output).

Outputs

Dynamically generated from the extract list — one output key per extracted field.

Example:

```python
parser = ParserOp(
    format="json",
    extract=["user.name: str", "user.age: int"],
    inputs={"text": llm["content"]},
)
```
Source code in operonx/core/ops/transform/parser_op.py
Branch helpers¶
Branch
¶
Fluent builder for creating a BranchOp.
Example:

```python
router = (if_(PARENT["score"] >= 90, "excellent")
          .if_(PARENT["score"] >= 70, "good")
          .else_("fail"))
```

Initialise the builder.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | Optional[str] | Op name. If None, auto-inferred from the variable name. | None |
Source code in operonx/core/ops/flow/branch_op.py
Functions¶
if_
¶
Add a condition–target case.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| condition | Ref | Ref with comparison (e.g. PARENT["score"] >= 90). | required |
| target | Union[str, BaseOp] | Target op or op name. | required |

Returns:

| Type | Description |
|---|---|
| Branch | self for chaining. |
Source code in operonx/core/ops/flow/branch_op.py
else_
¶
Set the default target and build the BranchOp.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| target | Union[str, BaseOp] | Fallback target when no condition matches. | required |

Returns:

| Type | Description |
|---|---|
| BranchOp | The constructed BranchOp. |
Source code in operonx/core/ops/flow/branch_op.py
if_
¶
Start a branch declaration with the first condition.
Example:

```python
router = if_(PARENT["score"] >= 90, "excellent").else_("fail")
```
Source code in operonx/core/ops/flow/branch_op.py
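A fuller routing sketch. The grade_* ops and merge_grades are hypothetical @op factories; the branch targets are the ops' variable names, and the ~ soft-edge wiring follows the BranchOp example above:

```python
with GraphOp(name="grading") as g:
    router = (if_(PARENT["score"] >= 90, "excellent")
              .if_(PARENT["score"] >= 70, "good")
              .else_("fail"))
    excellent = grade_excellent()
    good = grade_good()
    fail = grade_fail()
    merge = merge_grades()

    START >> router >> ~excellent >> merge >> END
    router >> ~good >> merge
    router >> ~fail >> merge
```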
State markers¶
Constants used inside with GraphOp(...) blocks to wire edges and
references. None of these are real instances you'd construct — they're
sentinels the graph builder recognises.
| Marker | Meaning |
|---|---|
| START | Entry node. Every graph's first hard edge goes from START. |
| END | Exit node. op >> END auto-forwards op's outputs as the graph result. |
| PARENT | Reference root for inputs from engine.run(inputs={...}) or the parent graph in nested contexts. Used as PARENT["key"]. |
| PENDING | Sentinel returned by ops that absorb input without producing output. |
Middleware¶
Hook into engine lifecycle events — see Tracing for built-in tracers and middleware patterns.
Middleware
¶
Base class for engine middleware.
Subclass and override any of the hooks to add behavior. All hooks are async and called in order (before_run) or reverse order (after_run, on_error).
Functions¶
before_run
async
¶
Called before graph execution. Can modify inputs.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| graph | GraphOp | The GraphOp being executed | required |
| inputs | Dict[str, Any] | The input dict (modify and return) | required |
| context | Dict[str, Any] | Execution context (user_id, session_id, request_id, etc.) | required |

Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | The (possibly modified) inputs dict. |
Source code in operonx/core/middleware.py
after_run
async
¶
```python
after_run(
    graph: GraphOp,
    inputs: Dict[str, Any],
    result: Dict[str, Any],
    context: Dict[str, Any],
) -> Dict[str, Any]
```
Called after graph execution. Can modify result.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| graph | GraphOp | The GraphOp that was executed | required |
| inputs | Dict[str, Any] | The original inputs | required |
| result | Dict[str, Any] | The result dict (modify and return) | required |
| context | Dict[str, Any] | Execution context | required |

Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | The (possibly modified) result dict. |
Source code in operonx/core/middleware.py
on_error
async
¶
```python
on_error(
    graph: GraphOp,
    inputs: Dict[str, Any],
    error: Exception,
    context: Dict[str, Any],
) -> None
```
Called when graph execution fails.
Default behavior re-raises the error. Override to add logging, alerting, or error transformation.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| graph | GraphOp | The GraphOp that failed | required |
| inputs | Dict[str, Any] | The original inputs | required |
| error | Exception | The exception that occurred | required |
| context | Dict[str, Any] | Execution context | required |
Source code in operonx/core/middleware.py
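A minimal sketch of a custom middleware (the import path is assumed from the module shown above; hook parameters follow the tables):

```python
import time
from typing import Any, Dict

from operonx.core.middleware import Middleware  # path assumed

class TimingMiddleware(Middleware):
    """Attach wall-clock duration to every result."""

    def __init__(self) -> None:
        self._t0 = 0.0  # note: instance state is not safe under concurrent runs

    async def before_run(self, graph, inputs: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        self._t0 = time.perf_counter()
        return inputs  # inputs may be modified here

    async def after_run(self, graph, inputs, result: Dict[str, Any], context) -> Dict[str, Any]:
        result["elapsed_s"] = time.perf_counter() - self._t0
        return result

engine.use(TimingMiddleware())
```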
Top-level convenience¶
bootstrap
¶
```python
bootstrap(
    *, resources: Optional[Union[str, Path]] = None, env: bool = True
) -> Optional[ResourceHub]
```
One-line setup for .env and ResourceHub.

- When env is True (default), load ./.env from CWD using python-dotenv (non-override; existing env wins). The path is recorded in BOOTSTRAP_ENV_PATHS for later diagnostic messages.
- When resources is a path, install the hub via ResourceHub.from_yaml.
- When resources is None, call ResourceHub.auto — which checks ./resources.yaml and warns on a miss.
- Idempotent: if a hub is already installed, return it unchanged.

Returns the installed hub, or None if no resources.yaml was found and none was provided. Pure-compute graphs that don't need a hub can ignore the return value.
Source code in operonx/__init__.py
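For example (sketch; assumes a resources.yaml in the CWD):

```python
import operonx

hub = operonx.bootstrap()            # loads ./.env, auto-discovers ./resources.yaml
assert operonx.bootstrap() is hub    # idempotent: the installed hub is returned unchanged
```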
Provider-neutral types¶
The v0.7 LLMOp converter layer will translate provider-specific types to/from these at the provider boundary.
ChatMessage
¶
Bases: TypedDict
A single message in a chat conversation.
Provider-neutral shape; backends translate to / from this at the LLMOp boundary.
Required fields

- role: One of "system", "user", "assistant", "tool".
- content: The message body. str for plain text; providers may accept richer structured shapes (tool calls, multi-modal parts) via the opt-in fields below.

Optional fields

- name: Speaker identifier (for tool replies and named system prompts).
- tool_call_id: When role == "tool", the id of the tool call this message responds to.
- tool_calls: When role == "assistant", the list of tool calls the model is requesting. Shape is provider-specific; converter layers normalise this in v0.7.
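A short sketch of a conversation in this shape (plain dicts satisfy the TypedDict; the message text is illustrative):

```python
messages = [
    {"role": "system", "content": "You are a terse assistant."},
    {"role": "user", "content": "Summarise operonx in one line."},
    {"role": "assistant", "content": "Graph-based workflow engine."},
]
```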