Skip to content

Supervisor

xronai.core.supervisor.Supervisor

Bases: AI

A Supervisor class that manages multiple specialized AI agents.

This class handles user queries, delegates tasks to appropriate agents, and coordinates complex multi-step processes.

The Supervisor can operate in two modes: 1. Main Supervisor: Creates and manages the workflow directory 2. Assistant Supervisor: Works within an existing workflow

Source code in xronai/core/supervisor.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
class Supervisor(AI):
    """
    A Supervisor class that manages multiple specialized AI agents.

    This class handles user queries, delegates tasks to appropriate agents,
    and coordinates complex multi-step processes.

    The Supervisor can operate in two modes:
    1. Main Supervisor: Creates and manages the workflow directory
    2. Assistant Supervisor: Works within an existing workflow
    """

    def __init__(self,
                 name: str,
                 llm_config: Dict[str, str],
                 workflow_id: Optional[str] = None,
                 is_assistant: bool = False,
                 system_message: Optional[str] = None,
                 use_agents: bool = True,
                 history_base_path: Optional[str] = None):
        """
        Initialize the Supervisor instance.

        Args:
            name (str): The name of the supervisor.
            llm_config (Dict[str, str]): Configuration for the language model.
            workflow_id (Optional[str]): ID for the workflow. Only used by main supervisor.
            is_assistant (bool): Whether this is an assistant supervisor.
            system_message (Optional[str]): The initial system message for the agent.
            use_agents (bool): Whether to use agents or not.
            history_base_path (Optional[str]): The root directory for storing history logs.

        Raises:
            ValueError: If the name is empty or if workflow management rules are violated.
        """
        super().__init__(llm_config=llm_config)

        if not name:
            raise ValueError("Supervisor name cannot be empty")

        self.name = "".join(name.split())
        self.is_assistant = is_assistant
        self.workflow_id = workflow_id
        self.history_base_path = history_base_path

        self.chat_history: List[Dict[str, str]] = []
        self._pending_registrations: List[Union[Agent, 'Supervisor']] = []
        self.system_message = system_message if system_message is not None else self._get_default_system_message()

        if not is_assistant:
            if workflow_id:
                try:
                    self.history_manager = HistoryManager(workflow_id, base_path=self.history_base_path)
                except ValueError:
                    self._initialize_workflow()
                    self.history_manager = HistoryManager(workflow_id, base_path=self.history_base_path)
                if not self.history_manager.has_system_message(self.name):
                    self._initialize_chat_history()
            else:
                self._initialize_workflow()
                self.history_manager = HistoryManager(self.workflow_id, base_path=self.history_base_path)
                self._initialize_chat_history()
        else:
            self.history_manager = None

        self.registered_agents: List[Union[Agent, 'Supervisor']] = []
        self.available_tools: List[Dict[str, Any]] = []
        self.use_agents = use_agents

        self.debugger = Debugger(name=self.name, workflow_id=self.workflow_id)
        self.debugger.start_session()

    def _emit_event(self, on_event: Optional[Callable], event_type: str, data: Dict[str, Any]):
        """Safely emits an event if the callback is provided."""
        if on_event:
            payload = {
                "id": f"evt_{uuid.uuid4()}",
                "timestamp": datetime.utcnow().isoformat() + "Z",
                "type": event_type,
                "data": data,
            }
            on_event(payload)

    def _get_default_system_message(self) -> str:
        """
        Get the default system message based on supervisor type.

        Returns:
            str: Appropriate system message for the supervisor type.
        """
        if self.is_assistant:
            return """You are an Assistant Supervisor, part of a larger workflow managed by the Main Supervisor. 
            Your role is to handle specialized tasks delegated to you and manage your assigned agents effectively."""
        else:
            return """You are the Main Supervisor, responsible for managing the entire workflow. 
            Your tasks include coordinating with Assistant Supervisors and direct agents, ensuring efficient task delegation 
            and execution."""

    def _initialize_chat_history(self) -> None:
        """
        Initialize chat history with system message and record in history manager,
        only if one doesn't already exist for this session.
        """
        if self.system_message and self.history_manager:
            if not self.history_manager.has_system_message(self.name):
                system_msg = {'role': 'system', 'content': self.system_message}

                if not self.chat_history or self.chat_history[0].get('role') != 'system':
                    self.chat_history.insert(0, system_msg)

                self.history_manager.append_message(message=system_msg,
                                                    sender_type=EntityType.MAIN_SUPERVISOR
                                                    if not self.is_assistant else EntityType.ASSISTANT_SUPERVISOR,
                                                    sender_name=self.name)

    def configure_system_prompt(self, system_prompt: str) -> None:
        """
        Configure the system prompt for the Supervisor.

        Args:
            system_prompt (str): The new system prompt to set.
        """
        self.system_message = {"role": "system", "content": system_prompt}

    def register_agent(self, agent: Union[Agent, 'Supervisor']) -> None:
        """
        Register a new agent or assistant supervisor.

        Args:
            agent (Union[Agent, Supervisor]): The agent or assistant supervisor to register.

        Raises:
            ValueError: If attempting to register a main supervisor or if registration rules are violated.
        """

        if isinstance(agent, Supervisor) and not agent.is_assistant:
            raise ValueError("Only assistant supervisors can be registered as agents")

        if self.is_assistant and not self.workflow_id:
            self._pending_registrations.append(agent)
            return

        agent.set_workflow_id(self.workflow_id, history_base_path=self.history_base_path)
        self.registered_agents.append(agent)
        self._add_agent_tool(agent)

    def set_workflow_id(self, workflow_id: str, history_base_path: Optional[str] = None) -> None:
        """
        Set the workflow ID and history base path for this supervisor and all its children.
        This is the key method for ensuring session state is correctly propagated.

        Args:
            workflow_id (str): The workflow ID (session ID) to set.
            history_base_path (Optional[str]): The root directory for storing history logs.
        """
        self.workflow_id = workflow_id
        self.history_base_path = history_base_path
        self.debugger.update_workflow_id(workflow_id)
        self.history_manager = HistoryManager(workflow_id, base_path=self.history_base_path)
        self._initialize_chat_history()
        self._process_pending_registrations()

        for agent in self.registered_agents:
            agent.set_workflow_id(workflow_id, history_base_path=history_base_path)

    def _process_pending_registrations(self) -> None:
        """Process any pending agent registrations."""
        if self._pending_registrations:
            for agent in self._pending_registrations:
                self.register_agent(agent)
            self._pending_registrations.clear()

    def _add_agent_tool(self, agent: Agent) -> None:
        """
        Add a tool for the registered agent to the available tools.

        Args:
            agent (Agent): The agent for which to add a tool.
        """
        self.available_tools.append({
            "type": "function",
            "function": {
                "name": f"delegate_to_{agent.name}",
                "description": agent.system_message,
                "parameters": {
                    "type": "object",
                    "properties": {
                        "reasoning": {
                            "type":
                                "string",
                            "description": (f"Supervisor's reasoning for choosing the {agent.name} agent. "
                                            "Explain why this agent is being invoked and what is expected of it.")
                        },
                        "query": {
                            "type": "string",
                            "description": (f"The actual query or instruction for {agent.name} agent to respond to.")
                        },
                        "context": {
                            "type":
                                "string",
                            "description": ("All relevant background information, prior facts, decisions, "
                                            "and state needed by the agent to solve the current query. "
                                            "Should be as detailed and self-contained as possible.")
                        },
                    },
                    "required": ["reasoning", "query", "context"]
                }
            },
            "strict": True,
        })

    def _initialize_workflow(self) -> None:
        """
        Initialize the workflow directory and set up necessary structures.
        Only called by main supervisor.

        Raises:
            ValueError: If an assistant supervisor attempts to initialize a workflow.
        """
        if self.is_assistant:
            raise ValueError("Assistant supervisors cannot initialize workflows")

        if not self.workflow_id:
            self.workflow_id = str(uuid.uuid4())

        base_dir = Path(self.history_base_path) if self.history_base_path else Path("xronai_logs")
        workflow_path = base_dir / self.workflow_id
        workflow_path.mkdir(parents=True, exist_ok=True)

        history_file = workflow_path / "history.jsonl"
        if not history_file.exists():
            history_file.touch()

    def get_registered_agents(self) -> List[str]:
        """
        Get the names of all registered agents.

        Returns:
            List[str]: A list of registered agent names.
        """
        return [agent.name for agent in self.registered_agents]

    def delegate_to_agent(self,
                          message: ChatCompletionMessage,
                          parent_msg_id: str,
                          supervisor_chain: Optional[List[str]] = None,
                          on_event: Optional[Callable] = None) -> str:
        """
        Delegate a task to the appropriate agent based on the supervisor's response.

        Args:
            message (ChatCompletionMessage): The message containing the delegation information.
            parent_msg_id (str): ID of the parent message in history.
            supervisor_chain (Optional[List[str]]): Chain of supervisors involved in delegation.
            on_event (Optional[Callable]): The event callback function.

        Returns:
            str: The response from the delegated agent.

        Raises:
            ValueError: If no matching agent is found for delegation or if the message structure is unexpected.
        """
        if not hasattr(message, 'tool_calls') or not message.tool_calls:
            raise ValueError("Message does not contain tool calls")

        function_call = message.tool_calls[0]
        target_agent_name = function_call.function.name.replace("delegate_to_", "")
        args = json.loads(function_call.function.arguments)
        reasoning = args.get('reasoning')
        context = args.get('context')
        query = args.get('query')

        if not query:
            raise ValueError("Query is missing from the function call")

        target_agent = next((agent for agent in self.registered_agents if agent.name == target_agent_name), None)

        if not target_agent:
            raise ValueError(f"No agent found with name '{target_agent_name}'")

        self.debugger.log(f"[DELEGATION] Agent: {target_agent_name}")
        self.debugger.log(f"[REASONING] {reasoning}")
        self.debugger.log(f"[CONTEXT] {context}")
        self.debugger.log(f"[QUERY] {query}")

        self._emit_event(
            on_event, "SUPERVISOR_DELEGATE", {
                "source": {
                    "name": self.name,
                    "type": "ASSISTANT_SUPERVISOR" if self.is_assistant else "SUPERVISOR"
                },
                "target": {
                    "name": target_agent.name,
                    "type": "AGENT" if isinstance(target_agent, Agent) else "ASSISTANT_SUPERVISOR"
                },
                "reasoning": reasoning,
                "query_for_agent": query
            })

        current_chain = supervisor_chain or []
        current_chain.append(self.name)

        agent_response = target_agent.chat(query=f"CONTEXT:\n{context}\n\nQUERY:\n{query}",
                                           sender_name=self.name,
                                           on_event=on_event)
        self.debugger.log(f"[RESPONSE] {target_agent_name}: {agent_response}")
        return agent_response

    def chat(self,
             query: str,
             sender_name: Optional[str] = None,
             supervisor_chain: Optional[List[str]] = None,
             on_event: Optional[Callable] = None) -> str:
        """
        Process user input and generate a response using the appropriate agents.

        Args:
            query (str): The user's input query.
            sender_name (Optional[str]): Name of the sender (for assistant supervisors).
            supervisor_chain (Optional[List[str]]): Chain of supervisors in delegation.
            on_event (Optional[Callable]): A callback function to stream events to.

        Returns:
            str: The final response to the user's query.

        Raises:
            RuntimeError: If there's an error in processing the user input.
        """
        self.debugger.log(f"[USER INPUT] {query}")

        self._emit_event(on_event, "WORKFLOW_START", {"user_query": query})

        current_chain = supervisor_chain or []
        if self.name not in current_chain:
            current_chain.append(self.name)

        user_msg = {'role': 'user', 'content': query}
        self.chat_history.append(user_msg)

        user_msg_id = self.history_manager.append_message(
            message=user_msg,
            sender_type=EntityType.MAIN_SUPERVISOR if sender_name else EntityType.USER,
            sender_name=sender_name or "user",
            supervisor_chain=current_chain)

        try:
            while True:
                supervisor_response = self.generate_response(self.chat_history,
                                                             tools=self.available_tools,
                                                             use_tools=self.use_agents).choices[0]

                if not supervisor_response.finish_reason == "tool_calls":
                    query_answer = supervisor_response.message.content
                    self.debugger.log(f"[SUPERVISOR RESPONSE] {query_answer}")

                    response_msg = {"role": "assistant", "content": query_answer}
                    self.chat_history.append(response_msg)

                    self.history_manager.append_message(message=response_msg,
                                                        sender_type=EntityType.MAIN_SUPERVISOR
                                                        if not self.is_assistant else EntityType.ASSISTANT_SUPERVISOR,
                                                        sender_name=self.name,
                                                        parent_id=user_msg_id,
                                                        supervisor_chain=current_chain)

                    self._emit_event(
                        on_event, "FINAL_RESPONSE", {
                            "source": {
                                "name": self.name,
                                "type": "ASSISTANT_SUPERVISOR" if self.is_assistant else "SUPERVISOR"
                            },
                            "content": query_answer
                        })
                    self._emit_event(on_event, "WORKFLOW_END", {})
                    return query_answer

                tool_call = supervisor_response.message.tool_calls[0]
                tool_msg = {
                    "role":
                        "assistant",
                    "content":
                        None,
                    "tool_calls": [{
                        'id': tool_call.id,
                        'type': 'function',
                        'function': {
                            'name': tool_call.function.name,
                            'arguments': tool_call.function.arguments
                        }
                    }]
                }
                self.chat_history.append(tool_msg)

                tool_msg_id = self.history_manager.append_message(
                    message=tool_msg,
                    sender_type=EntityType.MAIN_SUPERVISOR
                    if not self.is_assistant else EntityType.ASSISTANT_SUPERVISOR,
                    sender_name=self.name,
                    parent_id=user_msg_id,
                    tool_call_id=tool_call.id,
                    supervisor_chain=current_chain)

                if hasattr(supervisor_response.message, 'tool_calls') and supervisor_response.message.tool_calls:
                    agent_feedback = self.delegate_to_agent(supervisor_response.message,
                                                            tool_msg_id,
                                                            supervisor_chain=current_chain,
                                                            on_event=on_event)

                    self._emit_event(
                        on_event,
                        "AGENT_RESPONSE",
                        {
                            "source": {
                                "name": tool_call.function.name.replace("delegate_to_", ""),
                                "type": "AGENT"
                            },  # A bit of a hack to get agent name
                            "content": agent_feedback
                        })

                    feedback_msg = {"role": "tool", "content": agent_feedback, "tool_call_id": tool_call.id}
                    self.chat_history.append(feedback_msg)

                    target_agent_name = tool_call.function.name.replace("delegate_to_", "")
                    self.history_manager.append_message(message=feedback_msg,
                                                        sender_type=EntityType.TOOL,
                                                        sender_name=target_agent_name,
                                                        parent_id=tool_msg_id,
                                                        tool_call_id=tool_call.id,
                                                        supervisor_chain=current_chain)
                else:
                    final_content = supervisor_response.message.content
                    self._emit_event(
                        on_event, "FINAL_RESPONSE", {
                            "source": {
                                "name": self.name,
                                "type": "ASSISTANT_SUPERVISOR" if self.is_assistant else "SUPERVISOR"
                            },
                            "content": final_content
                        })
                    self._emit_event(on_event, "WORKFLOW_END", {})
                    return final_content

        except Exception as e:
            error_msg = f"Error in processing user input: {str(e)}"
            self.debugger.log(f"[ERROR] {error_msg}", level="error")
            self._emit_event(
                on_event, "ERROR", {
                    "source": {
                        "name": self.name,
                        "type": "ASSISTANT_SUPERVISOR" if self.is_assistant else "SUPERVISOR"
                    },
                    "error_message": error_msg
                })
            self._emit_event(on_event, "WORKFLOW_END", {})
            raise RuntimeError(error_msg)

    def start_interactive_session(self) -> None:
        """
        Start an interactive session with the user.

        This method initiates a loop that continuously processes user input
        until the user decides to exit.
        """
        print("Starting interactive session. Type 'exit' to end the session.")
        while True:
            user_input = input("User: ").strip()
            if user_input.lower() == "exit":
                print("Ending session. Goodbye!")
                break
            try:
                supervisor_output = self.chat(query=user_input)
                print(f"Supervisor: {supervisor_output}")
            except Exception as e:
                print(f"An error occurred: {str(e)}")

    def __str__(self) -> str:
        """Return a string representation of the Supervisor instance."""
        return f"Supervisor(name={self.name}, agents={len(self.registered_agents)})"

    def __repr__(self) -> str:
        """Return a detailed string representation of the Supervisor instance."""
        return (f"Supervisor(name={self.name}, llm_config={self.llm_config}, "
                f"registered_agents={[agent.name for agent in self.registered_agents]})")

    def reset_chat_history(self) -> None:
        """Reset chat history to initial state and clear history manager."""
        self.history_manager.clear_history()
        self._initialize_chat_history()

    def get_chat_history(self) -> List[Dict[str, str]]:
        """
        Get the current chat history.

        Returns:
            List[Dict[str, str]]: The current chat history.
        """
        return self.chat_history

    def add_to_chat_history(self, role: str, content: str) -> None:
        """
        Add a new message to both chat history and history manager.

        Args:
            role (str): The role of the message sender (e.g., 'user', 'assistant', 'system').
            content (str): The content of the message.

        Raises:
            ValueError: If an invalid role is provided.
        """
        if role not in ['user', 'assistant', 'system', 'tool']:
            raise ValueError(f"Invalid role: {role}")

        message = {"role": role, "content": content}
        self.chat_history.append(message)

        sender_type = {
            'user': EntityType.USER,
            'assistant': EntityType.MAIN_SUPERVISOR if not self.is_assistant else EntityType.ASSISTANT_SUPERVISOR,
            'system': EntityType.MAIN_SUPERVISOR if not self.is_assistant else EntityType.ASSISTANT_SUPERVISOR,
            'tool': EntityType.TOOL
        }[role]

        self.history_manager.append_message(message=message, sender_type=sender_type, sender_name=self.name)

    def get_agent_by_name(self, agent_name: str) -> Optional[Agent]:
        """
        Get a registered agent by its name.

        Args:
            agent_name (str): The name of the agent to retrieve.

        Returns:
            Optional[Agent]: The agent with the specified name, or None if not found.
        """
        return next((agent for agent in self.registered_agents if agent.name.lower() == agent_name.lower()), None)

    def remove_agent(self, agent_name: str) -> bool:
        """
        Remove a registered agent by its name.

        Args:
            agent_name (str): The name of the agent to remove.

        Returns:
            bool: True if the agent was successfully removed, False otherwise.
        """
        agent = self.get_agent_by_name(agent_name)
        if agent:
            self.registered_agents.remove(agent)
            self.available_tools = [
                tool for tool in self.available_tools if tool['function']['name'] != f"delegate_to_{agent_name}"
            ]
            return True
        return False

    def update_system_message(self) -> None:
        """
        Update the system message to reflect the current set of registered agents.
        """
        agent_descriptions = "\n".join(f"{agent.name}: {agent.system_message}" for agent in self.registered_agents)
        self.system_message = f"{self._get_default_system_message()}\n\n{agent_descriptions}"
        self.reset_chat_history()

    @property
    def is_main_supervisor(self) -> bool:
        """
        Check if this is the main supervisor.

        Returns:
            bool: True if this is the main supervisor, False if assistant.
        """
        return not self.is_assistant

    def get_workflow_info(self) -> Dict[str, Any]:
        """
        Get information about the current workflow.

        Returns:
            Dict[str, Any]: Dictionary containing workflow information.
        """
        return {
            'workflow_id':
                self.workflow_id,
            'supervisor_type':
                'main' if self.is_main_supervisor else 'assistant',
            'name':
                self.name,
            'registered_agents': [{
                'name': agent.name,
                'type': 'supervisor' if isinstance(agent, Supervisor) else 'agent'
            } for agent in self.registered_agents]
        }

    def display_agent_graph(self, indent="", skip_header=False) -> None:
        """
        Display the supervisor-agent hierarchy.

        Args:
            indent (str): Current indentation level
            skip_header (bool): Whether to skip printing the supervisor header
        """
        if not skip_header:
            supervisor_type = "Main Supervisor" if self.is_main_supervisor else "Assistant Supervisor"
            print(f"{indent}{supervisor_type}: {self.name}")

            if self.registered_agents:
                print(f"{indent}│")

        for i, agent in enumerate(self.registered_agents):
            is_last_agent = i == len(self.registered_agents) - 1
            agent_prefix = "└── " if is_last_agent else "├── "
            current_indent = indent + ("    " if is_last_agent else "│   ")

            if isinstance(agent, Supervisor):
                print(f"{indent}{agent_prefix}Assistant Supervisor: {agent.name}")
                agent.display_agent_graph(current_indent, skip_header=True)  # Skip header for recursive calls
            else:
                print(f"{indent}{agent_prefix}Agent: {agent.name}")
                if hasattr(agent, 'tools') and agent.tools:
                    for j, tool in enumerate(agent.tools):
                        is_last_tool = j == len(agent.tools) - 1
                        tool_prefix = "└── " if is_last_tool else "├── "
                        tool_name = tool['metadata']['function']['name'] if 'metadata' in tool else "Unnamed Tool"
                        print(f"{current_indent}{tool_prefix}Tool: {tool_name}")
                else:
                    print(f"{current_indent}└── No tools available")

            if not is_last_agent and i < len(self.registered_agents) - 1:
                print(f"{indent}│")

