Rich Streaming (Observe)
Get metadata like tokens, cost, and duration while streaming
While agent.stream() gives you just the text, agent.stream_observe() provides rich updates containing metadata. This is essential for production applications where you need to track costs, monitor performance, or show progress indicators.
Quick Start
Use stream_observe() to receive StreamUpdate objects. You can check the type of update to handle text chunks and final metadata differently.
from peargent import create_agent
from peargent.models import openai

agent = create_agent(
    name="ObservableAgent",
    description="Agent with observable execution",
    persona="You are helpful.",
    model=openai("gpt-5")
)

print("Agent: ", end="", flush=True)

for update in agent.stream_observe("What is the capital of France?"):
    # 1. Handle text tokens
    if update.is_token:
        print(update.content, end="", flush=True)
    # 2. Handle completion (metadata)
    elif update.is_agent_end:
        print(f"\n\n--- Metadata ---")
        print(f"Tokens: {update.tokens}")
        print(f"Cost: ${update.cost:.6f}")
        print(f"Time: {update.duration:.2f}s")

Output:
Agent: The capital of France is Paris.

--- Metadata ---
Tokens: 15
Cost: $0.000012
Time: 0.45s

The StreamUpdate Object
Each item yielded by stream_observe() is a StreamUpdate object with helpful properties:
| Property | Description |
|---|---|
| is_token | True if this update contains a text chunk. |
| content | The text chunk (only available when is_token is True). |
| is_agent_end | True when the agent has finished generating. |
| tokens | Total tokens used (available on is_agent_end). |
| cost | Total cost in USD (available on is_agent_end). |
| duration | Time taken in seconds (available on is_agent_end). |
| is_agent_start | True when the agent starts working. |
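The properties above are enough to fold a stream into a single result object, which can be handy for logging or billing. The following is a minimal sketch, not part of Peargent: FakeUpdate and RunSummary are hypothetical stand-ins that only mirror the property names in the table, so the example runs without the library or an API key. With a real agent, the iterable passed to collect() would presumably be agent.stream_observe(...).

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass
class FakeUpdate:
    """Hypothetical stand-in exposing the property names from the table above."""
    is_agent_start: bool = False
    is_token: bool = False
    is_agent_end: bool = False
    content: Optional[str] = None
    tokens: Optional[int] = None
    cost: Optional[float] = None
    duration: Optional[float] = None


@dataclass
class RunSummary:
    """Hypothetical container for the full text plus the final metadata."""
    text: str = ""
    tokens: Optional[int] = None
    cost: Optional[float] = None
    duration: Optional[float] = None


def collect(updates: Iterable[FakeUpdate]) -> RunSummary:
    """Join text chunks and copy metadata from the end-of-agent update."""
    chunks: List[str] = []
    summary = RunSummary()
    for update in updates:
        if update.is_token:
            chunks.append(update.content or "")
        elif update.is_agent_end:
            # Metadata is only read from the final update.
            summary.tokens = update.tokens
            summary.cost = update.cost
            summary.duration = update.duration
    summary.text = "".join(chunks)
    return summary


# Simulated stream, shaped like the Quick Start example.
fake_stream = [
    FakeUpdate(is_agent_start=True),
    FakeUpdate(is_token=True, content="The capital of France "),
    FakeUpdate(is_token=True, content="is Paris."),
    FakeUpdate(is_agent_end=True, tokens=15, cost=0.000012, duration=0.45),
]

summary = collect(fake_stream)
print(summary.text)
print(f"{summary.tokens} tokens, ${summary.cost:.6f}, {summary.duration:.2f}s")
```

Because collect() only relies on attribute names, it does not need to know the concrete update class; that is a design choice of this sketch rather than anything the library requires.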
Update Types
The UpdateType enum defines all possible event types during streaming:
| Type | Description |
|---|---|
| AGENT_START | Agent execution started. |
| TOKEN | A text chunk was generated. |
| AGENT_END | Agent execution completed. |
| POOL_START | Pool execution started. |
| POOL_END | Pool execution completed. |
| TOOL_START | Tool execution started. |
| TOOL_END | Tool execution completed. |
| ERROR | An error occurred during streaming. |
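When a consumer needs to react to more than tokens and completion, branching on the update's type can be easier to extend than chaining boolean properties. The sketch below is illustrative only: the UpdateType enum here is a local stand-in that copies the names from the table (in a real project it would be imported from peargent), FakeUpdate is a hypothetical class, and stopping at the first ERROR is an assumed policy, not documented library behavior.

```python
from collections import Counter
from dataclasses import dataclass
from enum import Enum, auto
from typing import Iterable, Optional, Tuple


class UpdateType(Enum):
    """Local stand-in mirroring the names in the table above."""
    AGENT_START = auto()
    TOKEN = auto()
    AGENT_END = auto()
    POOL_START = auto()
    POOL_END = auto()
    TOOL_START = auto()
    TOOL_END = auto()
    ERROR = auto()


@dataclass
class FakeUpdate:
    """Hypothetical update carrying only a type and optional text."""
    type: UpdateType
    content: Optional[str] = None


def summarize(updates: Iterable[FakeUpdate]) -> Tuple[str, Counter]:
    """Return the joined text and a count of each event type seen."""
    counts: Counter = Counter()
    parts = []
    for update in updates:
        counts[update.type] += 1
        if update.type is UpdateType.TOKEN:
            parts.append(update.content or "")
        elif update.type is UpdateType.ERROR:
            # Assumed policy: stop consuming after the first error.
            break
    return "".join(parts), counts


# Simulated stream with one tool call between two text chunks.
fake_stream = [
    FakeUpdate(UpdateType.AGENT_START),
    FakeUpdate(UpdateType.TOKEN, "Checking "),
    FakeUpdate(UpdateType.TOOL_START),
    FakeUpdate(UpdateType.TOOL_END),
    FakeUpdate(UpdateType.TOKEN, "done."),
    FakeUpdate(UpdateType.AGENT_END),
]

text, counts = summarize(fake_stream)
print(text)
print({t.name: n for t, n in counts.items()})
```

Counting every event type, including ones the loop does not otherwise handle, gives a cheap way to notice unexpected events such as tool calls during development.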
Streaming with Pools
When using pool.stream_observe(), you get additional event types to track the pool's lifecycle.
from peargent import UpdateType

# `pool` is assumed to be a pool object created earlier.
for update in pool.stream_observe("Query"):
    # Pool Events
    if update.type == UpdateType.POOL_START:
        print("[Pool Started]")
    # Agent Events (same as single agent)
    elif update.is_agent_start:
        print(f"\n[Agent: {update.agent}]")
    elif update.is_token:
        print(update.content, end="", flush=True)
    # Pool Finished
    elif update.type == UpdateType.POOL_END:
        print(f"\n[Pool Finished] Total Cost: ${update.cost}")

What's Next?
Async Streaming: Learn how to use these features in async environments for high concurrency.
Tracing & Observability: For deep debugging and historical logs, combine streaming with Peargent's tracing system.