Skip to content

Operonx

Operonx is a workflow engine that runs anything as a workflow — from IO-bound AI tasks (LLMs, agents, RAG) to CPU-bound workloads needing native performance. Define complex pipelines as DAGs with async execution, built-in tracing, and a dual Python/Rust backend.

Why Operonx

  • DAG-based workflows — nodes and edges, inspired by Airflow operators.
  • Dual backend — Python for flexibility, Rust for raw speed.
  • Built-in tracing — Langfuse + OpenTelemetry, plus a local viewer.
  • Provider agnostic — OpenAI, Azure, Gemini, Anthropic, vLLM, ONNX — swap with one line.
  • Type-safe state — O(1) state access with schema validation.

Quick start

pip install operonx
import asyncio
from operonx.core import Operon, GraphOp, op, START, END, PARENT

@op
def greet(name: str):
    return {"message": f"Hello, {name}!"}

async def main():
    with GraphOp(name="hello") as graph:
        step = greet(name=PARENT["name"])
        START >> step >> END

    result = await Operon(graph).run(inputs={"name": "World"})
    print(result["message"])

asyncio.run(main())

LLM integration

pip install "operonx[standard]"

Configure resources in resources.yaml, credentials in .env, then:

import asyncio
import operonx
from operonx.core import Operon, GraphOp, START, END, PARENT
from operonx.providers import chat

async def main():
    operonx.bootstrap()  # loads ./.env + ./resources.yaml

    with GraphOp(name="chat") as graph:
        c = chat(
            resource="gpt-4o",
            template={"system": "You are a helpful assistant.", "user": "{question}"},
            question=PARENT["question"],
        )
        START >> c >> END

    result = await Operon(graph).run(inputs={"question": "What is Python?"})
    print(result["content"])

asyncio.run(main())

See Resource hub for the full setup model.

Where to go next

Repository