is_main_supervisor property

Check if this is the main supervisor.

Returns:

Name Type Description
bool bool

True if this is the main supervisor, False if assistant.

__init__(name, llm_config, workflow_id=None, is_assistant=False, system_message=None, use_agents=True, history_base_path=None)

Initialize the Supervisor instance.

Parameters:

Name Type Description Default
name str

The name of the supervisor.

required
llm_config Dict[str, str]

Configuration for the language model.

required
workflow_id Optional[str]

ID for the workflow. Only used by main supervisor.

None
is_assistant bool

Whether this is an assistant supervisor.

False
system_message Optional[str]

The initial system message for the agent.

None
use_agents bool

Whether to use agents or not.

True
history_base_path Optional[str]

The root directory for storing history logs.

None

Raises:

Type Description
ValueError

If the name is empty or if workflow management rules are violated.

Source code in xronai/core/supervisor.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def __init__(self,
             name: str,
             llm_config: Dict[str, str],
             workflow_id: Optional[str] = None,
             is_assistant: bool = False,
             system_message: Optional[str] = None,
             use_agents: bool = True,
             history_base_path: Optional[str] = None):
    """
    Initialize the Supervisor instance.

    Args:
        name (str): The name of the supervisor.
        llm_config (Dict[str, str]): Configuration for the language model.
        workflow_id (Optional[str]): ID for the workflow. Only used by main supervisor.
        is_assistant (bool): Whether this is an assistant supervisor.
        system_message (Optional[str]): The initial system message for the agent.
        use_agents (bool): Whether to use agents or not.
        history_base_path (Optional[str]): The root directory for storing history logs.

    Raises:
        ValueError: If the name is empty or if workflow management rules are violated.
    """
    super().__init__(llm_config=llm_config)

    if not name:
        raise ValueError("Supervisor name cannot be empty")

    self.name = "".join(name.split())
    self.is_assistant = is_assistant
    self.workflow_id = workflow_id
    self.history_base_path = history_base_path

    self.chat_history: List[Dict[str, str]] = []
    self._pending_registrations: List[Union[Agent, 'Supervisor']] = []
    self.system_message = system_message if system_message is not None else self._get_default_system_message()

    if not is_assistant:
        if workflow_id:
            try:
                self.history_manager = HistoryManager(workflow_id, base_path=self.history_base_path)
            except ValueError:
                self._initialize_workflow()
                self.history_manager = HistoryManager(workflow_id, base_path=self.history_base_path)
            if not self.history_manager.has_system_message(self.name):
                self._initialize_chat_history()
        else:
            self._initialize_workflow()
            self.history_manager = HistoryManager(self.workflow_id, base_path=self.history_base_path)
            self._initialize_chat_history()
    else:
        self.history_manager = None

    self.registered_agents: List[Union[Agent, 'Supervisor']] = []
    self.available_tools: List[Dict[str, Any]] = []
    self.use_agents = use_agents

    self.debugger = Debugger(name=self.name, workflow_id=self.workflow_id)
    self.debugger.start_session()

