Communication
Nodes and Flows communicate in two ways:
- Shared Store – A global data structure (often an in-memory object) that all nodes can read and write. Every Node's `prepAsync()` and `postAsync()` methods receive the same `shared` store.
- Params – Each Node and Flow has a unique `params` object assigned by the parent Flow, typically used as an identifier for tasks. It's strongly recommended to keep parameter keys and values immutable.
If you understand memory management, think of the Shared Store like a heap (shared by all function calls), and Params like a stack (assigned by the caller).
Why not use other communication models like Message Passing?
At the low level, between individual nodes, Message Passing works fine for simple DAGs, but it becomes unwieldy in nested or cyclic Flows. A shared store keeps things straightforward.
That said, high-level multi-agent patterns like Message Passing and Event-Driven Design can still be layered on top via Async Queues or Pub/Sub in a shared store (see Multi-Agents).
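As a rough illustration of layering Pub/Sub on top of a shared store, the sketch below keeps a tiny synchronous channel inside the store. Every name in it (`Channel`, `createChannel`, `SharedWithBus`, `busShared`) is hypothetical and not part of pocket.ts; a real multi-agent setup would more likely use an async queue.

```typescript
// Hypothetical sketch: a minimal pub/sub channel stored inside the shared store.
// None of these names come from pocket.ts.
type Handler<T> = (message: T) => void;

interface Channel<T> {
  subscribe(handler: Handler<T>): () => void; // returns an unsubscribe function
  publish(message: T): void;
}

function createChannel<T>(): Channel<T> {
  let handlers: Handler<T>[] = [];
  return {
    subscribe(handler) {
      handlers.push(handler);
      return () => {
        handlers = handlers.filter((h) => h !== handler);
      };
    },
    publish(message) {
      // Iterate over a snapshot so unsubscribing during delivery is safe
      handlers.slice().forEach((h) => h(message));
    },
  };
}

interface SharedWithBus {
  bus: Channel<string>;
  inbox: string[];
}

const busShared: SharedWithBus = { bus: createChannel<string>(), inbox: [] };

// One "agent" listens and records messages in the shared store...
const unsubscribe = busShared.bus.subscribe((msg) => {
  busShared.inbox.push(msg);
});

// ...while another publishes through the same shared store.
busShared.bus.publish("task-1 ready");
unsubscribe();
busShared.bus.publish("task-2 ready"); // no longer delivered
```

Because the channel lives in the shared store, any node that receives the store can publish or subscribe without holding a direct reference to another node.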
1. Shared Store
Overview
A shared store is typically an in-memory object, like:
const shared = { data: {}, summary: {}, config: { /* ... */ } };
It can also contain local file handlers, database connections, or a combination for persistence. We recommend deciding the data structure or database schema first based on your application's requirements.
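One way to decide the data structure first is to write it down as a type. The sketch below is only an assumption about what such a schema could look like: `SharedStore`, `AppConfig`, `createSharedStore`, and the two config fields are hypothetical names that mirror the `{ data, summary, config }` shape shown above.

```typescript
// Hypothetical schema for the shared store; the interface and its field types are assumptions.
interface AppConfig {
  model: string;    // hypothetical setting
  maxWords: number; // hypothetical setting
}

interface SharedStore {
  data: Record<string, string>;    // raw inputs, keyed by file name
  summary: Record<string, string>; // results, keyed by the same file name
  config: AppConfig;
}

// Factory so every run starts from a fresh, fully initialized store
function createSharedStore(config: AppConfig): SharedStore {
  return { data: {}, summary: {}, config };
}

const store = createSharedStore({ model: "example-model", maxWords: 50 });

const text = "Some text content.";
store.data["my_file.txt"] = text;
store.summary["my_file.txt"] = text.toUpperCase(); // stand-in for a real summary

// store.summary = 42; // would be rejected by the compiler
```

With the schema in one place, a misspelled key or a value of the wrong type surfaces at compile time rather than in the middle of a flow run.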
Example
import { BaseNode, Flow, DEFAULT_ACTION } from "../src/pocket";

// Shape of the shared store used in this example
type SharedState = {
  data: Record<string, string>;
  summary: Record<string, string>;
  config: Record<string, unknown>;
};

// Placeholder for an asynchronous LLM call
async function callLLM(prompt: string): Promise<string> {
  // Example implementation
  return "This is a summary generated by the LLM.";
}

export class LoadDataNode extends BaseNode {
  public async prepAsync(sharedState: any): Promise<void> {
    // Simulate reading data from an API or disk
    sharedState.data["my_file.txt"] = "Some text content loaded asynchronously.";
  }

  public async execAsync(_: void): Promise<void> {
    // No execution needed for loading data
  }

  public async postAsync(sharedState: any, _: void, __: void): Promise<string> {
    return DEFAULT_ACTION;
  }
}

export class SummarizeNode extends BaseNode {
  public async prepAsync(sharedState: any): Promise<string> {
    // Access data loaded by LoadDataNode
    return sharedState.data["my_file.txt"] || "";
  }

  public async execAsync(content: string): Promise<string> {
    const prompt = `Summarize the following content:\n${content}`;
    return await callLLM(prompt);
  }

  public async postAsync(sharedState: any, _: string, execResult: string): Promise<string> {
    // Store the result under the same key as the input
    sharedState.summary["my_file.txt"] = execResult;
    return DEFAULT_ACTION;
  }
}

// Instantiate nodes
const loadData = new LoadDataNode();
const summarize = new SummarizeNode();

// Define the flow
loadData.addSuccessor(summarize, DEFAULT_ACTION);
const flow = new Flow(loadData);

// Initial shared state (typed so that string keys can be read back below)
const shared: SharedState = { data: {}, summary: {}, config: {} };

// Run the flow
flow.runAsync(shared).then(() => {
  console.log("Summary:", shared.summary["my_file.txt"]);
}).catch(error => {
  console.error("Flow execution failed:", error);
});
Explanation:
- LoadDataNode:
  - `prepAsync()`: Asynchronously loads data into the `sharedState.data` object.
  - `execAsync()`: No operation needed; data loading is handled in `prepAsync()`.
  - `postAsync()`: Transitions to the next node by returning `DEFAULT_ACTION`.
- SummarizeNode:
  - `prepAsync()`: Retrieves the loaded data from `sharedState`.
  - `execAsync()`: Calls an asynchronous LLM function to summarize the content.
  - `postAsync()`: Stores the summary in `sharedState.summary` and transitions to the next action.
- Flow Execution:
  - The flow starts with `LoadDataNode`, which loads data.
  - Upon completion, it transitions to `SummarizeNode` to process and summarize the data.
  - After execution, the summary is available in `shared.summary["my_file.txt"]`.
2. Params
Params allow you to store per-Node or per-Flow configuration that doesn't need to reside in the shared store. They are:
- Immutable during a Node's run cycle (i.e., they don't change during `prepAsync`, `execAsync`, or `postAsync`).
- Set via `setParams()`.
- Cleared and updated each time a parent Flow calls the Node.
Only set params on the uppermost Flow, because params set anywhere else will be overwritten by the parent Flow. If you need to set child node params, see Batch.
Typically, Params are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store.
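Since params are meant to stay immutable, one option is to freeze them when they are created. The helper below is a minimal sketch under that assumption; `makeParams` and the `Params` alias are hypothetical names, not part of pocket.ts.

```typescript
// Hypothetical helper: freeze a params object so accidental writes cannot change it.
type Params = Readonly<Record<string, string | number>>;

function makeParams(values: Record<string, string | number>): Params {
  // Copy first so later changes to `values` do not leak into the params
  return Object.freeze({ ...values });
}

const fileParams = makeParams({ filename: "doc1.txt", page: 3 });

try {
  // The cast bypasses the Readonly type to simulate a careless write at runtime
  (fileParams as Record<string, string | number>)["filename"] = "other.txt";
} catch {
  // In strict mode (the default for ES modules) the write throws a TypeError;
  // in sloppy mode it is silently ignored. Either way the value is unchanged.
}

console.log(fileParams["filename"], Object.isFrozen(fileParams));
```

`Object.freeze` is shallow, so this only protects flat params such as file names and page numbers, which is the typical case described above.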
Example
import { BaseNode, Flow, DEFAULT_ACTION } from "../src/pocket";

// Shape of the shared store used in this example
type SharedState = {
  data: Record<string, string>;
  summary: Record<string, string>;
};

// Placeholder for an asynchronous LLM call
async function callLLM(prompt: string): Promise<string> {
  // Example implementation
  return `Summary for ${prompt}`;
}

export class SummarizeFileNode extends BaseNode {
  public async prepAsync(sharedState: any): Promise<string> {
    // Access the node's params
    const filename = this.params["filename"];
    return sharedState.data[filename] || "";
  }

  public async execAsync(content: string): Promise<string> {
    const prompt = `Summarize the following content:\n${content}`;
    return await callLLM(prompt);
  }

  public async postAsync(sharedState: any, _: string, execResult: string): Promise<string> {
    // Write the result to the part of the shared store named by the params
    const filename = this.params["filename"];
    sharedState.summary[filename] = execResult;
    return DEFAULT_ACTION;
  }
}

// Instantiate the node
const summarizeFile = new SummarizeFileNode();

// Run the two variants one after the other, since both reuse the same node instance
async function main(): Promise<void> {
  // Set Node params directly (for testing)
  summarizeFile.setParams({ filename: "doc1.txt" });
  const flow = new Flow(summarizeFile);
  const shared: SharedState = { data: { "doc1.txt": "Content of document 1." }, summary: {} };
  await flow.runAsync(shared);
  console.log("Summary:", shared.summary["doc1.txt"]);

  // Alternatively, set Flow params (overwrites node params)
  const flowWithParams = new Flow(summarizeFile);
  flowWithParams.setParams({ filename: "doc2.txt" });
  // Keep the second shared state in a variable so its result can be read back
  const shared2: SharedState = { data: { "doc2.txt": "Content of document 2." }, summary: {} };
  await flowWithParams.runAsync(shared2);
  console.log("Summary:", shared2.summary["doc2.txt"]);
}

main().catch(error => {
  console.error("Flow execution failed:", error);
});
Explanation:
- SummarizeFileNode:
  - `prepAsync()`: Retrieves the filename from `this.params` and fetches the corresponding data from `sharedState`.
  - `execAsync()`: Calls an asynchronous LLM function to summarize the content.
  - `postAsync()`: Stores the summary in `sharedState.summary` using the filename as the key.
- Flow Execution:
  - Node Params: Directly setting params on the node (`doc1.txt`) and running the flow.
  - Flow Params: Setting params on the flow (`doc2.txt`), which overwrites the node's params, and running the flow.
3. Shared Store vs. Params
Think of the Shared Store like a heap and Params like a stack.
- Shared Store:
  - Public, Global: Accessible by all nodes within the flow.
  - Pre-populated: Can be initialized with data before the flow starts.
  - Use Cases: Data results, large content, or any information multiple nodes need.
  - Organization: Structure it carefully, similar to designing a mini schema for clarity and maintainability.
- Params:
  - Local, Ephemeral: Specific to each node or flow instance.
  - Set by Parent Flows: Assigned by the flow managing the node.
  - Use Cases: Small values like filenames, page numbers, or identifiers.
  - Characteristics:
    - Immutable during execution.
    - Do not persist across different nodes and are reset with each new flow run.
Key Differences
| Feature | Shared Store | Params |
|---|---|---|
| Scope | Global within the flow | Local to the node or flow instance |
| Mutability | Mutable | Immutable during execution |
| Initialization | Can be pre-populated | Set dynamically by parent flows |
| Use Cases | Large data, results, shared info | Identifiers, small configuration data |
| Persistence | Can include persistent connections | Temporary and ephemeral |
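The division of labor in the table can be sketched in plain TypeScript, without the framework: the params say which task to run, and the shared store holds the inputs and results. `Store`, `FileParams`, and `countWords` are hypothetical names chosen for this illustration.

```typescript
// Hypothetical names throughout; this is plain TypeScript, not the pocket.ts API.
interface Store {
  data: Record<string, string>;
  wordCount: Record<string, number>;
}

type FileParams = Readonly<{ filename: string }>;

// The params identify the task; the shared store carries inputs and results.
function countWords(store: Store, params: FileParams): void {
  const text = store.data[params.filename] ?? "";
  const words = text.split(/\s+/).filter((w) => w.length > 0);
  store.wordCount[params.filename] = words.length;
}

const store: Store = {
  data: { "doc1.txt": "Content of document 1.", "doc2.txt": "Second document" },
  wordCount: {},
};

// Same logic, different params: each call touches only its own slice of the store
const tasks: FileParams[] = [{ filename: "doc1.txt" }, { filename: "doc2.txt" }];
for (const params of tasks) {
  countWords(store, params);
}

console.log(store.wordCount);
```

The function never stores results in the params and never reads its task identity from the store, which is the heap-versus-stack split described above.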
Summary
The examples above use TypeScript's static typing and async/await to implement the Shared Store and Params mechanisms in pocket.ts, keeping inter-node communication efficient and organized.
Key Points:
- Shared Store:
  - Facilitates global data sharing across nodes.
  - Ideal for storing large datasets, results, and shared configurations.
  - Must be carefully structured to maintain clarity.
- Params:
  - Enables passing specific identifiers or configuration data to individual nodes.
  - Ensures immutability during node execution for consistency.
  - Best suited for transient data like filenames or user inputs.
Next Steps:
- Implement Actual Logic: Replace placeholder functions like `callLLM` with real implementations that interact with your services.
- Enhance Error Handling: Incorporate comprehensive error checks and handling within your nodes to manage potential failures gracefully.
- Optimize Data Structures: Design and standardize your shared store's data structures to suit various application needs.
- Expand Documentation: Continue documenting other core abstractions and features of your framework to maintain consistency and clarity.
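As one possible starting point for the error-handling step, a node could fail fast when its input is missing from the shared store instead of falling back to an empty string. The guard below is a hypothetical sketch; `requireData` is not part of pocket.ts.

```typescript
// Hypothetical guard (not part of pocket.ts): fail fast when an input is missing
// instead of silently falling back to an empty string.
function requireData(data: Record<string, string>, key: string): string {
  const value = data[key];
  if (value === undefined) {
    throw new Error(`Shared store has no data for "${key}"`);
  }
  return value;
}

const data: Record<string, string> = { "doc1.txt": "Content of document 1." };

const found = requireData(data, "doc1.txt");

let failure = "";
try {
  requireData(data, "missing.txt");
} catch (error) {
  // Record the message so the caller can log it or choose a recovery action
  failure = error instanceof Error ? error.message : String(error);
}

console.log(found, failure);
```

Whether to throw, retry, or return a dedicated action string is a design choice that depends on how the surrounding flow is meant to recover.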