Agent Module Overview

The Agent Module is the core orchestrator of ARKOS, coordinating the state machine, memory, LLM, and tool systems to provide intelligent conversational interactions.

Core Concepts

The Agent class manages the complete conversation lifecycle, including context retrieval, state transitions, tool execution, and response generation.

Architecture

Agent Class

Initialization

from agent_module.agent import Agent
from state_module.state_handler import StateHandler
from memory_module.memory import Memory
from model_module.ArkModelNew import ArkModelLink
from tool_module.tool_call import MCPToolManager

# Initialize components
# Load the state-machine definition
flow = StateHandler(yaml_path="state_module/state_graph.yaml")

# db_url, mcp_config, and token_store are assumed to be defined elsewhere
memory = Memory(
    user_id="default_user",
    session_id=None,
    db_url=db_url,
    use_long_term=False
)

llm = ArkModelLink(base_url="http://localhost:30000/v1")

tool_manager = MCPToolManager(mcp_config, token_store=token_store)

# Create agent
agent = Agent(
    agent_id="my-agent",
    flow=flow,
    memory=memory,
    llm=llm,
    tool_manager=tool_manager
)
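
A minimal end-to-end sketch, assuming the langchain-style HumanMessage that pairs with the AIMessage and SystemMessage used elsewhere on this page (the import path is an assumption):
import asyncio
from langchain_core.messages import HumanMessage  # assumed import path

async def main():
    # step() drives the state machine until it reaches a terminal state
    reply = await agent.step([HumanMessage(content="Hello!")], user_id="default_user")
    print(reply.content if reply else "(no reply)")

asyncio.run(main())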

Key Attributes

class Agent:
    agent_id: str              # Unique identifier
    flow: StateHandler         # State machine
    memory: Memory             # Memory system
    llm: ArkModelLink          # LLM interface
    tool_manager: MCPToolManager  # Tool manager (optional)

    current_state: State       # Current state in flow
    system_prompt: str         # System prompt with tools
    available_tools: dict      # Discovered MCP tools
    current_user_id: str       # For per-user tool auth

Core Methods

step() - Main Processing Loop

Process messages through the state machine:
async def step(self, messages, user_id: str = None):
    """
    Runs the agent until reaching a terminal state.

    Parameters
    ----------
    messages : list
        List of messages to process
    user_id : str, optional
        User ID for per-user tool authentication

    Returns
    -------
    AIMessage
        The last AI message produced
    """
    self.current_user_id = user_id
    self.add_context(messages)

    last_ai_message = None
    retry_count = 0

    while not self.current_state.is_terminal:
        if retry_count > MAX_ITER:  # MAX_ITER = 10
            break
        retry_count += 1

        context = self.get_context()
        update = await self.current_state.run(context, self)

        if update:
            self.add_context([update])
            if isinstance(update, AIMessage):
                last_ai_message = update

        if self.current_state.is_terminal:
            break

        # Handle state transitions
        messages_list = self.memory.retrieve_short_memory(5)
        if self.current_state.check_transition_ready(messages_list):
            transition_dict = self.flow.get_transitions(
                self.current_state, messages_list
            )
            transition_names = transition_dict["tt"]  # transition target names

            if len(transition_names) == 1:
                next_state_name = transition_names[0]
            else:
                next_state_name = await self.choose_transition(
                    transition_dict, messages_list
                )

            self.current_state = self.flow.get_state(next_state_name)

    # Reset to initial state for next conversation
    self.current_state = self.flow.get_state("agent_reply")
    return last_ai_message

step_stream() - Streaming Response

Stream responses character by character:
async def step_stream(self, messages, user_id: str = None):
    """
    Streaming version of step. Yields characters as they're produced.

    Yields
    ------
    str
        Characters/chunks from each state's output
    """
    self.current_user_id = user_id
    self.add_context(messages)

    retry_count = 0

    while not self.current_state.is_terminal:
        if retry_count > MAX_ITER:
            yield "\n[Max iterations reached]"
            break
        retry_count += 1

        context = self.get_context()

        try:
            update = await self.current_state.run(context, self)
        except Exception as e:
            update = AIMessage(content=f"Error: {str(e)[:200]}")
            self.current_state = self.flow.get_state("agent_reply")

        # Stream character by character
        if update and hasattr(update, 'content') and update.content:
            self.add_context([update])
            for char in update.content:
                yield char

        if self.current_state.is_terminal:
            break

        # Handle transitions (same as step)
        # ... transition logic ...

        if not self.current_state.is_terminal:
            yield "\n\n"  # Separator between states

    self.current_state = self.flow.get_state("agent_reply")
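
Consuming the stream is a plain async iteration (HumanMessage import path assumed, as above):
import asyncio
from langchain_core.messages import HumanMessage  # assumed import path

async def main():
    # Print chunks as the agent yields them
    async for chunk in agent.step_stream(
        [HumanMessage(content="Summarize our discussion.")],
        user_id="default_user",
    ):
        print(chunk, end="", flush=True)

asyncio.run(main())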

call_llm() - LLM Interface

Call the language model:
async def call_llm(self, context=None, json_schema=None):
    """
    Agent's interface with chat model.

    Parameters
    ----------
    context : list
        Messages to send to LLM
    json_schema : dict, optional
        JSON schema for structured output

    Returns
    -------
    AIMessage
        The LLM's response
    """
    llm_response = await self.llm.generate_response(context, json_schema)
    return AIMessage(content=llm_response)
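
The json_schema parameter follows the same envelope used by choose_transition below; a sketch of requesting structured output (the Sentiment model is illustrative, not part of ARKOS):
import json
from pydantic import BaseModel

class Sentiment(BaseModel):  # illustrative schema
    label: str

async def classify(agent, messages):
    json_schema = {
        "type": "json_schema",
        "json_schema": {
            "name": "sentiment",
            "schema": Sentiment.model_json_schema(),
        },
    }
    response = await agent.call_llm(context=messages, json_schema=json_schema)
    return json.loads(response.content)["label"]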

Context Management

def add_context(self, messages):
    """Store messages in memory."""
    for message in messages:
        self.memory.add_memory(message)

def get_context(self, turns=5, include_long_term=True):
    """
    Retrieve context for LLM.

    Returns list of messages including:
    - Long-term memories (if enabled)
    - Recent conversation turns
    """
    short_term_mem = self.memory.retrieve_short_memory(turns)

    if include_long_term:
        long_term_mem = self.memory.retrieve_long_memory(context=short_term_mem)
        if long_term_mem and long_term_mem.content.strip():
            return [long_term_mem] + short_term_mem

    return short_term_mem
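
A quick inspection sketch: the returned list places any long-term memory message ahead of the recent turns, so durable facts reach the LLM first:
context = agent.get_context(turns=5, include_long_term=True)
for message in context:
    # The long-term memory message, if present, precedes the recent turns
    print(type(message).__name__, "->", str(message.content)[:60])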

Transition Selection

When multiple state transitions are possible:
async def choose_transition(self, transitions_dict, messages):
    """Use LLM to select the best next state."""

    # "tt" holds transition target names, "td" their descriptions
    transition_tuples = list(zip(
        transitions_dict["tt"],
        transitions_dict["td"]
    ))

    prompt = f"""Given the conversation context and these options:
    {transition_tuples}
    Output the most reasonable next state.
    Do not use tool result to determine the next state."""

    # Create Pydantic model for structured output
    NextStates = self.create_next_state_class(transition_tuples)
    json_schema = {
        "type": "json_schema",
        "json_schema": {
            "name": "class_options",
            "schema": NextStates.model_json_schema(),
        },
    }

    context_text = [SystemMessage(content=prompt)] + messages
    output = await self.call_llm(context=context_text, json_schema=json_schema)

    return json.loads(output.content)["next_state"]
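
create_next_state_class is referenced above but not shown here. A plausible sketch, assuming it constrains next_state to the literal transition names (an assumption, not the actual implementation):
from typing import Literal
from pydantic import Field, create_model

def create_next_state_class(self, transition_tuples):
    # Hypothetical: restrict "next_state" to the available transition names
    names = tuple(name for name, _description in transition_tuples)
    return create_model(
        "NextStates",
        next_state=(Literal[names], Field(description="Name of the next state")),
    )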

Tool Integration

Tool Argument Classes

def fill_tool_args_class(self, tool_name: str, tool_args: Dict[str, Any]):
    """Create Pydantic model for tool call."""
    ToolCall = create_model(
        "ToolCall",
        tool_name=(str, Field(description="Tool name to execute")),
        tool_args=(Dict[str, Any], Field(default_factory=dict)),
    )
    return ToolCall(tool_name=tool_name, tool_args=tool_args)

async def create_tool_option_class(self):
    """Create enum of available tools for LLM selection."""
    server_tool_map = await self.tool_manager.list_all_tools()

    enum_members = {}
    for server_name in server_tool_map:
        for tool_name in server_tool_map[server_name]:
            enum_members[tool_name] = tool_name

    ToolEnum = Enum("ToolEnum", enum_members)

    return create_model(
        "ToolCall",
        tool_name=(ToolEnum, Field(description="Tool to execute next"))
    )
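
The resulting enum-constrained model can be plugged into the same structured-output envelope used for transitions, letting the LLM pick a tool by name (a sketch under that assumption; pick_tool is not part of the Agent API):
import json

async def pick_tool(agent, messages):
    ToolCall = await agent.create_tool_option_class()
    json_schema = {
        "type": "json_schema",
        "json_schema": {
            "name": "tool_call",
            "schema": ToolCall.model_json_schema(),
        },
    }
    response = await agent.call_llm(context=messages, json_schema=json_schema)
    return json.loads(response.content)["tool_name"]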

Usage with FastAPI

The agent is used in the API server:
# base_module/app.py
from agent_module.agent import Agent

# Initialize agent at startup
agent = Agent(
    agent_id=config.get("memory.user_id"),
    flow=flow,
    memory=memory,
    llm=llm,
    tool_manager=tool_manager,
)

@app.on_event("startup")
async def startup():
    if tool_manager:
        await tool_manager.initialize_servers()
        agent.available_tools = await tool_manager.list_all_tools()
        agent.system_prompt = base_prompt + "\n\n" + tool_prompt

@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    # ... parse request ...

    if stream:
        async def generate_stream():
            async for chunk in agent.step_stream(context_msgs, user_id=user_id):
                # Format the chunk as an OpenAI-style SSE payload (construction of `data` elided)
                yield f"data: {json.dumps(data)}\n\n"
        return StreamingResponse(generate_stream(), media_type="text/event-stream")

    # Non-streaming: run to completion, then wrap in an OpenAI-style
    # `completion` dict (construction elided)
    agent_response = await agent.step(context_msgs, user_id=user_id)
    return JSONResponse(content=completion)
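
Because the endpoint mirrors the OpenAI chat-completions shape, it can be exercised with any HTTP client; a sketch (host, port, and the user field are assumptions):
import requests

resp = requests.post(
    "http://localhost:8000/v1/chat/completions",  # assumed host/port
    json={
        "messages": [{"role": "user", "content": "Hello!"}],
        "stream": False,
        "user": "default_user",
    },
)
print(resp.json())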

Performance Timing

The agent includes built-in timing logs:
# Output during execution:
# [TIMING] add_context: 0.015s
# [TIMING] get_context: 0.102s
# [TIMING] state.run: 1.234s
# [TIMING] loop total: 1.351s
# [TIMING] step total: 2.847s
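
One common way to produce such logs (a generic sketch, not necessarily the exact ARKOS instrumentation) is a perf_counter-based context manager:
import time
from contextlib import contextmanager

@contextmanager
def timed(label: str):
    # Emits "[TIMING] <label>: <seconds>s" around a block of work
    start = time.perf_counter()
    try:
        yield
    finally:
        print(f"[TIMING] {label}: {time.perf_counter() - start:.3f}s")

# Usage:
# with timed("get_context"):
#     context = agent.get_context()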

Configuration

Agents are configured via config_module/config.yaml:
memory:
  user_id: "default_user"
  use_long_term: false

llm:
  base_url: "http://localhost:30000/v1"

state:
  graph_path: "state_module/state_graph.yaml"

app:
  system_prompt: "You are a helpful AI assistant."
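
The dotted-key access seen earlier (config.get("memory.user_id")) suggests a thin wrapper over the parsed YAML; a minimal sketch of such a loader (an assumption, not the actual config_module code):
import yaml

class Config:
    def __init__(self, path="config_module/config.yaml"):
        with open(path) as f:
            self._data = yaml.safe_load(f)

    def get(self, dotted_key, default=None):
        # Walk nested dicts using "section.key" notation
        node = self._data
        for part in dotted_key.split("."):
            if not isinstance(node, dict) or part not in node:
                return default
            node = node[part]
        return node

config = Config()
print(config.get("memory.user_id"))  # -> "default_user"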

Error Handling

async def step_stream(self, messages, user_id=None):
    # ...
    try:
        update = await self.current_state.run(context, self)
    except Exception as e:
        print(f"[STREAM] State error: {e}")
        update = AIMessage(content=f"Error: {str(e)[:200]}")
        self.current_state = self.flow.get_state("agent_reply")
    # ...

Debug Logging

The agent prints debug output as it processes messages and transitions between states:
print("agent.py received message")
print("agent.py CURR STATE:", self.current_state)
print("agent.py IS TERMINAL?:", self.current_state.is_terminal)
print(f"Inner loop #{retry_count + 1}")
print("REACHED TERMINAL")
print("LAST_AI_MSG", last_ai_message)

Best Practices

  1. Always set user_id: For per-user tool authentication
  2. Monitor MAX_ITER: Prevent infinite loops
  3. Use streaming: For better UX with long responses
  4. Reset state: Agent resets to initial state after each conversation
  5. Handle errors: Graceful degradation on state failures (see the wrapper sketch after this list)
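
A small wrapper illustrating practices 1 and 5 (illustrative only; safe_step is not part of the Agent API):
async def safe_step(agent, messages, user_id):
    # Always pass user_id so per-user tool auth works; degrade gracefully on failure
    try:
        return await agent.step(messages, user_id=user_id)
    except Exception as exc:
        print(f"[AGENT] step failed: {exc}")
        return None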

Troubleshooting

Check the state machine for deadlocks:
print(f"Current state: {agent.current_state.name}")
print(f"Is terminal: {agent.current_state.is_terminal}")

Verify the tool manager is initialized:
print(f"Available tools: {agent.available_tools.keys()}")

Increase the retrieved-context turn limits in config.yaml:
  short_term_turns: 50
  long_term_limit: 50

Next Steps