(Advanced) Multi-Agents

Sometimes, you want multiple agents (or flows) working together, each performing different tasks or roles. They can communicate by passing messages or updating shared states. Below are some TypeScript examples.


Example 1: Agent Communication with a Shared Queue

Here's how to implement communication using a queue-like structure in Node.js. The agent listens for messages, processes them, and loops back to await more. We will simulate an asynchronous message queue using standard JavaScript patterns (e.g., an array plus setInterval or an event-based approach).

import { BaseNode, Flow, DEFAULT_ACTION } from "../src/pocket";

// We'll define a simple queue interface
interface MessageQueue {
  messages: string[];
  // optional signals or a real concurrency approach
}

// This is our AgentNode, which reads a message from the queue each time it runs.
// For demonstration, we poll the queue at intervals to simulate asynchronous arrival of messages.
export class AgentNode extends BaseNode {
  public async prepAsync(sharedState: any): Promise<string | null> {
    const messageQueue: MessageQueue = this.params["messages"] as MessageQueue;
    if (!messageQueue || !Array.isArray(messageQueue.messages)) {
      throw new Error("Invalid message queue");
    }

    // Wait until there's at least one message or return null if no messages are pending
    if (messageQueue.messages.length === 0) {
      return null;
    }

    // Dequeue the first message
    return messageQueue.messages.shift() || null;
  }

  public async execAsync(message: string | null): Promise<string | null> {
    if (message === null) {
      // No new message
      return null;
    }

    // Process or log the message
    console.log(`Agent received: ${message}`);
    // You could also call an LLM or do more sophisticated processing here
    return message;
  }

  public async postAsync(sharedState: any, prepResult: string | null, execResult: string | null): Promise<string> {
    // We can continue if there's more work to do; otherwise, we might wait or exit.
    // For this example, we just loop forever (polling), so we return "loop"
    return "loop";
  }
}

// Example usage
(async () => {
  // Our simulated queue
  const messageQueue: MessageQueue = {
    messages: [],
  };

  // Create the agent node
  const agent = new AgentNode();
  // Connect the agent node to itself on the "loop" action
  agent.addSuccessor(agent, "loop");

  // Create a flow starting from our agent
  const flow = new Flow(agent);

  // Set the flow params so the agent node knows about the queue
  flow.setParams({ messages: messageQueue });

  // We'll also define a simple message generator that appends to the queue periodically
  setInterval(() => {
    const timestamp = Date.now();
    const sampleMessages = [
      "System status: all systems operational",
      "Memory usage: normal",
      "Network connectivity: stable",
      "Processing load: optimal",
    ];
    const msg = `${sampleMessages[timestamp % sampleMessages.length]} | timestamp_${timestamp}`;
    messageQueue.messages.push(msg);
  }, 1000);

  // Run the flow
  const shared = {};
  // Because it loops indefinitely, this flow never truly ends unless we forcibly stop it.
  // In a real app, you'd have a stopping condition or signal.
  flow.runAsync(shared).catch((err) => {
    console.error("Flow execution failed:", err);
  });
})();

Explanation: - We store messages in messageQueue.messages.
- Each agent run cycle uses prepAsync to dequeue one message.
- The node returns "loop" in postAsync, telling the flow to run the same node again for the next message.


Example 2: Interactive Multi-Agent Example: Taboo Game

Here's a more complex setup with two agents (a "Hinter" and a "Guesser") playing a simplified word-guessing game. They communicate via two queues:
- hinterQueue sends guesses to the Hinter agent,
- guesserQueue sends hints to the Guesser agent.

Warning: Below is a conceptual example. In a real Node.js environment, you might orchestrate concurrency differently (e.g., using Promise.all, or a dedicated event system).

import { BaseNode, Flow } from "../src/pocket";

// Placeholder LLM function (replace with real calls as needed)
async function callLLM(prompt: string): Promise<string> {
  // For demonstration
  return `LLM says: ${prompt.substring(0, 60)}`;
}

/** 
 * AsyncHinter:
 *  1) Waits for a guess from the guesser (via hinterQueue).
 *  2) Generates a new hint while avoiding certain forbidden words.
 *  3) Sends the hint to guesserQueue.
 */
export class AsyncHinter extends BaseNode {
  public async prepAsync(sharedState: any): Promise<{
    guess: string;
    target: string;
    forbidden: string[];
    pastGuesses: string[];
  } | null> {
    // Dequeue guess
    const hinterQueue = sharedState.hinterQueue as string[];
    if (!Array.isArray(hinterQueue)) throw new Error("hinterQueue not found");

    if (hinterQueue.length === 0) {
      return null; // no new guess in queue
    }
    const guess = hinterQueue.shift() as string;

    // If guess == "GAME_OVER", we can end the Hinter agent
    if (guess === "GAME_OVER") {
      return null;
    }

    return {
      guess,
      target: sharedState.target_word,
      forbidden: sharedState.forbidden_words,
      pastGuesses: sharedState.past_guesses ?? [],
    };
  }

  public async execAsync(inputs: {
    guess: string;
    target: string;
    forbidden: string[];
    pastGuesses: string[];
  } | null): Promise<string | null> {
    if (!inputs) {
      return null; // means we should end
    }
    const { guess, target, forbidden, pastGuesses } = inputs;

    // The prompt for generating a hint from the LLM
    let prompt = `Generate a 1-sentence hint for the word "${target}". Avoid these words: ${forbidden.join(", ")}. `;
    if (guess !== "") {
      prompt += `Previous guess was: "${guess}". `;
    }
    if (pastGuesses.length) {
      prompt += `Past wrong guesses: ${pastGuesses.join(", ")}. `;
    }
    prompt += "Hint: use at most 5 words.";

    const hint = await callLLM(prompt);
    console.log(`\nHinter: Here's your hint -> ${hint}`);
    return hint;
  }

