The Agent Module is the core orchestrator of ARKOS, coordinating the state machine, memory, LLM, and tool systems to provide intelligent conversational interactions.
```python
async def step(self, messages, user_id: str = None):
    """
    Runs the agent until reaching a terminal state.

    Parameters
    ----------
    messages : list
        List of messages to process
    user_id : str, optional
        User ID for per-user tool authentication

    Returns
    -------
    AIMessage
        The last AI message produced
    """
    self.current_user_id = user_id
    self.add_context(messages)

    last_ai_message = None
    retry_count = 0
    while not self.current_state.is_terminal:
        if retry_count > MAX_ITER:  # MAX_ITER = 10
            break
        retry_count += 1

        context = self.get_context()
        update = await self.current_state.run(context, self)
        if update:
            self.add_context([update])
            if isinstance(update, AIMessage):
                last_ai_message = update

        if self.current_state.is_terminal:
            break

        # Handle state transitions
        messages_list = self.memory.retrieve_short_memory(5)
        if self.current_state.check_transition_ready(messages_list):
            transition_dict = self.flow.get_transitions(
                self.current_state, messages_list
            )
            transition_names = transition_dict["tt"]
            if len(transition_names) == 1:
                next_state_name = transition_names[0]
            else:
                next_state_name = await self.choose_transition(
                    transition_dict, messages_list
                )
            self.current_state = self.flow.get_state(next_state_name)

    # Reset to initial state for next conversation
    self.current_state = self.flow.get_state("agent_reply")
    return last_ai_message
```
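For orientation, here is how a caller might drive `step`. This is a minimal sketch, not code from ARKOS: the `Agent` constructor, `my_flow`, `my_memory`, and `HumanMessage` are assumed names standing in for whatever the surrounding codebase actually provides.

```python
import asyncio

async def main():
    # Assumed constructor and message type -- illustrative names only.
    agent = Agent(flow=my_flow, memory=my_memory)
    reply = await agent.step(
        [HumanMessage(content="What can you do?")],
        user_id="user-123",  # enables per-user tool authentication
    )
    print(reply.content)  # the last AIMessage produced by the run

asyncio.run(main())
```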
```python
async def step_stream(self, messages, user_id: str = None):
    """
    Streaming version of step. Yields characters as they're produced.

    Yields
    ------
    str
        Characters/chunks from each state's output
    """
    self.current_user_id = user_id
    self.add_context(messages)

    retry_count = 0
    while not self.current_state.is_terminal:
        if retry_count > MAX_ITER:
            yield "\n[Max iterations reached]"
            break
        retry_count += 1

        context = self.get_context()
        try:
            update = await self.current_state.run(context, self)
        except Exception as e:
            update = AIMessage(content=f"Error: {str(e)[:200]}")
            self.current_state = self.flow.get_state("agent_reply")

        # Stream character by character
        if update and hasattr(update, 'content') and update.content:
            self.add_context([update])
            for char in update.content:
                yield char

        if self.current_state.is_terminal:
            break

        # Handle transitions (same as step)
        # ... transition logic ...

        if not self.current_state.is_terminal:
            yield "\n\n"  # Separator between states

    self.current_state = self.flow.get_state("agent_reply")
```
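Because `step_stream` is an async generator, callers consume it with `async for` rather than awaiting it. A sketch under the same assumed names as above:

```python
import asyncio

async def main():
    agent = Agent(flow=my_flow, memory=my_memory)  # assumed constructor
    async for chunk in agent.step_stream(
        [HumanMessage(content="Summarize our conversation")],
        user_id="user-123",
    ):
        # Render characters as they arrive instead of waiting for the
        # full reply; flush so partial output appears immediately.
        print(chunk, end="", flush=True)

asyncio.run(main())
```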
```python
async def choose_transition(self, transitions_dict, messages):
    """Use LLM to select the best next state."""
    transition_tuples = list(zip(
        transitions_dict["tt"], transitions_dict["td"]
    ))

    prompt = f"""Given the conversation context and these options:
{transition_tuples}
Output the most reasonable next state.
Do not use tool result to determine the next state."""

    # Create Pydantic model for structured output
    NextStates = self.create_next_state_class(transition_tuples)
    json_schema = {
        "type": "json_schema",
        "json_schema": {
            "name": "class_options",
            "schema": NextStates.model_json_schema(),
        },
    }

    context_text = [SystemMessage(content=prompt)] + messages
    output = await self.call_llm(context=context_text, json_schema=json_schema)
    return json.loads(output.content)["next_state"]
```
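The helper `create_next_state_class` is not shown in this section. One plausible shape for it (an assumption, not the actual ARKOS implementation) is a Pydantic `create_model` call that constrains `next_state` to a `Literal` over the candidate names, so the schema-constrained LLM output can only name a valid transition:

```python
from typing import Literal
from pydantic import Field, create_model

def create_next_state_class(transition_tuples):
    """Sketch of the helper used above (assumed, not the ARKOS original):
    build a Pydantic model whose `next_state` field only admits the
    candidate transition names, via a runtime-constructed Literal."""
    names = tuple(name for name, _description in transition_tuples)
    return create_model(
        "NextStates",
        next_state=(Literal[names], Field(description="Name of the chosen next state")),
    )

# The resulting JSON schema enumerates only the valid states, so the
# structured output cannot name a nonexistent transition.
NextStates = create_next_state_class([
    ("agent_reply", "Respond directly to the user"),
    ("tool_call", "Invoke a tool before replying"),
])
print(NextStates.model_json_schema()["properties"]["next_state"])
```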