Parallelized autoresearch agent

Code available here.

This tutorial extends the Autoresearch agent pattern with a code-mode MLE agent that plans batches of training experiments, saves distinct train.py edits, and runs them in parallel via flyte.map. It follows the karpathy/autoresearch loop — minimize validation bits-per-byte on a TinyGPT variant — but orchestrates fan-out batches with durable Flyte tasks and unionai-sandbox execution.

Compared to the single-threaded Claude Code autoresearch tutorial, this agent:

  • Edits full train.py source (upstream karpathy style) instead of calling a remote coding CLI
  • Uses code_mode=True so the LLM writes Python plans that call batch tools such as run_experiment_batch
  • Persists a leaderboard, code-edit history, and batch plans in MemoryStore
  • Right-sizes each experiment with an LLM via a @tool call_handler, then retries on Flyte or sandbox OOM by bumping memory

Each experiment has different compute needs (wider models, larger batch sizes, longer training loops). A single static flyte.Resources on the task would either waste cluster memory or OOM on the heavy configs. Instead, this example uses the same call_handler pattern as the Flyte SDK self-correcting agent: before every run, a sizing LLM reads the tool name, docstring, and call arguments and returns a JSON resource spec; the handler applies it with tool_fn.target.override(resources=...).aio(**kwargs) and retries with more memory when needed.
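The sizing step above depends on turning the LLM's JSON reply into a bounded resource request. The following is a minimal sketch of that parsing step only, not the code in tools.py. The reply fields cpu and memory_gi, the bounds, the fallback, and the parse_resource_spec helper are all hypothetical names chosen for illustration.

```python
import json
import re

# Hypothetical bounds and fallback; the example's real limits are not shown here.
MIN_CPU, MAX_CPU = 1, 8
MIN_MEM_GI, MAX_MEM_GI = 2, 32
FALLBACK = {"cpu": 2, "memory": "4Gi"}


def parse_resource_spec(reply: str) -> dict:
    """Extract a clamped {"cpu", "memory"} spec from a free-form LLM reply."""
    # LLMs often wrap JSON in prose, so grab the outermost {...} span.
    match = re.search(r"\{.*\}", reply, flags=re.DOTALL)
    if match is None:
        return dict(FALLBACK)
    try:
        raw = json.loads(match.group(0))
        cpu = int(raw["cpu"])
        mem_gi = int(raw["memory_gi"])
    except (ValueError, KeyError, TypeError):
        # Malformed JSON or missing/invalid fields: use a safe default.
        return dict(FALLBACK)
    # Clamp so a bad estimate cannot request an unreasonable pod size.
    cpu = max(MIN_CPU, min(MAX_CPU, cpu))
    mem_gi = max(MIN_MEM_GI, min(MAX_MEM_GI, mem_gi))
    return {"cpu": cpu, "memory": f"{mem_gi}Gi"}
```

A spec in this shape could then be passed to flyte.Resources(cpu=..., memory=...), the same constructor the task environments use.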

Define the task environments

The example uses three environments (bundle preparation, sandbox experiments, and the agent driver) that share a Debian-based image with PyTorch and sandbox tooling. The agent driver environment, shown below, depends on the other two:

bundle.py
agent_env = flyte.TaskEnvironment(
    name="autoresearch-agent",
    resources=flyte.Resources(cpu=1, memory="2Gi"),
    image=image,
    include=_INCLUDE,
    secrets=[flyte.Secret(key="internal-anthropic-api-key", as_env_var="ANTHROPIC_API_KEY")],
    depends_on=[experiment_env, bundle_env],
)

Supporting modules (train.py, prepare.py, tools.py, and ui.py) live alongside the entry point in the example directory.

Right-size experiments with call_handler

The right-sizing logic lives in tools.py. execute_with_right_sizing asks the LLM for a resource estimate, runs the underlying @env.task with override(resources=...), and retries with more memory on flyte.errors.OOMError or a sandbox-reported OOM flag. Once max_oom_retries is exhausted, a Flyte OOM is re-raised, while a sandbox OOM returns the last result with its oom flag still set:

tools.py
async def execute_with_right_sizing(
    call_llm: LLMCallable,
    target_task: Any,
    *,
    model: str,
    tool_name: str,
    description: str,
    max_oom_retries: int = MAX_OOM_RETRIES,
    **kwargs: Any,
) -> dict:
    """LLM-size *target_task*, run it, and retry with more memory on OOM."""
    resources = await estimate_resources(call_llm, model, tool_name, description, kwargs)
    attempt = 0
    while True:
        try:
            with flyte.group(f"{tool_name}-attempt-{attempt + 1}"):
                result = await target_task.override(resources=resources).aio(**kwargs)
        except flyte.errors.OOMError:
            if attempt >= max_oom_retries:
                flyte.logger.error("%s Flyte OOM after %d retries; giving up.", tool_name, attempt)
                raise
            resources = bump_memory(resources)
            attempt += 1
            flyte.logger.warning(
                "%s Flyte OOM; retrying with memory=%s",
                tool_name,
                resources.memory,
            )
            continue

        if isinstance(result, dict):
            result["resources"] = f"cpu={resources.cpu}, mem={resources.memory}"
            result["oom_retries"] = attempt

        if isinstance(result, dict) and result.get("oom"):
            if attempt >= max_oom_retries:
                return result
            resources = bump_memory(resources)
            attempt += 1
            flyte.logger.warning(
                "%s sandbox OOM; retrying with memory=%s",
                tool_name,
                resources.memory,
            )
            continue

        return result

def right_sizing_handler(*, max_oom_retries: int = MAX_OOM_RETRIES):
    """Build a ``@tool`` ``call_handler`` that right-sizes and self-heals on OOM."""

    async def handle(call_llm: LLMCallable, tool_fn: ToolFn, **kwargs: Any) -> Any:
        return await execute_with_right_sizing(
            call_llm,
            tool_fn.target,
            model=tool_fn.model,
            tool_name=tool_fn.name,
            description=tool_fn.description,
            max_oom_retries=max_oom_retries,
            **kwargs,
        )

    return handle

right_size = right_sizing_handler(max_oom_retries=MAX_OOM_RETRIES)

right_size is the pre-built handler passed to @tool(call_handler=...). The handler does not need a back-reference to the Agent instance, because the harness passes call_llm and tool_fn.model into it on each invocation.
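The retry loop in execute_with_right_sizing calls bump_memory, which this page does not show. One plausible shape for it is sketched below, assuming memory strings such as "2Gi" or "512Mi", a doubling policy, and a 64 GiB cap. ResourcesStub is a stand-in for flyte.Resources so the sketch runs without Flyte, and the factor and cap are assumptions, not values from tools.py.

```python
import re
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ResourcesStub:
    """Stand-in for flyte.Resources (assumption: only cpu and memory matter here)."""
    cpu: int
    memory: str


def bump_memory(resources: ResourcesStub, factor: int = 2, cap_gi: int = 64) -> ResourcesStub:
    """Return a copy with memory multiplied by *factor*, capped at *cap_gi* GiB."""
    match = re.fullmatch(r"(\d+)(Mi|Gi)", resources.memory)
    if match is None:
        raise ValueError(f"unsupported memory string: {resources.memory!r}")
    amount, unit = int(match.group(1)), match.group(2)
    # Normalize to MiB so Mi and Gi inputs share one code path.
    mebibytes = amount * (1024 if unit == "Gi" else 1)
    bumped = min(mebibytes * factor, cap_gi * 1024)
    # Prefer the Gi unit when the result is a whole number of GiB.
    if bumped % 1024 == 0:
        return replace(resources, memory=f"{bumped // 1024}Gi")
    return replace(resources, memory=f"{bumped}Mi")
```

Returning a new object instead of mutating the input keeps each attempt's resource request available for logging, which matches how the loop reports resources.memory after every bump.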

The experiment task stacks @tool(call_handler=tools.right_size) on @experiment_env.task. The task body only loads edited code and runs sandbox training; sizing and OOM recovery happen in the handler:

parallelized_autoresearch.py
@tool(call_handler=tools.right_size)
@experiment_env.task
async def run_experiment(
    title: str,
    time_budget_sec: int = 45,
    memory_key: str = tools.MEMORY_KEY_FANOUT,
) -> dict:
    """Train using agent-edited ``train.py`` with LLM right-sizing and OOM self-healing."""
    train_py = await tools.load_train_code(memory_key, title)
    config_overrides = await tools.load_config_overrides(memory_key, title)
    duplicate = await tools.check_duplicate_config(memory_key, title, train_py, config_overrides)
    if duplicate:
        result = {
            "success": False,
            "title": title,
            "error": (
                f"Duplicate config of '{duplicate['duplicate_of']}' "
                f"(signature {duplicate['config_signature']}); change train.py or overrides."
            ),
            "duplicate_of": duplicate["duplicate_of"],
        }
        await tools.record_experiment_result(
            memory_key,
            result,
            actor="parallelized-autoresearch",
        )
        return result
    bundle = await build_bundle()
    cache_dir = await materialize_cache(bundle)
    result = await tools.run_train_in_sandbox(
        cache_dir,
        train_py,
        title=title,
        time_budget_sec=time_budget_sec,
        config_overrides=config_overrides or None,
    )
    if result.get("success"):
        await tools.record_promising_run(memory_key, title, result)
        await tools.register_config_signature(
            memory_key,
            title,
            train_py,
            config_overrides,
            actor="parallelized-autoresearch",
        )
    await tools.record_experiment_result(
        memory_key,
        result,
        actor="parallelized-autoresearch",
    )
    return result

# ``flyte.map`` invokes ``run_experiment.aio`` directly (not through the agent
# registry), so bind the LLM callback and model here for ``call_handler`` right-sizing.
run_experiment = dataclasses.replace(
    run_experiment,
    call_llm=tools.call_llm,
    model=MODEL,
)
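run_experiment rejects a run whose train.py and overrides match an earlier experiment, but check_duplicate_config itself is not shown. A minimal sketch of one way to compute such a signature follows, assuming a SHA-256 hash over the source plus canonical JSON of the overrides. The helpers config_signature and find_duplicate, and the in-memory seen mapping, are hypothetical and not the example's MemoryStore-backed implementation.

```python
import hashlib
import json
from typing import Optional


def config_signature(train_py: str, config_overrides: Optional[dict]) -> str:
    """Hash source plus overrides into a short, key-order-independent signature."""
    # sort_keys makes {"a": 1, "b": 2} and {"b": 2, "a": 1} hash identically.
    canonical = json.dumps(config_overrides or {}, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256()
    digest.update(train_py.encode("utf-8"))
    digest.update(b"\x00")  # separator so source and overrides cannot run together
    digest.update(canonical.encode("utf-8"))
    return digest.hexdigest()[:12]


def find_duplicate(seen: dict, title: str, signature: str) -> Optional[dict]:
    """Return duplicate info if a different title already registered *signature*."""
    owner = seen.get(signature)
    if owner is not None and owner != title:
        return {"duplicate_of": owner, "config_signature": signature}
    return None
```

The returned dictionary mirrors the duplicate_of and config_signature keys that run_experiment reads when it builds its error message.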

Batch fan-out calls flyte.map.aio(run_experiment, ...) from run_experiment_batch. That path invokes run_experiment.aio() directly — not through the agent registry — so the example binds call_llm and model on the tool after construction (see the dataclasses.replace block above). With Flyte SDK ≥ 2.5.5, AgentTool.aio routes through call_handler, so every mapped experiment gets LLM right-sizing even when the agent only exposes run_experiment_batch in code mode.
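The binding step can be illustrated without Flyte. The sketch below assumes the tool object is a dataclass, which the dataclasses.replace call implies. ToolStub, fake_llm, and train are hypothetical stand-ins, and asyncio.gather stands in for flyte.map.aio.

```python
import asyncio
import dataclasses
from typing import Any, Awaitable, Callable, Optional


@dataclasses.dataclass(frozen=True)
class ToolStub:
    """Hypothetical stand-in for the object that @tool returns."""
    name: str
    target: Callable[..., Awaitable[dict]]
    call_llm: Optional[Callable[[str], Awaitable[str]]] = None
    model: Optional[str] = None

    async def aio(self, **kwargs: Any) -> dict:
        # Direct calls skip the agent registry, so the bindings must already be set.
        if self.call_llm is None or self.model is None:
            raise RuntimeError(f"{self.name}: call_llm and model are not bound")
        sizing = await self.call_llm(f"size {self.name} for {kwargs}")
        result = await self.target(**kwargs)
        return {**result, "sizing": sizing, "model": self.model}


async def fake_llm(prompt: str) -> str:
    """Pretend sizing LLM that always returns the same estimate."""
    return "cpu=2, mem=4Gi"


async def train(title: str) -> dict:
    """Pretend experiment task."""
    return {"title": title, "success": True}


async def run_batch(tool: ToolStub, titles: list) -> list:
    # asyncio.gather stands in for flyte.map.aio in this sketch.
    return await asyncio.gather(*(tool.aio(title=t) for t in titles))


unbound = ToolStub(name="run_experiment", target=train)
# replace() returns a new copy with the fields filled in; the original is untouched.
bound = dataclasses.replace(unbound, call_llm=fake_llm, model="example-model")
results = asyncio.run(run_batch(bound, ["wider-mlp", "lower-lr"]))
```

Calling unbound.aio(...) in this sketch raises RuntimeError, which is the failure mode the post-construction binding is meant to avoid on the mapped path.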

The fan-out agent task

The driver task parallelized_autoresearch restores prior memory (default key parallelized-autoresearch), streams Activity / Leaderboard / Code edits / Memory report tabs, and runs the code-mode agent loop. The agent tool registry is trimmed to the batch workflow — run_experiment is internal to run_experiment_batch, not a sandbox function the LLM calls directly.

parallelized_autoresearch.py
@agent_env.task(report=True)
async def parallelized_autoresearch(
    n_experiments: int = 6,
    num_shards: int = DEFAULT_NUM_SHARDS,
    memory_key: str = tools.MEMORY_KEY_FANOUT,
    batch_size: int = 3,
    max_turns: int = DEFAULT_MAX_TURNS,
) -> AutoresearchOutput:
    """Drive the fan-out code-edit MLE agent with sandbox batch execution."""
    bundle = await build_bundle(num_shards=num_shards)
    profile = await profile_bundle(bundle)

    memory = await MemoryStore.get_or_create.aio(key=memory_key)
    persisted = await memory.read_json.aio("memory/leaderboard.json", default=[])
    promising = await memory.read_json.aio("memory/promising_code.json", default=[])
    history = await tools.load_research_history(memory_key)
    flyte.logger.info(
        "Fan-out agent restored %d messages, %d experiments, %d promising edits, best val_bpb=%s.",
        len(memory.messages),
        len(persisted),
        len(promising),
        history.get("best_val_bpb"),
    )

    events: list[dict[str, Any]] = []

    async def on_event(ev) -> None:
        events.append({"type": ev.type, "data": ev.data})
        if ev.type in ("tool_start", "tool_end", "tool_error", "turn_start", "agent_end"):
            tab = flyte.report.get_tab("Activity")
            tab.replace(ui.render_activity_log(events))
            await flyte.report.flush.aio()
        if ev.type == "tool_end" and ev.data.get("tool") in (
            "edit_train_code_batch",
            "<sandbox>",
        ):
            edits = await tools.load_saved_code_edits(memory_key)
            if edits:
                flyte.report.get_tab("Code edits").replace(ui.render_code_edits_panel(edits))
                await flyte.report.flush.aio()

    directive_text = ui.directive_code_edit_fanout(
        n_experiments,
        profile,
        memory_key,
        batch_size=batch_size,
        history=history,
    )

    token = agent_progress_cb.set(on_event)
    run_agent = build_fanout_agent(max_turns=max_turns)
    try:
        result = await run_agent.run.aio(directive_text, memory=memory)
    finally:
        agent_progress_cb.reset(token)

    leaderboard, best = ui.parse_leaderboard(
        memory.messages,
        promising_fallback=promising,
    )
    leaderboard_dicts = [dataclasses.asdict(e) for e in leaderboard]
    code_edits = await tools.load_saved_code_edits(memory_key)

    tab_lb = flyte.report.get_tab("Leaderboard")
    tab_lb.replace(ui.render_leaderboard(leaderboard, best))

    flyte.report.get_tab("Code edits").replace(
        ui.render_code_edits_panel(code_edits, best_title=best.title if best else None)
    )

    await memory.write_json.aio(
        "memory/leaderboard.json",
        leaderboard_dicts,
        actor="parallelized-autoresearch",
        reason=f"leaderboard after {len(leaderboard)} experiments",
    )
    await memory.save.aio()
    audit = await memory.audit_tail(20)
    hypotheses = await memory.read_json.aio("memory/hypotheses.json", default=[])
    promising = await memory.read_json.aio("memory/promising_code.json", default=[])

    tab_mem = flyte.report.get_tab("Memory")
    tab_mem.replace(
        ui.render_memory_panel(
            memory_key,
            len(memory.messages),
            leaderboard_dicts,
            audit,
            hypotheses,
            persisted_promising=promising,
            code_edits=code_edits,
        )
    )

    summary_body = result.summary or result.error or ""
    if result.error and leaderboard:
        best_line = f" Best val_bpb so far: {best.val_bpb} ({best.title})." if best and best.val_bpb else ""
        summary_body = f"{result.error}{best_line}"

    await flyte.report.replace.aio(
        ui.render_summary(
            directive_text,
            leaderboard,
            best,
            summary_body,
            code_edits=code_edits,
        )
    )
    await flyte.report.flush.aio()

    return AutoresearchOutput(
        directive=directive_text,
        dataset_profile=profile,
        best=best,
        leaderboard=leaderboard,
        summary=summary_body,
        memory_key=memory_key,
        total_experiments=len(leaderboard),
    )
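The set/reset calls on agent_progress_cb in the driver resemble Python's contextvars token pattern. Assuming it behaves like a ContextVar holding an async callback (an assumption, since its definition is not shown), the registration can be sketched in isolation. progress_cb, emit, and fake_agent_run are hypothetical names.

```python
import asyncio
import contextvars

# Hypothetical stand-in for agent_progress_cb; the real object is not shown here.
progress_cb = contextvars.ContextVar("progress_cb", default=None)


async def emit(event_type: str, data: dict) -> None:
    """Send an event to whichever callback is registered in this context."""
    callback = progress_cb.get()
    if callback is not None:
        await callback({"type": event_type, "data": data})


async def fake_agent_run() -> str:
    """Pretend agent loop that emits a few progress events."""
    await emit("turn_start", {"turn": 1})
    await emit("tool_end", {"tool": "edit_train_code_batch"})
    await emit("agent_end", {})
    return "done"


async def drive() -> list:
    events = []

    async def on_event(ev: dict) -> None:
        events.append(ev["type"])

    token = progress_cb.set(on_event)
    try:
        await fake_agent_run()
    finally:
        # Reset even if the run raises, so the callback does not leak
        # into later work that shares this context.
        progress_cb.reset(token)
    return events


collected = asyncio.run(drive())
```

The try/finally mirrors the driver task: the callback is visible only while the agent runs, and the previous value is restored afterwards.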

Run the agent

Create secrets

Register an Anthropic API key for agent LLM calls and for per-experiment resource sizing inside call_handler:

flyte create secret internal-anthropic-api-key <YOUR_ANTHROPIC_API_KEY>

Run remotely

From the example directory:

cd v2/tutorials/parallelized_autoresearch
uv run --script parallelized_autoresearch.py --n-experiments 6 --batch-size 3 --num-shards 1

Use --memory-key to resume a prior research session (default: parallelized-autoresearch). Pass a unique key, for example parallelized-autoresearch-20260622-215057, to start with empty memory. Code mode needs more turns than JSON tool mode, so increase --max-turns for larger sweeps.
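The example key above looks like a prefix followed by a timestamp. A small helper along these lines could generate fresh keys, assuming a %Y%m%d-%H%M%S suffix in UTC. The helper name fresh_memory_key and the choice of UTC are assumptions for illustration.

```python
from datetime import datetime, timezone
from typing import Optional


def fresh_memory_key(
    prefix: str = "parallelized-autoresearch",
    now: Optional[datetime] = None,
) -> str:
    """Build a key such as parallelized-autoresearch-20260622-215057."""
    # Accept an explicit timestamp so the function is easy to test.
    stamp = (now or datetime.now(timezone.utc)).strftime("%Y%m%d-%H%M%S")
    return f"{prefix}-{stamp}"
```

The resulting string can be passed as the --memory-key value for a run that should start from empty memory.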

Or invoke the agent task directly with flyte run (snake_case task inputs):

flyte run parallelized_autoresearch.py parallelized_autoresearch \
  --n_experiments 6 --batch_size 3 --num_shards 1 --max_turns 12 \
  --memory_key parallelized-autoresearch

The first run downloads climbmix data shards and trains a BPE tokenizer. Subsequent runs reuse the cached bundle tasks. The example requires Flyte SDK ≥ 2.5.5 for call_handler support in code mode and on AgentTool.aio (used by the flyte.map fan-out).

See also the single-task Autoresearch agent tutorial for the Claude Code + pull-request workflow.