Communication

Nodes and Flows communicate in two ways:

  1. Shared Store – A global data structure (often an in-memory object) that all nodes can read and write. Every Node's prepAsync() and postAsync() methods receive the same shared store.
  2. Params – Each node and Flow has a unique params object assigned by the parent Flow, typically used as an identifier for tasks. It's strongly recommended to keep parameter keys and values immutable.

If you understand memory management, think of the Shared Store like a heap (shared by all function calls), and Params like a stack (assigned by the caller).

Why not use other communication models like Message Passing?

At a low-level between nodes, Message Passing works fine for simple DAGs, but in nested or cyclic Flows it becomes unwieldy. A shared store keeps things straightforward.

That said, high-level multi-agent patterns like Message Passing and Event-Driven Design can still be layered on top via Async Queues or Pub/Sub in a shared store (see Multi-Agents). {: .note }


1. Shared Store

Overview

A shared store is typically an in-memory object, like:

const shared = { data: {}, summary: {}, config: { /* ... */ } };

It can also contain local file handlers, database connections, or a combination for persistence. We recommend deciding the data structure or database schema first based on your application's requirements.

Example

import { BaseNode, Flow, DEFAULT_ACTION } from "../src/pocket";

// Placeholder for an asynchronous LLM call
async function callLLM(prompt: string): Promise<string> {
  // Example implementation
  return "This is a summary generated by the LLM.";
}

export class LoadDataNode extends BaseNode {
  public async prepAsync(sharedState: any): Promise<void> {
    // Simulate reading data from an API or disk
    sharedState.data["my_file.txt"] = "Some text content loaded asynchronously.";
  }

  public async execAsync(_: void): Promise<void> {
    // No execution needed for loading data
  }

  public async postAsync(sharedState: any, _: void, __: void): Promise<string> {
    return DEFAULT_ACTION;
  }
}

export class SummarizeNode extends BaseNode {
  public async prepAsync(sharedState: any): Promise<string> {
    // Access data loaded by LoadDataNode
    return sharedState.data["my_file.txt"] || "";
  }

  public async execAsync(content: string): Promise<string> {
    const prompt = `Summarize the following content:\n${content}`;
    return await callLLM(prompt);
  }

  public async postAsync(sharedState: any, _: string, execResult: string): Promise<string> {
    sharedState.summary["my_file.txt"] = execResult;
    return "default";
  }
}

// Instantiate nodes
const loadData = new LoadDataNode();
const summarize = new SummarizeNode();

// Define the flow
loadData.addSuccessor(summarize, "default");
const flow = new Flow(loadData);

// Initial shared state
const shared = { data: {}, summary: {}, config: {} };

// Run the flow
flow.runAsync(shared).then(() => {
  console.log("Summary:", shared.summary["my_file.txt"]);
}).catch(error => {
  console.error("Flow execution failed:", error);
});

Explanation:

  • LoadDataNode:
  • prepAsync(): Asynchronously loads data into the sharedState.data object.
  • execAsync(): No operation needed; data loading is handled in prepAsync().
  • postAsync(): Transitions to the next node by returning DEFAULT_ACTION.

  • SummarizeNode:

  • prepAsync(): Retrieves the loaded data from sharedState.
  • execAsync(): Calls an asynchronous LLM function to summarize the content.
  • postAsync(): Stores the summary in sharedState.summary and transitions to the next action.

  • Flow Execution:

  • The flow starts with LoadDataNode, which loads data.
  • Upon completion, it transitions to SummarizeNode to process and summarize the data.
  • After execution, the summary is available in shared.summary["my_file.txt"].

2. Params

Params allow you to store per-Node or per-Flow configuration that doesn't need to reside in the shared store. They are:

  • Immutable during a Node's run cycle (i.e., they don't change mid-prepAsync, execAsync, postAsync).
  • Set via setParams().
  • Cleared and updated each time a parent Flow calls it.

Only set the uppermost Flow params because others will be overwritten by the parent Flow. If you need to set child node params, see Batch. {: .warning }

Typically, Params are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store.

Example

import { BaseNode, Flow, DEFAULT_ACTION } from "../src/pocket";

// Placeholder for an asynchronous LLM call
async function callLLM(prompt: string): Promise<string> {
  // Example implementation
  return `Summary for ${prompt}`;
}