__repr__()

Return a detailed string representation of the Supervisor instance.

Source code in xronai/core/supervisor.py
498
499
500
501
def __repr__(self) -> str:
    """Return a detailed string representation of the Supervisor instance."""
    return (f"Supervisor(name={self.name}, llm_config={self.llm_config}, "
            f"registered_agents={[agent.name for agent in self.registered_agents]})")

__str__()

Return a string representation of the Supervisor instance.

Source code in xronai/core/supervisor.py
494
495
496
def __str__(self) -> str:
    """Return a string representation of the Supervisor instance."""
    return f"Supervisor(name={self.name}, agents={len(self.registered_agents)})"

add_to_chat_history(role, content)

Add a new message to both chat history and history manager.

Parameters:

Name Type Description Default
role str

The role of the message sender (e.g., 'user', 'assistant', 'system').

required
content str

The content of the message.

required

Raises:

Type Description
ValueError

If an invalid role is provided.

Source code in xronai/core/supervisor.py
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
def add_to_chat_history(self, role: str, content: str) -> None:
    """
    Add a new message to both chat history and history manager.

    Args:
        role (str): The role of the message sender (e.g., 'user', 'assistant', 'system').
        content (str): The content of the message.

    Raises:
        ValueError: If an invalid role is provided.
    """
    if role not in ['user', 'assistant', 'system', 'tool']:
        raise ValueError(f"Invalid role: {role}")

    message = {"role": role, "content": content}
    self.chat_history.append(message)

    sender_type = {
        'user': EntityType.USER,
        'assistant': EntityType.MAIN_SUPERVISOR if not self.is_assistant else EntityType.ASSISTANT_SUPERVISOR,
        'system': EntityType.MAIN_SUPERVISOR if not self.is_assistant else EntityType.ASSISTANT_SUPERVISOR,
        'tool': EntityType.TOOL
    }[role]

    self.history_manager.append_message(message=message, sender_type=sender_type, sender_name=self.name)

