Memory Module Overview
The ARKOS Memory Module implements a dual-tier memory system combining PostgreSQL for conversation context storage with Mem0/Supabase for semantic long-term memory retrieval.
Core Concepts
The Memory Module pairs short-term conversation context with long-term semantic memory: PostgreSQL preserves the literal turns of the current conversation, while Mem0 (backed by a Supabase vector store) distills durable facts that can be recalled semantically in later sessions, enabling personalized AI interactions.
Memory Architecture
The system uses two complementary storage backends:
- Short-term Memory (PostgreSQL): Stores recent conversation turns with full message content
- Long-term Memory (Mem0/Supabase): Vector-based semantic storage for retrieval across sessions
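Taken together, a single turn flows through both tiers. The following is a minimal end-to-end sketch using the Memory API documented below; because the Mem0 write happens in the background, long-term recall of a just-added fact may lag briefly.

from memory_module.memory import Memory
from model_module.ArkModelNew import UserMessage

memory = Memory(user_id="alice", session_id=None, db_url="postgresql://...")

# Tier 1: stored in PostgreSQL immediately (short-term)
# Tier 2: handed to Mem0 in a background thread (long-term)
memory.add_memory(UserMessage(content="My favorite color is blue"))

# Recent turns come back verbatim from PostgreSQL...
recent = memory.retrieve_short_memory(turns=5)

# ...while semantically related facts come back from Mem0
facts = memory.retrieve_long_memory(context=recent)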
Message Types
All memory operations use Pydantic message classes:
from model_module.ArkModelNew import (
    Message,
    UserMessage,
    AIMessage,
    SystemMessage,
    ToolMessage,
)
# User message
user_msg = UserMessage(content="Hello, how are you?")
# AI response
ai_msg = AIMessage(content="I'm doing well, thanks!")
# System instruction
system_msg = SystemMessage(content="You are a helpful assistant")
# Tool result
tool_msg = ToolMessage(content='{"result": "success"}')
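These classes are lightweight Pydantic models carrying a content string and a role. The exact definitions live in model_module/ArkModelNew.py; the sketch below shows a plausible shape consistent with the serialization format documented later (the role strings other than "user" are assumptions).

from pydantic import BaseModel

class Message(BaseModel):
    content: str
    role: str

class UserMessage(Message):
    role: str = "user"       # matches the serialized form shown later

class AIMessage(Message):
    role: str = "assistant"  # assumed role string

class SystemMessage(Message):
    role: str = "system"     # assumed role string

class ToolMessage(Message):
    role: str = "tool"       # assumed role string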
Memory Class
Initialization
from memory_module.memory import Memory
memory = Memory(
    user_id="alice",            # Required: identifies the user
    session_id=None,            # Optional: auto-generates UUID if None
    db_url="postgresql://...",  # Required: PostgreSQL connection string
    use_long_term=True          # Optional: enable Mem0 (default: True)
)
Configuration
The Memory class uses global configuration for Mem0:
# Configured in memory_module/memory.py
import os

config = {
    "vector_store": {
        "provider": "supabase",
        "config": {
            "connection_string": os.environ["DB_URL"],
            "collection_name": "memories",
            "index_method": "hnsw",
            "index_measure": "cosine_distance",
        },
    },
    "llm": {
        "provider": "vllm",
        "config": {
            "model": "Qwen/Qwen2.5-7B-Instruct",
            "vllm_base_url": "http://localhost:30000/v1",
        },
    },
    "embedder": {
        "provider": "huggingface",
        "config": {"huggingface_base_url": "http://localhost:4444/v1"},
    },
}
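This dictionary follows Mem0's standard config format, so a standalone client can be built from it with Mem0's from_config constructor. The sketch below presumably mirrors what memory.py does during lazy initialization.

from mem0 import Memory as Mem0Memory

# Build a Mem0 client from the config dict above; aliased to avoid
# clashing with this module's own Memory class
mem0_client = Mem0Memory.from_config(config)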
Memory Operations
Adding Memories
Store a message to both PostgreSQL (immediate) and Mem0 (background):
from model_module.ArkModelNew import UserMessage, AIMessage
# Add user message
memory.add_memory(UserMessage(content="My favorite color is blue"))
# Add AI response
memory.add_memory(AIMessage(content="I'll remember that you like blue!"))
Messages are stored in PostgreSQL immediately (fast, synchronous) and sent to Mem0 in a background thread (non-blocking) for long-term storage.
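Internally this is a write-through pattern; a simplified sketch is below. The _executor and _add_to_mem0_background names appear later in this document, while _store_in_postgres and the metadata contents are hypothetical.

def add_memory(self, message) -> bool:
    # Fast, synchronous path: persist the turn to PostgreSQL
    self._store_in_postgres(message)  # hypothetical helper name

    # Fire-and-forget: hand the same content to the background
    # executor so the caller never waits on Mem0
    self._executor.submit(
        self._add_to_mem0_background,
        message.content,
        {"session_id": self.session_id},  # metadata contents assumed
    )
    return True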
Retrieving Short-term Memory
Get recent conversation turns from PostgreSQL:
# Get last 5 conversation turns
context = memory.retrieve_short_memory(turns=5)
# Returns list of Message objects in chronological order
for msg in context:
    print(f"{msg.role}: {msg.content}")
Retrieving Long-term Memory
Search for semantically relevant memories using Mem0:
# Get recent context first
context = memory.retrieve_short_memory(turns=2)
# Retrieve relevant long-term memories
long_term = memory.retrieve_long_memory(
    context=context,  # Used to build search query
    mem0_limit=10     # Max results to return
)
# Returns a SystemMessage with retrieved memories
print(long_term.content)
# Output: "retrieved memories:\nuser: My favorite color is blue\n..."
Session Management
# Start a new session (generates new UUID)
new_session_id = memory.start_new_session()
# Access current session
print(memory.session_id)
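Short-term context is scoped by session while Mem0 keys long-term memories by user_id, so facts survive a session change. A quick illustration, assuming short-term retrieval filters on session_id as the schema's session index suggests:

from model_module.ArkModelNew import UserMessage

memory.add_memory(UserMessage(content="My favorite color is blue"))
memory.start_new_session()

# The new session has no conversation history yet
print(memory.retrieve_short_memory(turns=5))  # []

# ...but the fact is still reachable through long-term memory
query = [UserMessage(content="What's my favorite color?")]
print(memory.retrieve_long_memory(context=query).content)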
Database Schema
The PostgreSQL conversation_context table:
CREATE TABLE conversation_context (
    id SERIAL PRIMARY KEY,
    user_id VARCHAR(255) NOT NULL,
    session_id VARCHAR(255) NOT NULL,
    role VARCHAR(50) NOT NULL,
    message TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_user_id ON conversation_context(user_id);
CREATE INDEX idx_session_id ON conversation_context(session_id);
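Short-term retrieval maps onto a simple query against this table. A sketch with psycopg2 (the actual SQL in memory.py may differ in details such as how a "turn" is counted):

def fetch_recent_turns(conn, user_id: str, session_id: str, turns: int):
    """Fetch the last `turns` rows for a session, oldest first."""
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT role, message FROM conversation_context
            WHERE user_id = %s AND session_id = %s
            ORDER BY created_at DESC
            LIMIT %s
            """,
            (user_id, session_id, turns),
        )
        rows = cur.fetchall()
    # DESC + reverse yields chronological order, matching the
    # documented behavior of retrieve_short_memory
    return list(reversed(rows))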
Message Serialization
Messages are serialized to JSON for storage:
# Serialize message to JSON string
json_str = memory.serialize(user_msg)
# '{"content": "Hello", "role": "user"}'
# Deserialize back to Message object
msg = memory.deserialize(json_str, role="user")
# UserMessage(content="Hello", role="user")
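Assuming the message classes are Pydantic v2 models, both operations reduce to a few lines; one way the methods might be implemented:

import json

ROLE_TO_CLASS = {  # hypothetical lookup table; role strings assumed
    "user": UserMessage,
    "assistant": AIMessage,
    "system": SystemMessage,
    "tool": ToolMessage,
}

def serialize(self, message: Message) -> str:
    # Pydantic v2: dump content and role as a JSON string
    return message.model_dump_json()

def deserialize(self, json_str: str, role: str) -> Message:
    data = json.loads(json_str)
    cls = ROLE_TO_CLASS.get(role, Message)
    return cls(content=data["content"], role=role)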
Connection Pooling
The module uses a global connection pool for efficiency:
# Pool is initialized lazily on first use
# Configured with:
#   - minconn=1 (minimum connections)
#   - maxconn=10 (maximum connections)
# Connections are automatically managed
conn = pool.getconn()
try:
    # Use connection
    cur = conn.cursor()
    cur.execute(...)
    conn.commit()
finally:
    pool.putconn(conn)  # Return to pool
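The lazy initialization mentioned above might look like the following, using psycopg2's ThreadedConnectionPool (the function and variable names here are assumptions):

from psycopg2.pool import ThreadedConnectionPool

_pool = None  # module-level pool, created on first use

def get_pool(db_url: str) -> ThreadedConnectionPool:
    """Create the connection pool on first call, then reuse it."""
    global _pool
    if _pool is None:
        _pool = ThreadedConnectionPool(minconn=1, maxconn=10, dsn=db_url)
    return _pool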
Background Processing
Long-term memory storage happens in a background thread to avoid blocking:
from concurrent.futures import ThreadPoolExecutor

# Background executor for non-blocking mem0 operations
_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="mem0_bg")

def _add_to_mem0_background(self, content: str, metadata: dict):
    """Background task to add to mem0 (non-blocking)."""
    try:
        if self._mem0:
            self._mem0.add(
                messages=content,
                metadata=metadata,
                user_id=self.user_id,
            )
    except Exception as e:
        print(f"[mem0 background] Error: {e}")
Integration with Agent
The Agent module uses Memory for context management:
class Agent:
    def __init__(self, ..., memory: Memory, ...):
        self.memory = memory

    def add_context(self, messages):
        """Store messages in memory."""
        for message in messages:
            self.memory.add_memory(message)

    def get_context(self, turns=5, include_long_term=True):
        """Retrieve context for LLM."""
        short_term = self.memory.retrieve_short_memory(turns)
        if include_long_term:
            long_term = self.memory.retrieve_long_memory(context=short_term)
            if long_term and long_term.content.strip():
                return [long_term] + short_term
        return short_term
Optimizations Implemented
- Connection Pooling: Reuses database connections
- Background Processing: Mem0 operations don't block the main thread
- Lazy Initialization: Mem0 is only initialized when use_long_term=True
- Limited Query Scope: Long-term search uses only recent context (last 2 messages)
Disabling Long-term Memory
For faster responses, disable long-term memory:
memory = Memory(
    user_id="alice",
    session_id=None,
    db_url=db_url,
    use_long_term=False  # Skip Mem0 initialization and operations
)
Environment Variables
Required environment variables:
# PostgreSQL connection (REQUIRED)
DB_URL=postgresql://user:password@host:port/database
# OpenAI API key (required by Mem0, can be placeholder)
OPENAI_API_KEY=sk-placeholder
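A small startup check turns a missing variable into an immediate, readable failure instead of a connection error deep inside the module (sketch):

import os

# Fail fast if PostgreSQL is not configured
db_url = os.environ["DB_URL"]  # raises KeyError when unset

# Mem0 expects an OpenAI key even when vLLM/HuggingFace endpoints
# are used, so a placeholder satisfies it when none is set
os.environ.setdefault("OPENAI_API_KEY", "sk-placeholder")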
Error Handling
The module includes robust error handling:
def add_memory(self, message) -> bool:
    """Returns True on success, False on error."""
    try:
        # Store in PostgreSQL
        # Store in Mem0 (background)
        return True
    except Exception as e:
        traceback.print_exc()
        return False

def retrieve_short_memory(self, turns):
    """Returns empty list on error."""
    try:
        # Query PostgreSQL
        return messages
    except Exception as e:
        print(f"[retrieve_short_memory] Error: {e}")
        return []

def retrieve_long_memory(self, context, mem0_limit=10):
    """Returns empty SystemMessage on error."""
    try:
        # Query Mem0
        return SystemMessage(content=memory_string)
    except Exception as e:
        print(f"[retrieve_long_memory] Error: {e}")
        return SystemMessage(content="")
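Because failures degrade to False or empty values rather than raising, callers can branch on return values without wrapping every call in try/except:

from model_module.ArkModelNew import UserMessage

if not memory.add_memory(UserMessage(content="Ping")):
    # Persistence failed; continue without stored context
    print("warning: memory write failed")

context = memory.retrieve_short_memory(turns=5)   # [] on error
long_term = memory.retrieve_long_memory(context)  # empty SystemMessage on error
if long_term.content.strip():
    context = [long_term] + context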
Testing
Basic test example:
import os

from memory_module.memory import Memory
from model_module.ArkModelNew import SystemMessage

if __name__ == "__main__":
    test_instance = Memory(
        user_id="alice_test",
        session_id="session_test",
        db_url=os.environ["DB_URL"]
    )

    # Test adding memory
    print(test_instance.add_memory(
        SystemMessage(content="My favorite color is blue")
    ))

    # Test retrieval
    context = test_instance.retrieve_short_memory(turns=2)
    print(context)
    print(test_instance.retrieve_long_memory(context))
Next Steps