Rich Streaming (Observe)

Get metadata such as token counts, cost, and duration while streaming.

agent.stream() yields only the text, while agent.stream_observe() yields structured updates that carry the text chunks plus metadata. This is useful in production applications where you need to track costs, monitor performance, or show progress indicators.

Quick Start

Use stream_observe() to receive StreamUpdate objects. You can check the type of update to handle text chunks and final metadata differently.

from peargent import create_agent
from peargent.models import openai

agent = create_agent(
    name="ObservableAgent",
    description="Agent with observable execution",
    persona="You are helpful.",
    model=openai("gpt-5")
)

print("Agent: ", end="", flush=True)

for update in agent.stream_observe("What is the capital of France?"):
    # 1. Handle text tokens
    if update.is_token:
        print(update.content, end="", flush=True)
    
    # 2. Handle completion (metadata)
    elif update.is_agent_end:
        print("\n\n--- Metadata ---")
        print(f"Tokens: {update.tokens}")
        print(f"Cost:   ${update.cost:.6f}")
        print(f"Time:   {update.duration:.2f}s")

Output:

Agent: The capital of France is Paris.

--- Metadata ---
Tokens: 15
Cost:   $0.000012
Time:   0.45s
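
In application code you often want the full response and its metadata as one value rather than printed output. The following is a minimal sketch that relies only on the properties shown above (is_token, content, is_agent_end, tokens, cost, duration). The names StreamResult and collect_stream are hypothetical helpers, not part of peargent, and the SimpleNamespace objects are stand-ins for real updates so the example runs without a model or API key.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Iterable, Optional


@dataclass
class StreamResult:
    """Hypothetical container for a finished stream (not a peargent class)."""
    text: str = ""
    tokens: Optional[int] = None
    cost: Optional[float] = None
    duration: Optional[float] = None


def collect_stream(updates: Iterable) -> StreamResult:
    """Drain a stream of updates into a single StreamResult."""
    result = StreamResult()
    chunks = []
    for update in updates:
        if getattr(update, "is_token", False):
            # Text chunk: keep it for the final joined string.
            chunks.append(update.content)
        elif getattr(update, "is_agent_end", False):
            # Final update: copy the metadata across.
            result.tokens = update.tokens
            result.cost = update.cost
            result.duration = update.duration
    result.text = "".join(chunks)
    return result


# Stand-in updates that mimic the documented properties.
fake_updates = [
    SimpleNamespace(is_token=True, content="The capital of France "),
    SimpleNamespace(is_token=True, content="is Paris."),
    SimpleNamespace(is_token=False, is_agent_end=True,
                    tokens=15, cost=0.000012, duration=0.45),
]

result = collect_stream(fake_updates)
print(result.text)
print(result.tokens, result.cost, result.duration)
```

With a real agent, the same helper would presumably be called as collect_stream(agent.stream_observe("What is the capital of France?")).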

The StreamUpdate Object

Each item yielded by stream_observe() is a StreamUpdate object with helpful properties:

| Property | Description |
| --- | --- |
| is_token | True if this update contains a text chunk. |
| content | The text chunk (only available when is_token is True). |
| is_agent_end | True when the agent has finished generating. |
| tokens | Total tokens used (available on is_agent_end). |
| cost | Total cost in USD (available on is_agent_end). |
| duration | Time taken in seconds (available on is_agent_end). |
| is_agent_start | True when the agent starts working. |
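
Because tokens and cost are only available on the final update, cost tracking across many requests comes down to watching for is_agent_end. The sketch below shows one way to keep running totals. UsageTracker and fake_run are hypothetical names introduced for illustration, and the example assumes tokens is an integer and cost a float, as the Quick Start output suggests.

```python
from types import SimpleNamespace


class UsageTracker:
    """Hypothetical helper: running totals across many streamed requests."""

    def __init__(self):
        self.requests = 0
        self.tokens = 0
        self.cost = 0.0

    def watch(self, updates):
        """Yield every update unchanged while recording end-of-run metadata."""
        for update in updates:
            if getattr(update, "is_agent_end", False):
                self.requests += 1
                self.tokens += update.tokens
                self.cost += update.cost
            yield update


def fake_run(tokens, cost):
    """Stand-in for agent.stream_observe(); yields one chunk, then metadata."""
    yield SimpleNamespace(is_token=True, is_agent_end=False, content="...")
    yield SimpleNamespace(is_token=False, is_agent_end=True,
                          tokens=tokens, cost=cost, duration=0.1)


tracker = UsageTracker()
for run in (fake_run(15, 0.25), fake_run(20, 0.5)):
    for update in tracker.watch(run):
        pass  # a real application would print or forward update.content here

print(tracker.requests, tracker.tokens, tracker.cost)
```

Wrapping the stream in a generator keeps the consuming loop unchanged: the caller still iterates over the same updates, and the totals accumulate as a side effect.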

Update Types

The UpdateType enum defines all possible event types during streaming:

| Type | Description |
| --- | --- |
| AGENT_START | Agent execution started. |
| TOKEN | A text chunk was generated. |
| AGENT_END | Agent execution completed. |
| POOL_START | Pool execution started. |
| POOL_END | Pool execution completed. |
| TOOL_START | Tool execution started. |
| TOOL_END | Tool execution completed. |
| ERROR | An error occurred during streaming. |
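
One use for these event types is a progress indicator that maps lifecycle events to status messages. The sketch below uses a stand-in UpdateType enum with the member names from the table above so that it runs on its own; in real code you would import UpdateType from peargent. The status messages, STATUS_LABELS, and status_for are hypothetical, and the example assumes each update exposes its event type as update.type.

```python
from enum import Enum, auto
from types import SimpleNamespace
from typing import Optional


class UpdateType(Enum):
    """Stand-in with the member names listed in the table.

    The values produced by auto() are placeholders, not peargent's values.
    """
    AGENT_START = auto()
    TOKEN = auto()
    AGENT_END = auto()
    POOL_START = auto()
    POOL_END = auto()
    TOOL_START = auto()
    TOOL_END = auto()
    ERROR = auto()


# Hypothetical status messages for a progress indicator.
STATUS_LABELS = {
    UpdateType.AGENT_START: "Agent is working...",
    UpdateType.TOOL_START: "Running a tool...",
    UpdateType.TOOL_END: "Tool finished.",
    UpdateType.AGENT_END: "Done.",
    UpdateType.ERROR: "Something went wrong.",
}


def status_for(update) -> Optional[str]:
    """Return a status message for lifecycle events, or None for other events."""
    return STATUS_LABELS.get(update.type)


# Stand-in event sequence: start, tool call, one token, end.
events = [
    SimpleNamespace(type=event_type)
    for event_type in (
        UpdateType.AGENT_START,
        UpdateType.TOOL_START,
        UpdateType.TOOL_END,
        UpdateType.TOKEN,
        UpdateType.AGENT_END,
    )
]

statuses = [s for s in (status_for(e) for e in events) if s is not None]
print(statuses)
```

A lookup table keeps the handler flat: supporting another event type means adding one dictionary entry rather than another elif branch.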

Streaming with Pools

When using pool.stream_observe(), you get additional event types to track the pool's lifecycle.

from peargent import UpdateType

# `pool` is assumed to be an agent pool you have already created.
for update in pool.stream_observe("Query"):
    # Pool Events
    if update.type == UpdateType.POOL_START:
        print("[Pool Started]")
    
    # Agent Events (same as single agent)
    elif update.is_agent_start:
        print(f"\n[Agent: {update.agent}]")
    elif update.is_token:
        print(update.content, end="", flush=True)
        
    # Pool Finished
    elif update.type == UpdateType.POOL_END:
        print(f"\n[Pool Finished] Total Cost: ${update.cost:.6f}")
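
If you need each agent's output separately instead of one interleaved printout, you can group tokens under the most recent agent-start event. This is a minimal sketch under two assumptions: update.agent holds the agent's name on agent-start updates (as in the example above), and agents in the pool run one after another, so tokens belong to the agent that started most recently. text_by_agent and the agent names "Researcher" and "Writer" are hypothetical.

```python
from types import SimpleNamespace


def text_by_agent(updates):
    """Group streamed text under the agent that produced it."""
    transcripts = {}
    current = None
    for update in updates:
        if getattr(update, "is_agent_start", False):
            # A new agent took over; later tokens are attributed to it.
            current = update.agent
            transcripts.setdefault(current, [])
        elif getattr(update, "is_token", False) and current is not None:
            transcripts[current].append(update.content)
    return {name: "".join(chunks) for name, chunks in transcripts.items()}


# Stand-in for pool.stream_observe("Query"), so the sketch runs offline.
fake_pool_stream = [
    SimpleNamespace(is_agent_start=True, agent="Researcher"),
    SimpleNamespace(is_token=True, content="Paris is "),
    SimpleNamespace(is_token=True, content="the capital."),
    SimpleNamespace(is_agent_start=True, agent="Writer"),
    SimpleNamespace(is_token=True, content="Done."),
]

transcripts = text_by_agent(fake_pool_stream)
for name, text in transcripts.items():
    print(f"[{name}] {text}")
```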

What's Next?

Async Streaming: Learn how to use these features in async environments for high concurrency.

Tracing & Observability: For deep debugging and historical logs, combine streaming with Peargent's tracing system.