chat(query, sender_name=None, supervisor_chain=None, on_event=None)

Process user input and generate a response using the appropriate agents.

Parameters:

Name Type Description Default
query str

The user's input query.

required
sender_name Optional[str]

Name of the sender (for assistant supervisors).

None
supervisor_chain Optional[List[str]]

Chain of supervisors in delegation.

None
on_event Optional[Callable]

A callback function to stream events to.

None

Returns:

Name Type Description
str str

The final response to the user's query.

Raises:

Type Description
RuntimeError

If there's an error in processing the user input.

Source code in xronai/core/supervisor.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
def chat(self,
         query: str,
         sender_name: Optional[str] = None,
         supervisor_chain: Optional[List[str]] = None,
         on_event: Optional[Callable] = None) -> str:
    """
    Process user input and generate a response using the appropriate agents.

    Args:
        query (str): The user's input query.
        sender_name (Optional[str]): Name of the sender (for assistant supervisors).
        supervisor_chain (Optional[List[str]]): Chain of supervisors in delegation.
        on_event (Optional[Callable]): A callback function to stream events to.

    Returns:
        str: The final response to the user's query.

    Raises:
        RuntimeError: If there's an error in processing the user input.
    """
    self.debugger.log(f"[USER INPUT] {query}")

    self._emit_event(on_event, "WORKFLOW_START", {"user_query": query})

    current_chain = supervisor_chain or []
    if self.name not in current_chain:
        current_chain.append(self.name)

    user_msg = {'role': 'user', 'content': query}
    self.chat_history.append(user_msg)

    user_msg_id = self.history_manager.append_message(
        message=user_msg,
        sender_type=EntityType.MAIN_SUPERVISOR if sender_name else EntityType.USER,
        sender_name=sender_name or "user",
        supervisor_chain=current_chain)

    try:
        while True:
            supervisor_response = self.generate_response(self.chat_history,
                                                         tools=self.available_tools,
                                                         use_tools=self.use_agents).choices[0]

            if not supervisor_response.finish_reason == "tool_calls":
                query_answer = supervisor_response.message.content
                self.debugger.log(f"[SUPERVISOR RESPONSE] {query_answer}")

                response_msg = {"role": "assistant", "content": query_answer}
                self.chat_history.append(response_msg)

                self.history_manager.append_message(message=response_msg,
                                                    sender_type=EntityType.MAIN_SUPERVISOR
                                                    if not self.is_assistant else EntityType.ASSISTANT_SUPERVISOR,
                                                    sender_name=self.name,
                                                    parent_id=user_msg_id,
                                                    supervisor_chain=current_chain)

                self._emit_event(
                    on_event, "FINAL_RESPONSE", {
                        "source": {
                            "name": self.name,
                            "type": "ASSISTANT_SUPERVISOR" if self.is_assistant else "SUPERVISOR"
                        },
                        "content": query_answer
                    })
                self._emit_event(on_event, "WORKFLOW_END", {})
                return query_answer

            tool_call = supervisor_response.message.tool_calls[0]
            tool_msg = {
                "role":
                    "assistant",
                "content":
                    None,
                "tool_calls": [{
                    'id': tool_call.id,
                    'type': 'function',
                    'function': {
                        'name': tool_call.function.name,
                        'arguments': tool_call.function.arguments
                    }
                }]
            }
            self.chat_history.append(tool_msg)

            tool_msg_id = self.history_manager.append_message(
                message=tool_msg,
                sender_type=EntityType.MAIN_SUPERVISOR
                if not self.is_assistant else EntityType.ASSISTANT_SUPERVISOR,
                sender_name=self.name,
                parent_id=user_msg_id,
                tool_call_id=tool_call.id,
                supervisor_chain=current_chain)

            if hasattr(supervisor_response.message, 'tool_calls') and supervisor_response.message.tool_calls:
                agent_feedback = self.delegate_to_agent(supervisor_response.message,
                                                        tool_msg_id,
                                                        supervisor_chain=current_chain,
                                                        on_event=on_event)

                self._emit_event(
                    on_event,
                    "AGENT_RESPONSE",
                    {
                        "source": {
                            "name": tool_call.function.name.replace("delegate_to_", ""),
                            "type": "AGENT"
                        },  # A bit of a hack to get agent name
                        "content": agent_feedback
                    })

                feedback_msg = {"role": "tool", "content": agent_feedback, "tool_call_id": tool_call.id}
                self.chat_history.append(feedback_msg)

                target_agent_name = tool_call.function.name.replace("delegate_to_", "")
                self.history_manager.append_message(message=feedback_msg,
                                                    sender_type=EntityType.TOOL,
                                                    sender_name=target_agent_name,
                                                    parent_id=tool_msg_id,
                                                    tool_call_id=tool_call.id,
                                                    supervisor_chain=current_chain)
            else:
                final_content = supervisor_response.message.content
                self._emit_event(
                    on_event, "FINAL_RESPONSE", {
                        "source": {
                            "name": self.name,
                            "type": "ASSISTANT_SUPERVISOR" if self.is_assistant else "SUPERVISOR"
                        },
                        "content": final_content
                    })
                self._emit_event(on_event, "WORKFLOW_END", {})
                return final_content

    except Exception as e:
        error_msg = f"Error in processing user input: {str(e)}"
        self.debugger.log(f"[ERROR] {error_msg}", level="error")
        self._emit_event(
            on_event, "ERROR", {
                "source": {
                    "name": self.name,
                    "type": "ASSISTANT_SUPERVISOR" if self.is_assistant else "SUPERVISOR"
                },
                "error_message": error_msg
            })
        self._emit_event(on_event, "WORKFLOW_END", {})
        raise RuntimeError(error_msg)

