Base Module Overview

The Base Module provides the primary interface layer for ARKOS, including a FastAPI server with OpenAI-compatible endpoints and an interactive CLI for testing.

Core Components

FastAPI Server

OpenAI-compatible /v1/chat/completions endpoint with streaming support

CLI Interface

Interactive command-line interface for testing and development

OAuth Authentication

Google OAuth integration for per-user tool authentication

Health Monitoring

Health check endpoint for system status

Architecture

FastAPI Server (app.py)

Starting the Server

python base_module/app.py
This starts the server on port 1111 by default.
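
Under the hood, the script launches Uvicorn with the host and port from config.yaml. A minimal sketch of what the entry point likely looks like (the exact wiring in app.py may differ):

# Hypothetical entry point; host/port values mirror the app.* config keys shown later.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("base_module.app:app", host="0.0.0.0", port=1111, reload=False)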

Endpoints

POST /v1/chat/completions

OpenAI-compatible chat completions endpoint:
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    """OAI-compatible endpoint wrapping the full ArkOS agent."""
    payload = await request.json()

    messages = payload.get("messages", [])
    model = payload.get("model", "ark-agent")
    stream = payload.get("stream", False)
    user_id = request.headers.get("X-User-ID") or payload.get("user_id")

    # Convert to internal message format
    context_msgs = [SystemMessage(content=agent.system_prompt)]
    for msg in messages:
        if msg["role"] == "system":
            context_msgs.append(SystemMessage(content=msg["content"]))
        elif msg["role"] == "user":
            context_msgs.append(UserMessage(content=msg["content"]))
        elif msg["role"] == "assistant":
            context_msgs.append(AIMessage(content=msg["content"]))

    if stream:
        # generate_stream is the SSE generator shown under "Streaming Responses" below
        return StreamingResponse(generate_stream(), media_type="text/event-stream")

    # Non-streaming: run the agent once and wrap the result in an
    # OpenAI-style completion object (see "Response Format" below)
    agent_response = await agent.step(context_msgs, user_id=user_id)
    completion = {
        "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": agent_response},
            "finish_reason": "stop",
        }],
    }
    return JSONResponse(content=completion)
Request Format:
{
  "model": "ark-agent",
  "messages": [
    {"role": "system", "content": "You are helpful"},
    {"role": "user", "content": "Hello!"}
  ],
  "stream": false,
  "user_id": "alice"
}
Response Format:
{
  "id": "chatcmpl-abc123",
  "object": "chat.completion",
  "created": 1706300000,
  "model": "ark-agent",
  "choices": [{
    "index": 0,
    "message": {"role": "assistant", "content": "Hello! How can I help?"},
    "finish_reason": "stop"
  }]
}

GET /health

Health check endpoint:
@app.get("/health")
async def health_check():
    """Health check endpoint to verify server and dependencies."""
    llm_status = "unknown"
    try:
        response = requests.get("http://localhost:30000/v1/models", timeout=2)
        llm_status = "running" if response.status_code == 200 else "error"
    except requests.RequestException:
        llm_status = "not_running"

    return JSONResponse(content={
        "status": "ok",
        "llm_server": llm_status,
        "port": 1111
    })
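
A quick way to probe this endpoint from Python (assuming the server is running locally on port 1111):

import requests

# Probe the health endpoint and inspect the reported statuses.
health = requests.get("http://localhost:1111/health", timeout=5).json()
print(health["status"])      # "ok" once the API server is up
print(health["llm_server"])  # "running", "error", or "not_running"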

Streaming Responses

async def generate_stream():
    chunk_id = f"chatcmpl-{uuid.uuid4().hex[:8]}"

    async for chunk in agent.step_stream(context_msgs, user_id=user_id):
        data = {
            "id": chunk_id,
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": model,
            "choices": [{
                "index": 0,
                "delta": {"content": chunk},
                "finish_reason": None,
            }],
        }
        yield f"data: {json.dumps(data)}\n\n"

    # Final chunk
    yield f"data: {json.dumps(final_data)}\n\n"
    yield "data: [DONE]\n\n"

Startup Initialization

@app.on_event("startup")
async def startup():
    base_system_prompt = config.get("app.system_prompt") or ""

    if tool_manager:
        await tool_manager.initialize_servers()
        print(f"Initialized {len(tool_manager.clients)} MCP servers")

        agent.available_tools = await tool_manager.list_all_tools()
        print(f"Available tools: {list(agent.available_tools.keys())}")

        tool_prompt = format_tools_for_system_prompt(agent.available_tools)
        agent.system_prompt = base_system_prompt + "\n\n" + tool_prompt
    else:
        agent.system_prompt = base_system_prompt

CLI Interface (main_interface.py)

Starting the CLI

python base_module/main_interface.py

Features

  • Interactive conversation loop
  • Type messages and press Enter to send
  • Type exit or quit to stop
  • Rich terminal output support (main_interface_rich.py)

Basic Usage

from base_module.main_interface import MainInterface

interface = MainInterface(
    config_path="config_module/config.yaml"
)

interface.run_interactive()
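
Internally the loop is a simple read-eval-print cycle. An illustrative sketch (the actual MainInterface implementation may differ, and the send helper below is hypothetical):

# Illustrative only: the send() name is an assumption, not the real API.
while True:
    user_input = input("> ").strip()
    if user_input.lower() in {"exit", "quit"}:
        break
    reply = interface.send(user_input)  # hypothetical single-turn helper
    print(reply)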

OAuth Authentication (auth.py)

Routes for per-user OAuth authentication:
from base_module.auth import router as auth_router

app.include_router(auth_router)

# Provides:
# GET /auth/google/login?user_id=alice
# GET /auth/google/callback

Google Calendar OAuth Flow

  1. User calls /auth/google/login?user_id=alice
  2. Redirects to Google OAuth consent screen
  3. Google redirects to /auth/google/callback
  4. Token stored in UserTokenStore
  5. User can now use Google Calendar tools
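
Before invoking calendar tools for a user, the server can check whether this flow has been completed; get_user_service_status is the helper referenced under Troubleshooting below (the return shape shown here is an assumption):

# Assumed return shape: a mapping of service name -> connected flag.
status = tool_manager.get_user_service_status("alice")
if not status.get("google-calendar"):
    print("Connect at: http://localhost:1111/auth/google/login?user_id=alice")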

Configuration

Environment Variables

# .env
# Database (REQUIRED)
DB_URL=postgresql://user:pass@host:port/db

# Google OAuth (for calendar integration)
GOOGLE_OAUTH_CREDENTIALS=/path/to/credentials.json

# MCP tool credentials
BRAVE_API_KEY=your-api-key

YAML Configuration

# config_module/config.yaml
app:
  host: "0.0.0.0"
  port: 1111
  reload: false
  system_prompt: "You are a helpful AI assistant."

llm:
  base_url: "http://localhost:30000/v1"

database:
  url: "${DB_URL}"

memory:
  user_id: "default_user"
  use_long_term: false

state:
  graph_path: "state_module/state_graph.yaml"

mcp_servers:
  google-calendar:
    transport: stdio
    command: npx
    args: ["-y", "@anthropic/google-calendar-mcp"]

Using the API

Python Client

from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:1111/v1",
    api_key="not-needed"
)

# Non-streaming
response = client.chat.completions.create(
    model="ark-agent",
    messages=[{"role": "user", "content": "Hello!"}]
)
print(response.choices[0].message.content)

# Streaming
stream = client.chat.completions.create(
    model="ark-agent",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True
)
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")

cURL

# Non-streaming
curl -X POST http://localhost:1111/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "ark-agent",
    "messages": [{"role": "user", "content": "Hello!"}]
  }'

# Streaming
curl -X POST http://localhost:1111/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "ark-agent",
    "messages": [{"role": "user", "content": "Hello!"}],
    "stream": true
  }'

# With user ID for tool auth
curl -X POST http://localhost:1111/v1/chat/completions \
  -H "Content-Type: application/json" \
  -H "X-User-ID: alice" \
  -d '{
    "model": "ark-agent",
    "messages": [{"role": "user", "content": "What's on my calendar?"}]
  }'

Tool System Prompt

Tools are formatted for the system prompt:
def format_tools_for_system_prompt(tools: dict) -> str:
    lines = [
        "You have access to the following tools.",
        "Use them when appropriate. Only call tools that are listed below.",
        ""
    ]

    for name, tool in tools.items():
        lines.append(f"Tool name: {name}")
        if getattr(tool, "description", None):
            lines.append(f"Description: {tool.description}")
        if getattr(tool, "input_schema", None):
            lines.append("Input schema:")
            lines.append(str(tool.input_schema))
        lines.append("")

    return "\n".join(lines)

Error Handling

The server handles common error cases:
  • Missing LLM server: Health check reports llm_server: "not_running"
  • Database errors: Logged with traceback
  • Tool auth required: Returns structured error for frontend handling

Deployment

Development

python base_module/app.py

Production with Uvicorn

uvicorn base_module.app:app \
  --host 0.0.0.0 \
  --port 1111 \
  --workers 4

Shared Server Considerations

On shared servers like MIT SIPB:
  1. Check if LLM server is already running on port 30000
  2. Use unique ports if running multiple instances
  3. Coordinate with team on port assignments

Troubleshooting

If the API is unreachable, ensure the server is running:
python base_module/app.py

If the agent cannot reach the LLM, check that SGLang is running:
curl http://localhost:30000/v1/models

If Google Calendar tools fail for a user, verify the user has connected the service:
status = tool_manager.get_user_service_status(user_id)

If port 1111 is already in use, find and kill the process:
lsof -i :1111
kill -9 <PID>

Next Steps