  public async postAsync(
    sharedState: any,
    prepResult: {
      guess: string;
      target: string;
      forbidden: string[];
      pastGuesses: string[];
    } | null,
    execResult: string | null
  ): Promise<string> {
    // If no inputs or execResult, game is over or no messages left
    if (!prepResult || execResult === null) {
      return "end"; 
    }

    // Send the generated hint to guesserQueue
    const guesserQueue = sharedState.guesserQueue as string[];
    guesserQueue.push(execResult);

    return "continue"; 
  }
}

/** 
 * AsyncGuesser:
 *  1) Waits for a hint from guesserQueue.
 *  2) Generates a guess.
 *  3) Checks correctness. If correct, game ends; else adds guess to pastGuesses and re-queues for Hinter.
 */
export class AsyncGuesser extends BaseNode {
  public async prepAsync(sharedState: any): Promise<{
    hint: string;
    pastGuesses: string[];
    target: string;
  } | null> {
    const guesserQueue = sharedState.guesserQueue as string[];
    if (!Array.isArray(guesserQueue)) throw new Error("guesserQueue not found");

    if (guesserQueue.length === 0) {
      return null;
    }
    const hint = guesserQueue.shift() as string;
    return {
      hint,
      pastGuesses: sharedState.past_guesses ?? [],
      target: sharedState.target_word,
    };
  }

  public async execAsync(inputs: {
    hint: string;
    pastGuesses: string[];
    target: string;
  } | null): Promise<string | null> {
    if (!inputs) {
      return null;
    }

    const { hint, pastGuesses, target } = inputs;
    let prompt = `We have hint: "${hint}". Past wrong guesses: ${pastGuesses.join(", ")}. Make a new single-word guess:`;
    // In reality, you'd refine this logic or call an actual LLM
    const guess = await callLLM(prompt);
    console.log(`Guesser: I guess it's -> ${guess}`);
    return guess;
  }

  public async postAsync(
    sharedState: any,
    prepResult: { hint: string; pastGuesses: string[]; target: string } | null,
    execResult: string | null
  ): Promise<string> {
    if (!prepResult || execResult === null) {
      return "end";
    }

    // Check correctness
    const guessLower = execResult.trim().toLowerCase();
    const targetLower = prepResult.target.trim().toLowerCase();
    if (guessLower === targetLower) {
      console.log("Game Over -> Correct guess!");
      // Signal the hinter to stop
      const hinterQueue = sharedState.hinterQueue as string[];
      hinterQueue.push("GAME_OVER");
      return "end";
    }

    // If guess is wrong, add to pastGuesses
    if (!sharedState.past_guesses) {
      sharedState.past_guesses = [];
    }
    sharedState.past_guesses.push(execResult);

    // Send guess to the Hinter for feedback
    const hinterQueue = sharedState.hinterQueue as string[];
    hinterQueue.push(execResult);

    return "continue";
  }
}

// Example usage
(async () => {
  const shared = {
    target_word: "nostalgia",
    forbidden_words: ["memory", "past", "remember", "feeling", "longing"],
    hinterQueue: [] as string[],
    guesserQueue: [] as string[],
    past_guesses: [] as string[],
  };

  console.log("Game starting!");
  console.log(`Target word: ${shared.target_word}`);
  console.log(`Forbidden words: ${shared.forbidden_words}`);

  // Initialize by sending an empty guess to Hinter
  shared.hinterQueue.push("");

  const hinter = new AsyncHinter();
  const guesser = new AsyncGuesser();

  // In pocket.ts, you might have AsyncFlow (if your BaseNode variants are async).
  // For demonstration, assume Flow can handle async as well. 
  const hinterFlow = new Flow(hinter);
  const guesserFlow = new Flow(guesser);

  // Connect each node to itself to allow multiple turns
  hinter.addSuccessor(hinter, "continue");
  guesser.addSuccessor(guesser, "continue");

  // Start both flows concurrently
  // Typically you'd want a coordination mechanism like Promise.all or a dedicated runner
  hinterFlow.runAsync(shared).catch((err) => console.error("Hinter flow failed:", err));
  guesserFlow.runAsync(shared).catch((err) => console.error("Guesser flow failed:", err));
})();

Explanation: 1. Queues:
- hinterQueue carries guesses from the Guesser to the Hinter.
- guesserQueue carries hints from the Hinter to the Guesser. 2. AsyncHinter: - Awaits a guess. If "GAME_OVER", the agent ends. Otherwise, it generates a new hint and puts it into guesserQueue. 3. AsyncGuesser: - Pulls a hint from guesserQueue, generates a guess, and checks if correct.
- If correct, ends the game; otherwise, pushes the guess back to hinterQueue. 4. Loops:
- Each agent calls addSuccessor(node, "continue") to keep running in a loop until the game finishes or no more messages are available.


Building Multi-Agent Systems

  • Shared State: Store data structures like queues, agent statuses, or global game state in sharedState.
  • Flow or AsyncFlow: Each agent can be a node or sub-flow that loops for repeated interactions.
  • Communication: Use shared structures (queues or specialized abstractions) to pass messages or signals between agents.

This design enables flexible multi-agent architectures, letting you break down complex tasks among multiple specialized agents.