configure_system_prompt(system_prompt)

Configure the system prompt for the Supervisor.

Parameters:

Name Type Description Default
system_prompt str

The new system prompt to set.

required
Source code in xronai/core/supervisor.py
134
135
136
137
138
139
140
141
def configure_system_prompt(self, system_prompt: str) -> None:
    """
    Configure the system prompt for the Supervisor.

    Args:
        system_prompt (str): The new system prompt to set.
    """
    self.system_message = {"role": "system", "content": system_prompt}

delegate_to_agent(message, parent_msg_id, supervisor_chain=None, on_event=None)

Delegate a task to the appropriate agent based on the supervisor's response.

Parameters:

Name Type Description Default
message ChatCompletionMessage

The message containing the delegation information.

required
parent_msg_id str

ID of the parent message in history.

required
supervisor_chain Optional[List[str]]

Chain of supervisors involved in delegation.

None
on_event Optional[Callable]

The event callback function.

None

Returns:

Name Type Description
str str

The response from the delegated agent.

Raises:

Type Description
ValueError

If no matching agent is found for delegation or if the message structure is unexpected.

Source code in xronai/core/supervisor.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
def delegate_to_agent(self,
                      message: ChatCompletionMessage,
                      parent_msg_id: str,
                      supervisor_chain: Optional[List[str]] = None,
                      on_event: Optional[Callable] = None) -> str:
    """
    Delegate a task to the appropriate agent based on the supervisor's response.

    Args:
        message (ChatCompletionMessage): The message containing the delegation information.
        parent_msg_id (str): ID of the parent message in history.
        supervisor_chain (Optional[List[str]]): Chain of supervisors involved in delegation.
        on_event (Optional[Callable]): The event callback function.

    Returns:
        str: The response from the delegated agent.

    Raises:
        ValueError: If no matching agent is found for delegation or if the message structure is unexpected.
    """
    if not hasattr(message, 'tool_calls') or not message.tool_calls:
        raise ValueError("Message does not contain tool calls")

    function_call = message.tool_calls[0]
    target_agent_name = function_call.function.name.replace("delegate_to_", "")
    args = json.loads(function_call.function.arguments)
    reasoning = args.get('reasoning')
    context = args.get('context')
    query = args.get('query')

    if not query:
        raise ValueError("Query is missing from the function call")

    target_agent = next((agent for agent in self.registered_agents if agent.name == target_agent_name), None)

    if not target_agent:
        raise ValueError(f"No agent found with name '{target_agent_name}'")

    self.debugger.log(f"[DELEGATION] Agent: {target_agent_name}")
    self.debugger.log(f"[REASONING] {reasoning}")
    self.debugger.log(f"[CONTEXT] {context}")
    self.debugger.log(f"[QUERY] {query}")

    self._emit_event(
        on_event, "SUPERVISOR_DELEGATE", {
            "source": {
                "name": self.name,
                "type": "ASSISTANT_SUPERVISOR" if self.is_assistant else "SUPERVISOR"
            },
            "target": {
                "name": target_agent.name,
                "type": "AGENT" if isinstance(target_agent, Agent) else "ASSISTANT_SUPERVISOR"
            },
            "reasoning": reasoning,
            "query_for_agent": query
        })

    current_chain = supervisor_chain or []
    current_chain.append(self.name)

    agent_response = target_agent.chat(query=f"CONTEXT:\n{context}\n\nQUERY:\n{query}",
                                       sender_name=self.name,
                                       on_event=on_event)
    self.debugger.log(f"[RESPONSE] {target_agent_name}: {agent_response}")
    return agent_response

