
Async Streaming

Run multiple agents concurrently with non-blocking streaming

Async streaming allows your application to handle multiple agent requests at the same time without blocking. This is essential for:

  • Web Servers: Handling multiple user requests in FastAPI or Django.
  • Parallel Processing: Running multiple agents simultaneously (e.g., a Researcher and a Reviewer).

Quick Start

Use astream() with async for to stream responses asynchronously.

import asyncio
from peargent import create_agent
from peargent.models import openai

agent = create_agent(
    name="AsyncAgent",
    description="Async streaming agent",
    persona="You are helpful.",
    model=openai("gpt-4o")
)

async def main():
    print("Agent: ", end="", flush=True)
    
    # Use 'async for' with 'astream'
    async for chunk in agent.astream("Hello, how are you?"):
        print(chunk, end="", flush=True)

if __name__ == "__main__":
    asyncio.run(main())

Running Agents Concurrently

The real power of async comes from running multiple agents at once. Here is how to run two agents in parallel using asyncio.gather().

import asyncio
from peargent import create_agent
from peargent.models import openai

# Create two agents
agent1 = create_agent(name="Agent1", persona="You are concise.", model=openai("gpt-4o"))
agent2 = create_agent(name="Agent2", persona="You are verbose.", model=openai("gpt-4o"))

async def run_agent(agent, query, label):
    print(f"[{label}] Starting...")
    async for chunk in agent.astream(query):
        # In a real app, you might send this to a websocket
        pass 
    print(f"[{label}] Finished!")

async def main():
    # Run both agents at the same time
    await asyncio.gather(
        run_agent(agent1, "Explain Quantum Physics", "Agent 1"),
        run_agent(agent2, "Explain Quantum Physics", "Agent 2")
    )

if __name__ == "__main__":
    asyncio.run(main())

Result: Both agents start processing immediately. You don't have to wait for Agent 1 to finish before Agent 2 starts.
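If you need the assembled responses rather than streaming side effects, each coroutine can collect its chunks and return them through asyncio.gather(), which preserves argument order. A minimal sketch reusing the agents defined above:

async def collect(agent, query):
    # Accumulate streamed chunks into one string
    chunks = []
    async for chunk in agent.astream(query):
        chunks.append(chunk)
    return "".join(chunks)

async def main():
    # gather() returns results in the same order as its arguments
    concise, verbose = await asyncio.gather(
        collect(agent1, "Explain Quantum Physics"),
        collect(agent2, "Explain Quantum Physics")
    )
    print(f"Agent 1: {len(concise)} characters, Agent 2: {len(verbose)} characters")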

Async with Metadata

Just like the synchronous version, you can use astream_observe() to get metadata asynchronously.

async for update in agent.astream_observe("Query"):
    if update.is_token:
        print(update.content, end="", flush=True)
    elif update.is_agent_end:
        print(f"\nCost: ${update.cost}")

Async Pools

Pools also support async streaming, allowing you to run multi-agent workflows without blocking. The snippets below assume a pool that has already been created.

# Stream text chunks from a pool asynchronously
async for chunk in pool.astream("Query"):
    print(chunk, end="", flush=True)

# Stream rich updates from a pool asynchronously
async for update in pool.astream_observe("Query"):
    if update.is_token:
        print(update.content, end="", flush=True)
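A common pattern is fanning one pool stream out to several consumers, such as a logger and a client connection. A minimal sketch using asyncio.Queue (the pool object is assumed to exist, as above):

import asyncio

async def fan_out(pool, query, queues):
    # Forward each chunk to every consumer, then send a sentinel
    async for chunk in pool.astream(query):
        for q in queues:
            q.put_nowait(chunk)
    for q in queues:
        q.put_nowait(None)  # None signals end of stream

async def consumer(name, q):
    while (chunk := await q.get()) is not None:
        print(f"[{name}] received {len(chunk)} chars")

async def main():
    q1, q2 = asyncio.Queue(), asyncio.Queue()
    await asyncio.gather(
        fan_out(pool, "Query", [q1, q2]),
        consumer("logger", q1),
        consumer("client", q2),
    )

asyncio.run(main())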

Web Server Example (FastAPI)

Async streaming is the standard way to build streaming AI endpoints in FastAPI. This example reuses the agent from the Quick Start.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/chat")
async def chat(query: str):
    async def generate():
        async for chunk in agent.astream(query):
            yield chunk

    return StreamingResponse(generate(), media_type="text/plain")
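To try it, start the app with an ASGI server such as uvicorn (uvicorn main:app), then consume the stream from any HTTP client that supports streaming. A minimal sketch using httpx (an assumption; curl -N against the same URL works just as well):

import httpx

# Read the plain-text response incrementally as the agent streams it
with httpx.stream("GET", "http://localhost:8000/chat", params={"query": "Hello"}) as response:
    for chunk in response.iter_text():
        print(chunk, end="", flush=True)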

What's Next?

Tracing & Observability: Learn how to monitor your async agents in production.