Memory Module Overview

The ARKOS Memory Module implements a dual-tier memory system combining PostgreSQL for conversation context storage with Mem0/Supabase for semantic long-term memory retrieval.

Core Concepts

The Memory Module provides both short-term conversation context (PostgreSQL) and long-term semantic memory (Mem0 with Supabase vector store) for personalized AI interactions.

Memory Architecture

The system uses two complementary storage backends:
  1. Short-term Memory (PostgreSQL): Stores recent conversation turns with full message content
  2. Long-term Memory (Mem0/Supabase): Vector-based semantic storage for retrieval across sessions
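
To make the two-tier design concrete, here is a toy, self-contained sketch of the pattern (not the ARKOS implementation): recent turns are kept verbatim and capped, while long-term recall performs a relevance search over everything stored. Mem0 uses vector-embedding similarity; crude word overlap stands in for it here.

```python
# Toy sketch of the dual-tier pattern (NOT the ARKOS implementation):
# short-term = ordered, capped list of recent turns; long-term = searchable store.

class ToyMemory:
    def __init__(self, short_term_limit=5):
        self.short_term = []   # recent turns, verbatim, in order
        self.long_term = []    # all turns ever seen, searched by relevance
        self.limit = short_term_limit

    def add(self, role, content):
        self.short_term.append((role, content))
        self.short_term = self.short_term[-self.limit:]  # cap recent context
        self.long_term.append((role, content))

    def recent(self):
        return list(self.short_term)

    def recall(self, query, k=3):
        # Crude relevance: count shared words (Mem0 uses vector similarity)
        q = set(query.lower().split())
        scored = [
            (len(q & set(c.lower().split())), role, c)
            for role, c in self.long_term
        ]
        scored.sort(key=lambda t: t[0], reverse=True)
        return [(role, c) for score, role, c in scored[:k] if score > 0]

mem = ToyMemory(short_term_limit=2)
mem.add("user", "My favorite color is blue")
mem.add("assistant", "Noted!")
mem.add("user", "What's the weather like?")

print(mem.recent())                  # only the last 2 turns survive
print(mem.recall("favorite color"))  # older turn resurfaces by relevance
```

The same shape appears in the real module: `retrieve_short_memory` plays the role of `recent()`, and `retrieve_long_memory` the role of `recall()`.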

Architecture Diagram

(Diagram not reproduced here: messages flow into Memory, which writes synchronously to the PostgreSQL conversation_context table and asynchronously, via a background thread, to the Mem0/Supabase vector store.)

Message Types

All memory operations use Pydantic message classes:
from model_module.ArkModelNew import (
    Message,
    UserMessage,
    AIMessage,
    SystemMessage,
    ToolMessage
)

# User message
user_msg = UserMessage(content="Hello, how are you?")

# AI response
ai_msg = AIMessage(content="I'm doing well, thanks!")

# System instruction
system_msg = SystemMessage(content="You are a helpful assistant")

# Tool result
tool_msg = ToolMessage(content='{"result": "success"}')

Memory Class

Initialization

from memory_module.memory import Memory

memory = Memory(
    user_id="alice",           # Required: identifies the user
    session_id=None,           # Optional: auto-generates UUID if None
    db_url="postgresql://...", # Required: PostgreSQL connection string
    use_long_term=True         # Optional: enable Mem0 (default: True)
)

Configuration

The Memory class uses global configuration for Mem0:
# Configured in memory_module/memory.py
config = {
    "vector_store": {
        "provider": "supabase",
        "config": {
            "connection_string": os.environ["DB_URL"],
            "collection_name": "memories",
            "index_method": "hnsw",
            "index_measure": "cosine_distance",
        },
    },
    "llm": {
        "provider": "vllm",
        "config": {
            "model": "Qwen/Qwen2.5-7B-Instruct",
            "vllm_base_url": "http://localhost:30000/v1",
        },
    },
    "embedder": {
        "provider": "huggingface",
        "config": {"huggingface_base_url": "http://localhost:4444/v1"},
    },
}

Memory Operations

Adding Memories

Store a message in both PostgreSQL (immediate) and Mem0 (background):
from model_module.ArkModelNew import UserMessage, AIMessage

# Add user message
memory.add_memory(UserMessage(content="My favorite color is blue"))

# Add AI response
memory.add_memory(AIMessage(content="I'll remember that you like blue!"))
Messages are stored in PostgreSQL immediately (fast, synchronous) and sent to Mem0 in a background thread (non-blocking) for long-term storage.

Retrieving Short-term Memory

Get recent conversation turns from PostgreSQL:
# Get last 5 conversation turns
context = memory.retrieve_short_memory(turns=5)

# Returns list of Message objects in chronological order
for msg in context:
    print(f"{msg.role}: {msg.content}")

Retrieving Long-term Memory

Search for semantically relevant memories using Mem0:
# Get recent context first
context = memory.retrieve_short_memory(turns=2)

# Retrieve relevant long-term memories
long_term = memory.retrieve_long_memory(
    context=context,    # Used to build search query
    mem0_limit=10       # Max results to return
)

# Returns a SystemMessage with retrieved memories
print(long_term.content)
# Output: "retrieved memories:\nuser: My favorite color is blue\n..."
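
The SystemMessage content is a flat, prompt-ready rendering of the search hits. A sketch of how such a string can be assembled from Mem0-style results (the exact result shape, dicts with "role" and "memory" keys, is an assumption here):

```python
def format_memories(results):
    """Render search hits as the 'retrieved memories:' block shown above.
    `results` is assumed to be a list of dicts with 'role' and 'memory' keys."""
    if not results:
        return ""  # empty content signals "nothing relevant found"
    lines = [f"{r['role']}: {r['memory']}" for r in results]
    return "retrieved memories:\n" + "\n".join(lines)

hits = [{"role": "user", "memory": "My favorite color is blue"}]
print(format_memories(hits))
# retrieved memories:
# user: My favorite color is blue
```

Returning an empty string for no hits lets callers skip the SystemMessage entirely, as the Agent integration below does.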

Session Management

# Start a new session (generates new UUID)
new_session_id = memory.start_new_session()

# Access current session
print(memory.session_id)
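
Short-term retrieval is scoped by session_id, so starting a new session hides earlier turns from retrieve_short_memory while they remain reachable through long-term search. A minimal sketch of that scoping logic, using plain lists in place of the database (the real module does this filtering in SQL):

```python
import uuid

# Simulated conversation_context rows: (user_id, session_id, role, message)
rows = []

def add_row(user_id, session_id, role, message):
    rows.append((user_id, session_id, role, message))

def recent_turns(user_id, session_id, turns):
    # Only rows from the current session count as short-term context
    scoped = [r for r in rows if r[0] == user_id and r[1] == session_id]
    return scoped[-turns:]

session_a = str(uuid.uuid4())
add_row("alice", session_a, "user", "Hello")
add_row("alice", session_a, "assistant", "Hi!")

session_b = str(uuid.uuid4())  # start_new_session() generates a fresh UUID
add_row("alice", session_b, "user", "New topic")

print(recent_turns("alice", session_b, turns=5))  # only the new session's turn
```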

Database Schema

The PostgreSQL conversation_context table:
CREATE TABLE conversation_context (
    id SERIAL PRIMARY KEY,
    user_id VARCHAR(255) NOT NULL,
    session_id VARCHAR(255) NOT NULL,
    role VARCHAR(50) NOT NULL,
    message TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_user_id ON conversation_context(user_id);
CREATE INDEX idx_session_id ON conversation_context(session_id);
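
Given this schema, fetching the last N turns of a session is a single indexed query: take the N newest rows, then flip them back to chronological order. A sketch of such a statement, built as a plain string so it can be shown without a live database (illustrative; the real module's query may differ):

```python
def last_turns_query(user_id, session_id, turns):
    """Build SQL + parameters fetching the last `turns` rows of a session,
    returned oldest-first (illustrative sketch, not the module's actual SQL)."""
    sql = (
        "SELECT role, message FROM ("
        "SELECT role, message, created_at FROM conversation_context "
        "WHERE user_id = %s AND session_id = %s "
        "ORDER BY created_at DESC LIMIT %s"          # newest N rows
        ") sub ORDER BY created_at ASC"              # back to chronological
    )
    return sql, (user_id, session_id, turns)

sql, params = last_turns_query("alice", "session-1", turns=5)
print(params)  # ('alice', 'session-1', 5)
```

The inner DESC/LIMIT picks the newest rows via idx_session_id; the outer ASC restores the order retrieve_short_memory promises.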

Message Serialization

Messages are serialized to JSON for storage:
from model_module.ArkModelNew import UserMessage

user_msg = UserMessage(content="Hello")

# Serialize message to JSON string
json_str = memory.serialize(user_msg)
# '{"content": "Hello", "role": "user"}'

# Deserialize back to a Message object
msg = memory.deserialize(json_str, role="user")
# UserMessage(content="Hello", role="user")

Connection Pooling

The module uses a global connection pool for efficiency:
import os
import psycopg2.pool

# Pool is initialized lazily on first use; the getconn/putconn API below
# is psycopg2's connection pool
pool = psycopg2.pool.SimpleConnectionPool(
    minconn=1,   # minimum connections kept open
    maxconn=10,  # maximum concurrent connections
    dsn=os.environ["DB_URL"],
)

# Check out a connection and always return it
conn = pool.getconn()
try:
    cur = conn.cursor()
    cur.execute(...)
    conn.commit()
finally:
    pool.putconn(conn)  # Return to pool

Background Processing

Long-term memory storage happens in a background thread to avoid blocking:
from concurrent.futures import ThreadPoolExecutor

# Background executor for non-blocking mem0 operations
_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="mem0_bg")

def _add_to_mem0_background(self, content: str, metadata: dict):
    """Background task to add to mem0 (non-blocking)."""
    try:
        if self._mem0:
            self._mem0.add(
                messages=content,
                metadata=metadata,
                user_id=self.user_id
            )
    except Exception as e:
        print(f"[mem0 background] Error: {e}")

Integration with Agent

The Agent module uses Memory for context management:
class Agent:
    def __init__(self, ..., memory: Memory, ...):
        self.memory = memory

    def add_context(self, messages):
        """Store messages in memory."""
        for message in messages:
            self.memory.add_memory(message)

    def get_context(self, turns=5, include_long_term=True):
        """Retrieve context for LLM."""
        short_term = self.memory.retrieve_short_memory(turns)

        if include_long_term:
            long_term = self.memory.retrieve_long_memory(context=short_term)
            if long_term and long_term.content.strip():
                return [long_term] + short_term

        return short_term

Performance Considerations

Optimizations Implemented

  1. Connection Pooling: Reuses database connections
  2. Background Processing: Mem0 operations don’t block the main thread
  3. Lazy Initialization: Mem0 only initialized if use_long_term=True
  4. Limited Query Scope: Long-term search uses only recent context (last 2 messages)

Disabling Long-term Memory

For faster responses, disable long-term memory:
memory = Memory(
    user_id="alice",
    session_id=None,
    db_url=db_url,
    use_long_term=False  # Skip Mem0 initialization and operations
)

Environment Variables

Required environment variables:
# PostgreSQL connection (REQUIRED)
DB_URL=postgresql://user:password@host:port/database

# OpenAI API key (required by Mem0, can be placeholder)
OPENAI_API_KEY=sk-placeholder

Error Handling

Every public method catches exceptions internally and returns an empty fallback value instead of raising:
def add_memory(self, message) -> bool:
    """Returns True on success, False on error."""
    try:
        # Store in PostgreSQL
        # Store in Mem0 (background)
        return True
    except Exception as e:
        traceback.print_exc()
        return False

def retrieve_short_memory(self, turns):
    """Returns empty list on error."""
    try:
        # Query PostgreSQL
        return messages
    except Exception as e:
        print(f"[retrieve_short_memory] Error: {e}")
        return []

def retrieve_long_memory(self, context, mem0_limit=10):
    """Returns empty SystemMessage on error."""
    try:
        # Query Mem0
        return SystemMessage(content=memory_string)
    except Exception as e:
        print(f"[retrieve_long_memory] Error: {e}")
        return SystemMessage(content="")
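
Because every method degrades to an empty value rather than raising, calling code can assemble context without its own try/except. A sketch of that pattern with stand-in tuples (the real methods return Message objects):

```python
def build_prompt_messages(short_term, long_term_content):
    """Combine retrieved context, tolerating the empty-on-error fallbacks."""
    messages = []
    if long_term_content.strip():   # an empty SystemMessage is simply skipped
        messages.append(("system", long_term_content))
    messages.extend(short_term)     # an empty short-term list is harmless
    return messages

# Simulate a failed long-term lookup (content == "") alongside a
# successful short-term retrieval:
msgs = build_prompt_messages([("user", "Hi")], "")
print(msgs)  # [('user', 'Hi')]  (degraded gracefully, no exception handling)
```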

Testing

Basic test example:
import os

from memory_module.memory import Memory
from model_module.ArkModelNew import SystemMessage

if __name__ == "__main__":
    test_instance = Memory(
        user_id="alice_test",
        session_id="session_test",
        db_url=os.environ["DB_URL"]
    )

    # Test adding memory
    print(test_instance.add_memory(
        SystemMessage(content="My favorite color is blue")
    ))

    # Test retrieval
    context = test_instance.retrieve_short_memory(turns=2)
    print(context)

    print(test_instance.retrieve_long_memory(context))

Next Steps