display_agent_graph(indent='', skip_header=False)

Display the supervisor-agent hierarchy.

Parameters:

Name Type Description Default
indent str

Current indentation level

''
skip_header bool

Whether to skip printing the supervisor header

False
Source code in xronai/core/supervisor.py
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
def display_agent_graph(self, indent="", skip_header=False) -> None:
    """
    Display the supervisor-agent hierarchy.

    Args:
        indent (str): Current indentation level
        skip_header (bool): Whether to skip printing the supervisor header
    """
    if not skip_header:
        supervisor_type = "Main Supervisor" if self.is_main_supervisor else "Assistant Supervisor"
        print(f"{indent}{supervisor_type}: {self.name}")

        if self.registered_agents:
            print(f"{indent}│")

    for i, agent in enumerate(self.registered_agents):
        is_last_agent = i == len(self.registered_agents) - 1
        agent_prefix = "└── " if is_last_agent else "├── "
        current_indent = indent + ("    " if is_last_agent else "│   ")

        if isinstance(agent, Supervisor):
            print(f"{indent}{agent_prefix}Assistant Supervisor: {agent.name}")
            agent.display_agent_graph(current_indent, skip_header=True)  # Skip header for recursive calls
        else:
            print(f"{indent}{agent_prefix}Agent: {agent.name}")
            if hasattr(agent, 'tools') and agent.tools:
                for j, tool in enumerate(agent.tools):
                    is_last_tool = j == len(agent.tools) - 1
                    tool_prefix = "└── " if is_last_tool else "├── "
                    tool_name = tool['metadata']['function']['name'] if 'metadata' in tool else "Unnamed Tool"
                    print(f"{current_indent}{tool_prefix}Tool: {tool_name}")
            else:
                print(f"{current_indent}└── No tools available")

        if not is_last_agent and i < len(self.registered_agents) - 1:
            print(f"{indent}│")

