peargent.

Custom Storage Backends

Create custom storage backends for Peargent history.

For production-grade backends with complex requirements, you can create a custom storage backend by subclassing HistoryStore. This allows you to persist conversation history in any database or storage system of your choice, such as MongoDB, PostgreSQL, Redis, or even a custom API.

Subclassing HistoryStore

To create a custom store, you need to implement the abstract methods defined in the HistoryStore class. Here is a comprehensive example using MongoDB.

1. Initialization and Setup

First, set up your class and initialize the database connection. You should also ensure any necessary indexes are created for performance.

from peargent.storage import HistoryStore, Thread, Message
from typing import Dict, List, Optional, Any
from datetime import datetime

class MongoDBHistoryStore(HistoryStore):
    """Custom MongoDB storage backend."""

    def __init__(self, connection_string: str, database: str = "peargent"):
        from pymongo import MongoClient
        self.client = MongoClient(connection_string)
        self.db = self.client[database]
        self.threads = self.db.threads
        self.messages = self.db.messages

        # Create indexes for performance
        # Indexing 'id' ensures fast thread lookups
        self.threads.create_index("id", unique=True)
        # Compound index on 'thread_id' and 'timestamp' speeds up message retrieval
        self.messages.create_index([("thread_id", 1), ("timestamp", 1)])

2. Thread Management

Implement methods to create, retrieve, and list threads.

Creating Threads: When creating a thread, you must persist its ID, creation time, and any initial metadata.

    def create_thread(self, metadata: Optional[Dict] = None) -> str:
        thread = Thread(metadata=metadata)
        self.threads.insert_one({
            "id": thread.id,
            "created_at": thread.created_at,
            "updated_at": thread.updated_at,
            "metadata": thread.metadata
        })
        return thread.id

Retrieving Threads: When retrieving a thread, you need to reconstruct the Thread object from your database record. Crucially, you must also load the associated messages and attach them to the thread.

    def get_thread(self, thread_id: str) -> Optional[Thread]:
        thread_data = self.threads.find_one({"id": thread_id})
        if not thread_data:
            return None

        thread = Thread(
            thread_id=thread_data["id"],
            metadata=thread_data.get("metadata", {}),
            created_at=thread_data["created_at"],
            updated_at=thread_data["updated_at"]
        )

        # Load messages associated with this thread, sorted by timestamp
        messages = self.messages.find({"thread_id": thread_id}).sort("timestamp", 1)
        for msg_data in messages:
            msg = Message(
                role=msg_data["role"],
                content=msg_data["content"],
                agent=msg_data.get("agent"),
                tool_call=msg_data.get("tool_call"),
                metadata=msg_data.get("metadata", {}),
                message_id=msg_data["id"],
                timestamp=msg_data["timestamp"]
            )
            thread.messages.append(msg)

        return thread

3. Message Persistence

Implement the logic to save new messages.

Appending Messages: This method is called whenever a new message is added to the history. You should save the message and update the thread's updated_at timestamp.

    def append_message(
        self,
        thread_id: str,
        role: str,
        content: Any,
        agent: Optional[str] = None,
        tool_call: Optional[Dict] = None,
        metadata: Optional[Dict] = None
    ) -> Message:
        message = Message(
            role=role,
            content=content,
            agent=agent,
            tool_call=tool_call,
            metadata=metadata
        )

        self.messages.insert_one({
            "id": message.id,
            "thread_id": thread_id,
            "timestamp": message.timestamp,
            "role": message.role,
            "content": message.content,
            "agent": message.agent,
            "tool_call": message.tool_call,
            "metadata": message.metadata
        })

        # Update thread's updated_at timestamp
        self.threads.update_one(
            {"id": thread_id},
            {"$set": {"updated_at": datetime.now()}}
        )

        return message

4. Utility Methods

Implement the remaining utility methods for listing and deleting.

    def get_messages(self, thread_id: str) -> List[Message]:
        """Retrieve all messages for a specific thread."""
        thread = self.get_thread(thread_id)
        return thread.messages if thread else []

    def list_threads(self) -> List[str]:
        """Return a list of all thread IDs."""
        return [t["id"] for t in self.threads.find({}, {"id": 1})]

    def delete_thread(self, thread_id: str) -> bool:
        """Delete a thread and all its associated messages."""
        result = self.threads.delete_one({"id": thread_id})
        if result.deleted_count > 0:
            self.messages.delete_many({"thread_id": thread_id})
            return True
        return False

Usage

Once your class is defined, you can use it just like any built-in storage backend.

With create_agent (Automatic Integration)

You can pass your custom storage backend directly to create_agent using HistoryConfig:

from peargent import create_agent, HistoryConfig
from peargent.models import openai

# Initialize your custom store
store = MongoDBHistoryStore(connection_string="mongodb://localhost:27017")

# Create agent with custom storage backend
agent = create_agent(
    name="Assistant",
    description="A helpful assistant with MongoDB history",
    persona="You are a helpful AI assistant.",
    model=openai("gpt-4o"),
    history=HistoryConfig(
        auto_manage_context=True,
        max_context_messages=20,
        strategy="smart",
        store=store  # Your custom storage backend
    )
)

# Use the agent - history is automatically managed
response1 = agent.run("My name is Alice")
# Agent creates thread and stores message in MongoDB

response2 = agent.run("What's my name?")
# Agent loads history from MongoDB and remembers: "Your name is Alice"

With create_pool (Multi-Agent with Custom Storage)

You can also use custom storage backends with agent pools for shared history across multiple agents:

from peargent import create_agent, create_pool, HistoryConfig
from peargent.models import openai

# Initialize your custom store
store = MongoDBHistoryStore(connection_string="mongodb://localhost:27017")

# Create multiple agents
researcher = create_agent(
    name="Researcher",
    description="Researches topics thoroughly",
    persona="You are a detail-oriented researcher.",
    model=openai("gpt-4o-mini")
)

writer = create_agent(
    name="Writer",
    description="Writes clear summaries",
    persona="You are a skilled technical writer.",
    model=openai("gpt-4o")
)

# Create pool with custom storage - all agents share the same MongoDB history
pool = create_pool(
    agents=[researcher, writer],
    default_model=openai("gpt-4o"),
    history=HistoryConfig(
        auto_manage_context=True,
        max_context_messages=25,
        strategy="smart",
        store=store  # Shared custom storage for all agents
    )
)

# Use the pool
result = pool.run("Research quantum computing and write a summary")
# Both agents' interactions are stored in MongoDB