Model Module Overview

The ARKOS Model Module provides an asynchronous interface for communicating with LLMs using the AsyncOpenAI client. It’s designed to work with SGLANG servers and any OpenAI-compatible endpoint.

Core Components

  - ArkModelLink: Main class for LLM communication using AsyncOpenAI
  - Message Classes: Pydantic models for different message types
  - Async Support: Non-blocking I/O for better performance
  - Streaming: Real-time token streaming for responsive UX

Architecture

Message Classes

All messages extend the base Message Pydantic model:
from model_module.ArkModelNew import (
    Message,
    UserMessage,
    AIMessage,
    SystemMessage,
    ToolMessage
)

# Base class (BaseModel is pydantic.BaseModel)
class Message(BaseModel):
    content: str
    role: str

# User input
user_msg = UserMessage(content="Hello!")
# role = "user"

# AI response
ai_msg = AIMessage(content="Hi there!")
# role = "assistant"
# content can be None for tool-only responses
# tool_calls: Optional[dict] = None

# System instruction
system_msg = SystemMessage(content="You are helpful")
# role = "system"

# Tool result
tool_msg = ToolMessage(content='{"result": "data"}')
# role = "tool"
# tool_calls: Optional[dict] = None
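
The optional tool_calls field makes a tool round trip representable with these classes. A minimal sketch with a hypothetical get_weather tool (the exact dict stored in tool_calls is an assumption here; check ArkModelNew for the shape your server returns):
conversation = [
    SystemMessage(content="You are a helpful assistant"),
    UserMessage(content="What is the weather in Paris?"),
    AIMessage(
        content=None,  # tool-only response: content may be None
        tool_calls={"name": "get_weather", "arguments": '{"city": "Paris"}'},
    ),
    ToolMessage(content='{"temperature_c": 18, "condition": "cloudy"}'),
]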

Initialization

from model_module.ArkModelNew import ArkModelLink

# Default configuration
llm = ArkModelLink()
# base_url = "http://0.0.0.0:30000/v1"
# model_name = "tgi"
# max_tokens = 1024
# temperature = 0.7

# Custom configuration
llm = ArkModelLink(
    base_url="http://localhost:30000/v1",
    model_name="qwen-2.5",
    max_tokens=2048,
    temperature=0.5
)

Configuration Options

Parameter      Default                      Description
model_name     "tgi"                        Model identifier for the API
base_url       "http://0.0.0.0:30000/v1"    LLM server endpoint
max_tokens     1024                         Maximum response tokens
temperature    0.7                          Sampling temperature (0-2); higher values give more varied output

Core Methods

generate_response()

Main method for getting LLM responses:
async def generate_response(
    self,
    messages: List[Message],
    json_schema: Optional[dict] = None
) -> str:
    """
    Asynchronously generate a response from the model.

    Parameters
    ----------
    messages : List[Message]
        List of messages in the conversation
    json_schema : dict, optional
        JSON schema for structured output

    Returns
    -------
    str
        The raw response content
    """
Usage:
from model_module.ArkModelNew import ArkModelLink, UserMessage, SystemMessage

llm = ArkModelLink(base_url="http://localhost:30000/v1")

messages = [
    SystemMessage(content="You are a helpful assistant"),
    UserMessage(content="What is Python?")
]

# Basic generation
response = await llm.generate_response(messages, json_schema=None)
print(response)  # "Python is a programming language..."

generate_stream()

Stream tokens as they’re generated:
async def generate_stream(
    self,
    messages: List[Message]
) -> AsyncIterator[str]:
    """
    Stream tokens as they're generated.

    Yields
    ------
    str
        Individual tokens/chunks
    """
Usage:
async for token in llm.generate_stream(messages):
    print(token, end="", flush=True)
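
To keep the full text as well as display it live, accumulate the chunks while streaming:
chunks = []
async for token in llm.generate_stream(messages):
    print(token, end="", flush=True)
    chunks.append(token)

full_response = "".join(chunks)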

make_llm_call()

Low-level method for API calls:
async def make_llm_call(
    self,
    messages: List[Message],
    json_schema: Optional[dict],
    stream: bool = False
) -> Union[Dict[str, Any], str]:
    """
    Make an asynchronous call to the LLM endpoint.

    Parameters
    ----------
    messages : List[Message]
        Conversation messages
    json_schema : dict, optional
        Response format schema
    stream : bool
        Enable streaming (not fully implemented)

    Returns
    -------
    str
        The LLM response content
    """

Structured Output

Use JSON schemas for structured responses:
from pydantic import BaseModel, Field

# Define schema using Pydantic
class MovieReview(BaseModel):
    title: str = Field(description="Movie title")
    rating: int = Field(description="Rating 1-10")
    summary: str = Field(description="Brief review")

# Create JSON schema
json_schema = {
    "type": "json_schema",
    "json_schema": {
        "name": "movie_review",
        "schema": MovieReview.model_json_schema()
    }
}

# Generate structured response
response = await llm.generate_response(messages, json_schema=json_schema)

# Parse JSON response
import json
review = json.loads(response)
print(review["title"], review["rating"])
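
Instead of plain json.loads, you can validate the response against the same Pydantic model (a suggested pattern, not part of the module):
from pydantic import ValidationError

try:
    review = MovieReview.model_validate_json(response)
    print(review.title, review.rating)
except ValidationError as e:
    print(f"Response did not match MovieReview schema: {e}")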

Message Formatting

Messages are automatically converted to OpenAI format:
def _format_messages(self, messages: List[Message]) -> List[Dict[str, str]]:
    """Convert Message objects to OpenAI format."""
    formatted = []
    for msg in messages:
        if isinstance(msg, (UserMessage, SystemMessage, ToolMessage)):
            formatted.append({"role": msg.role, "content": msg.content or ""})
        elif isinstance(msg, AIMessage):
            formatted.append({"role": "assistant", "content": msg.content or ""})
    return formatted
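
For illustration (note that _format_messages is an internal helper), the conversion produces plain role/content dicts:
formatted = llm._format_messages([
    SystemMessage(content="You are helpful"),
    UserMessage(content="Hi"),
])
# [{'role': 'system', 'content': 'You are helpful'},
#  {'role': 'user', 'content': 'Hi'}]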

Integration with Agent

The Agent module uses ArkModelLink through call_llm():
class Agent:
    def __init__(self, ..., llm: ArkModelLink, ...):
        self.llm = llm

    async def call_llm(self, context=None, json_schema=None):
        """Agent's interface with chat model."""
        llm_response = await self.llm.generate_response(context, json_schema)
        return AIMessage(content=llm_response)

AsyncOpenAI Client

The module uses AsyncOpenAI internally:
@property
def client(self) -> AsyncOpenAI:
    """Returns the configured AsyncOpenAI client."""
    return AsyncOpenAI(
        base_url=self.base_url,
        api_key="-",  # Placeholder for local deployment
    )
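
Because the property builds a new AsyncOpenAI instance on each access, keep a single reference if you want to reuse one client, for example to check the endpoint from Python (an illustrative pattern, not part of the module):
client = llm.client  # one instance, reused below
models = await client.models.list()  # quick way to verify the endpoint
print(models)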

Configuration via YAML

Configure the LLM endpoint in config_module/config.yaml:
llm:
  base_url: "http://localhost:30000/v1"
Access in code:
from config_module.loader import config

llm = ArkModelLink(base_url=config.get("llm.base_url"))

Error Handling

async def make_llm_call(self, messages, json_schema, stream=False):
    openai_messages_payload = self._format_messages(messages)

    try:
        chat_completion = await self.client.chat.completions.create(
            model=self.model_name,
            messages=openai_messages_payload,
            max_tokens=self.max_tokens,
            temperature=self.temperature,
            response_format=json_schema,
        )
        return chat_completion.choices[0].message.content

    except Exception as e:
        print(f"Error during async LLM call: {e}")
        return f"Error: An error occurred during async LLM call: {e}"

Streaming Implementation

async def generate_stream(self, messages: List[Message]) -> AsyncIterator[str]:
    """Stream tokens as they're generated."""
    openai_messages = self._format_messages(messages)

    try:
        stream = await self.client.chat.completions.create(
            model=self.model_name,
            messages=openai_messages,
            max_tokens=self.max_tokens,
            temperature=self.temperature,
            stream=True,
        )
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    except Exception as e:
        print(f"Error during streaming: {e}")
        yield f"Error: {e}"

SGLANG Server

The model module is designed to work with SGLANG:
# Start SGLANG server (port 30000)
bash model_module/run.sh
This runs Qwen 2.5-7B-Instruct with an OpenAI-compatible API.
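
The exact flags live in run.sh; a typical SGLANG launch command for this setup looks roughly like the following (adjust to match the script):
python -m sglang.launch_server \
  --model-path Qwen/Qwen2.5-7B-Instruct \
  --port 30000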

Verify Server

curl http://localhost:30000/v1/models

Testing

Basic test example:
# model_module/tests_arkmodel.py
import asyncio
from model_module.ArkModelNew import ArkModelLink, UserMessage, SystemMessage

async def test():
    llm = ArkModelLink(base_url="http://localhost:30000/v1")

    messages = [
        SystemMessage(content="You are helpful"),
        UserMessage(content="Say hello")
    ]

    response = await llm.generate_response(messages, None)
    print(response)

asyncio.run(test())
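
If the project uses pytest with pytest-asyncio, the same check can be written as a test function (a sketch; assumes pytest-asyncio is installed):
import pytest
from model_module.ArkModelNew import ArkModelLink, UserMessage, SystemMessage

@pytest.mark.asyncio
async def test_generate_response():
    llm = ArkModelLink(base_url="http://localhost:30000/v1")
    messages = [
        SystemMessage(content="You are helpful"),
        UserMessage(content="Say hello"),
    ]
    response = await llm.generate_response(messages, None)
    assert isinstance(response, str) and len(response) > 0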

Best Practices

  1. Use async/await: All LLM calls should be awaited
  2. Set appropriate timeouts: Prevent hanging on slow responses (see the sketch after this list)
  3. Handle errors gracefully: Catch exceptions and provide fallbacks
  4. Use streaming for UX: Better user experience for long responses
  5. Validate schemas: Test JSON schemas before production use
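
A sketch of a client-side timeout around generate_response() (the module itself does not enforce one; 30 seconds is an arbitrary example value):
import asyncio

async def generate_with_timeout(llm, messages, timeout_s: float = 30.0) -> str:
    try:
        return await asyncio.wait_for(
            llm.generate_response(messages, json_schema=None),
            timeout=timeout_s,
        )
    except asyncio.TimeoutError:
        return "Error: LLM call timed out"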

Troubleshooting

Ensure SGLANG server is running:
curl http://localhost:30000/v1/models
Check GPU utilization and model loading:
nvidia-smi
Verify your schema matches expected output:
print(json_schema)
print(response)

Next Steps