get_agent_by_name(agent_name)

Get a registered agent by its name.

Parameters:

Name Type Description Default
agent_name str

The name of the agent to retrieve.

required

Returns:

Type Description
Optional[Agent]

Optional[Agent]: The agent with the specified name, or None if not found.

Source code in xronai/core/supervisor.py
543
544
545
546
547
548
549
550
551
552
553
def get_agent_by_name(self, agent_name: str) -> Optional[Agent]:
    """
    Get a registered agent by its name.

    Args:
        agent_name (str): The name of the agent to retrieve.

    Returns:
        Optional[Agent]: The agent with the specified name, or None if not found.
    """
    return next((agent for agent in self.registered_agents if agent.name.lower() == agent_name.lower()), None)

get_chat_history()

Get the current chat history.

Returns:

Type Description
List[Dict[str, str]]

List[Dict[str, str]]: The current chat history.

Source code in xronai/core/supervisor.py
508
509
510
511
512
513
514
515
def get_chat_history(self) -> List[Dict[str, str]]:
    """
    Get the current chat history.

    Returns:
        List[Dict[str, str]]: The current chat history.
    """
    return self.chat_history

get_registered_agents()

Get the names of all registered agents.

Returns:

Type Description
List[str]

List[str]: A list of registered agent names.

Source code in xronai/core/supervisor.py
252
253
254
255
256
257
258
259
def get_registered_agents(self) -> List[str]:
    """
    Get the names of all registered agents.

    Returns:
        List[str]: A list of registered agent names.
    """
    return [agent.name for agent in self.registered_agents]