export class SummarizeFileNode extends BaseNode {
  public async prepAsync(sharedState: any): Promise<string> {
    // Access the node's params
    const filename = this.params["filename"];
    return sharedState.data[filename] || "";
  }

  public async execAsync(content: string): Promise<string> {
    const prompt = `Summarize the following content:\n${content}`;
    return await callLLM(prompt);
  }

  public async postAsync(sharedState: any, _: string, execResult: string): Promise<string> {
    const filename = this.params["filename"];
    sharedState.summary[filename] = execResult;
    return DEFAULT_ACTION;
  }
}

// Instantiate the node
const summarizeFile = new SummarizeFileNode();

// Set Node params directly (for testing)
summarizeFile.setParams({ filename: "doc1.txt" });

// Define the flow
const flow = new Flow(summarizeFile);

// Initial shared state
const shared = { data: { "doc1.txt": "Content of document 1." }, summary: {} };

// Run the flow
flow.runAsync(shared).then(() => {
  console.log("Summary:", shared.summary["doc1.txt"]);
}).catch(error => {
  console.error("Flow execution failed:", error);
});

// Alternatively, set Flow params (overwrites node params)
const flowWithParams = new Flow(summarizeFile);
flowWithParams.setParams({ filename: "doc2.txt" });
flowWithParams.runAsync({ data: { "doc2.txt": "Content of document 2." }, summary: {} }).then(() => {
  console.log("Summary:", shared.summary["doc2.txt"]);
}).catch(error => {
  console.error("Flow execution failed:", error);
});

Explanation:

  1. SummarizeFileNode:
  2. prepAsync(): Retrieves the filename from this.params and fetches the corresponding data from sharedState.
  3. execAsync(): Calls an asynchronous LLM function to summarize the content.
  4. postAsync(): Stores the summary in sharedState.summary using the filename as the key.

  5. Flow Execution:

  6. Node Params: Directly setting params on the node (doc1.txt) and running the flow.
  7. Flow Params: Setting params on the flow (doc2.txt), which overwrites the node's params, and running the flow.

3. Shared Store vs. Params

Think of the Shared Store like a heap and Params like a stack.

  • Shared Store:
  • Public, Global:
    • Accessible by all nodes within the flow.
  • Pre-populated:
    • Can be initialized with data before the flow starts.
  • Use Cases:
    • Data results, large content, or any information multiple nodes need.
  • Organization:

    • Structure it carefully, similar to designing a mini schema for clarity and maintainability.
  • Params:

  • Local, Ephemeral:
    • Specific to each node or flow instance.
  • Set by Parent Flows:
    • Assigned by the flow managing the node.
  • Use Cases:
    • Small values like filenames, page numbers, or identifiers.
  • Characteristics:
    • Immutable during execution.
    • Do not persist across different nodes and are reset with each new flow run.

Key Differences

Feature Shared Store Params
Scope Global within the flow Local to the node or flow instance
Mutability Mutable Immutable during execution
Initialization Can be pre-populated Set dynamically by parent flows
Use Cases Large data, results, shared info Identifiers, small configuration data
Persistence Can include persistent connections Temporary and ephemeral

Summary

By converting your Python-based communication examples to TypeScript, you can take full advantage of TypeScript's strong typing and modern asynchronous features. The provided examples demonstrate how to implement Shared Store and Params mechanisms within your pocket.ts framework, ensuring efficient and organized inter-node communication.

Key Points:

  • Shared Store:
  • Facilitates global data sharing across nodes.
  • Ideal for storing large datasets, results, and shared configurations.
  • Must be carefully structured to maintain clarity.

  • Params:

  • Enables passing specific identifiers or configuration data to individual nodes.
  • Ensures immutability during node execution for consistency.
  • Best suited for transient data like filenames or user inputs.

Next Steps:

  • Implement Actual Logic: Replace placeholder functions like callLLM with real implementations that interact with your services.
  • Enhance Error Handling: Incorporate comprehensive error checks and handling within your nodes to manage potential failures gracefully.
  • Optimize Data Structures: Design and standardize your shared store's data structures to suit various application needs.
  • Expand Documentation: Continue documenting other core abstractions and features of your framework to maintain consistency and clarity.

Feel free to further customize these examples to fit your project's specific requirements. If you have any questions or need additional assistance, don't hesitate to ask!