get_workflow_info()

Get information about the current workflow.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Dictionary containing workflow information.

Source code in xronai/core/supervisor.py
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
def get_workflow_info(self) -> Dict[str, Any]:
    """
    Get information about the current workflow.

    Returns:
        Dict[str, Any]: Dictionary containing workflow information.
    """
    return {
        'workflow_id':
            self.workflow_id,
        'supervisor_type':
            'main' if self.is_main_supervisor else 'assistant',
        'name':
            self.name,
        'registered_agents': [{
            'name': agent.name,
            'type': 'supervisor' if isinstance(agent, Supervisor) else 'agent'
        } for agent in self.registered_agents]
    }

register_agent(agent)

Register a new agent or assistant supervisor.

Parameters:

Name Type Description Default
agent Union[Agent, Supervisor]

The agent or assistant supervisor to register.

required

Raises:

Type Description
ValueError

If attempting to register a main supervisor or if registration rules are violated.

Source code in xronai/core/supervisor.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def register_agent(self, agent: Union[Agent, 'Supervisor']) -> None:
    """
    Register a new agent or assistant supervisor.

    Args:
        agent (Union[Agent, Supervisor]): The agent or assistant supervisor to register.

    Raises:
        ValueError: If attempting to register a main supervisor or if registration rules are violated.
    """

    if isinstance(agent, Supervisor) and not agent.is_assistant:
        raise ValueError("Only assistant supervisors can be registered as agents")

    if self.is_assistant and not self.workflow_id:
        self._pending_registrations.append(agent)
        return

    agent.set_workflow_id(self.workflow_id, history_base_path=self.history_base_path)
    self.registered_agents.append(agent)
    self._add_agent_tool(agent)

remove_agent(agent_name)

Remove a registered agent by its name.

Parameters:

Name Type Description Default
agent_name str

The name of the agent to remove.

required

Returns:

Name Type Description
bool bool

True if the agent was successfully removed, False otherwise.

Source code in xronai/core/supervisor.py
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
def remove_agent(self, agent_name: str) -> bool:
    """
    Remove a registered agent by its name.

    Args:
        agent_name (str): The name of the agent to remove.

    Returns:
        bool: True if the agent was successfully removed, False otherwise.
    """
    agent = self.get_agent_by_name(agent_name)
    if agent:
        self.registered_agents.remove(agent)
        self.available_tools = [
            tool for tool in self.available_tools if tool['function']['name'] != f"delegate_to_{agent_name}"
        ]
        return True
    return False

reset_chat_history()

Reset chat history to initial state and clear history manager.

Source code in xronai/core/supervisor.py
503
504
505
506
def reset_chat_history(self) -> None:
    """Reset chat history to initial state and clear history manager."""
    self.history_manager.clear_history()
    self._initialize_chat_history()

set_workflow_id(workflow_id, history_base_path=None)

Set the workflow ID and history base path for this supervisor and all its children. This is the key method for ensuring session state is correctly propagated.

Parameters:

Name Type Description Default
workflow_id str

The workflow ID (session ID) to set.

required
history_base_path Optional[str]

The root directory for storing history logs.

None
Source code in xronai/core/supervisor.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def set_workflow_id(self, workflow_id: str, history_base_path: Optional[str] = None) -> None:
    """
    Set the workflow ID and history base path for this supervisor and all its children.
    This is the key method for ensuring session state is correctly propagated.

    Args:
        workflow_id (str): The workflow ID (session ID) to set.
        history_base_path (Optional[str]): The root directory for storing history logs.
    """
    self.workflow_id = workflow_id
    self.history_base_path = history_base_path
    self.debugger.update_workflow_id(workflow_id)
    self.history_manager = HistoryManager(workflow_id, base_path=self.history_base_path)
    self._initialize_chat_history()
    self._process_pending_registrations()

    for agent in self.registered_agents:
        agent.set_workflow_id(workflow_id, history_base_path=history_base_path)

start_interactive_session()

Start an interactive session with the user.

This method initiates a loop that continuously processes user input until the user decides to exit.

Source code in xronai/core/supervisor.py
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
def start_interactive_session(self) -> None:
    """
    Start an interactive session with the user.

    This method initiates a loop that continuously processes user input
    until the user decides to exit.
    """
    print("Starting interactive session. Type 'exit' to end the session.")
    while True:
        user_input = input("User: ").strip()
        if user_input.lower() == "exit":
            print("Ending session. Goodbye!")
            break
        try:
            supervisor_output = self.chat(query=user_input)
            print(f"Supervisor: {supervisor_output}")
        except Exception as e:
            print(f"An error occurred: {str(e)}")

update_system_message()

Update the system message to reflect the current set of registered agents.

Source code in xronai/core/supervisor.py
574
575
576
577
578
579
580
def update_system_message(self) -> None:
    """
    Update the system message to reflect the current set of registered agents.
    """
    agent_descriptions = "\n".join(f"{agent.name}: {agent.system_message}" for agent in self.registered_agents)
    self.system_message = f"{self._get_default_system_message()}\n\n{agent_descriptions}"
    self.reset_chat_history()