Flow
A Flow orchestrates how Nodes connect and run, based on Actions returned from each Node's postAsync()
method. You can chain Nodes in a sequence or create branching logic depending on the Action string.
1. Action-based Transitions
Each Node's postAsync(shared, prepResult, execResult) method returns an Action string. By default, if postAsync() doesn't explicitly return anything, we treat that as "default".
Transitions are described below in a shorthand notation; in TypeScript, each one is registered with addSuccessor(), as in the examples that follow:
- Basic default transition: nodeA >> nodeB
  This means that if nodeA.postAsync() returns "default" (or undefined), the flow proceeds to nodeB. (Equivalent to nodeA - "default" >> nodeB.)
- Named action transition: nodeA - "actionName" >> nodeB
  This means that if nodeA.postAsync() returns "actionName", the flow proceeds to nodeB.
It's possible to create loops, branching, or multi-step flows.
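As a rough illustration of the rule above, the sketch below models a node's transitions as a lookup from Action string to target. The TransitionTable class and its method names are hypothetical stand-ins for this example only and are not part of pocket.ts.

```typescript
// Hypothetical stand-in for a node's transition table; not the pocket.ts implementation.
const DEFAULT = "default";

class TransitionTable {
  private successors = new Map<string, string>();

  // Register "when `action` is returned, go to `target`".
  add(target: string, action: string = DEFAULT): this {
    this.successors.set(action, target);
    return this;
  }

  // An undefined Action is treated as "default"; an unmapped Action yields undefined.
  next(action?: string): string | undefined {
    return this.successors.get(action ?? DEFAULT);
  }
}

const nodeATransitions = new TransitionTable()
  .add("nodeB")            // nodeA >> nodeB
  .add("nodeC", "retry");  // nodeA - "retry" >> nodeC

console.log(nodeATransitions.next(undefined)); // nodeB
console.log(nodeATransitions.next("default")); // nodeB
console.log(nodeATransitions.next("retry"));   // nodeC
console.log(nodeATransitions.next("unknown")); // undefined
```

An Action with no registered target resolves to undefined, which is the condition under which a flow has nowhere left to go.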
2. Creating a Flow
A Flow begins with a start node (or another Flow). You create it using new Flow(startNode) to specify the entry point. When you call flow.runAsync(sharedState), it executes the first node, looks at its postAsync() return Action, follows the corresponding transition, and continues until there's no next node or you explicitly stop.
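The run loop described above can be sketched in a few lines. SketchNode, SketchFlow, and VisitState are hypothetical classes written for this illustration; the real BaseNode and Flow in pocket.ts may be structured differently, and the prep and exec steps are omitted here.

```typescript
// Hypothetical minimal classes for illustration; pocket.ts's BaseNode and Flow may differ.
interface VisitState { visited: string[] }

class SketchNode {
  readonly label: string;
  readonly action: string | undefined;
  readonly successors = new Map<string, SketchNode>();

  constructor(label: string, action?: string) {
    this.label = label;
    this.action = action;
  }

  addSuccessor(node: SketchNode, action: string = "default"): void {
    this.successors.set(action, node);
  }

  // Records the visit and returns this node's Action (possibly undefined).
  async postAsync(shared: VisitState): Promise<string | undefined> {
    shared.visited.push(this.label);
    return this.action;
  }
}

class SketchFlow {
  readonly start: SketchNode;
  constructor(start: SketchNode) { this.start = start; }

  async runAsync(shared: VisitState): Promise<void> {
    let current: SketchNode | undefined = this.start;
    while (current) {
      // A missing return value is treated as "default".
      const action: string = (await current.postAsync(shared)) ?? "default";
      // No successor registered for this Action: current becomes undefined and the loop ends.
      current = current.successors.get(action);
    }
  }
}

const first = new SketchNode("first");           // returns undefined, treated as "default"
const second = new SketchNode("second", "done"); // "done" has no successor, so the flow stops
first.addSuccessor(second);

const state: VisitState = { visited: [] };
const finished = new SketchFlow(first).runAsync(state);
finished.then(() => console.log(state.visited.join(" -> "))); // first -> second
```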
Example: Simple Sequence
Here's a minimal flow of two nodes in a chain:
import { BaseNode, Flow, DEFAULT_ACTION } from "../src/pocket";
// Define NodeA
class NodeA extends BaseNode {
  public async prepAsync(sharedState: any): Promise<void> {
    // Preparation logic for NodeA
  }
  public async execAsync(_: void): Promise<void> {
    // Execution logic for NodeA
  }
  public async postAsync(sharedState: any, _: void, __: void): Promise<string> {
    // Transition to NodeB
    return "default";
  }
}

// Define NodeB
class NodeB extends BaseNode {
  public async prepAsync(sharedState: any): Promise<void> {
    // Preparation logic for NodeB
  }
  public async execAsync(_: void): Promise<void> {
    // Execution logic for NodeB
  }
  public async postAsync(sharedState: any, _: void, __: void): Promise<string> {
    // No further nodes to transition to
    return DEFAULT_ACTION;
  }
}

// Instantiate nodes
const nodeA = new NodeA();
const nodeB = new NodeB();

// Define the flow connections
nodeA.addSuccessor(nodeB, "default");

// Create the flow starting with nodeA
const flow = new Flow(nodeA);

// Initial shared state
const sharedState = {};

// Run the flow
flow.runAsync(sharedState).then(() => {
  console.log("Flow completed successfully.");
}).catch(error => {
  console.error("Flow execution failed:", error);
});
- When you run the flow, it executes NodeA.
- Suppose NodeA.postAsync() returns "default".
- The flow then sees the "default" Action is linked to NodeB and runs NodeB.
- If NodeB.postAsync() returns "default" but we didn't define NodeB >> somethingElse, the flow ends there.
Example: Branching & Looping
Here's a simple expense approval flow that demonstrates branching and looping. The ReviewExpenseNode can return three possible Actions:
- "approved": Expense is approved, move to payment processing.
- "needs_revision": Expense needs changes, send back for revision.
- "rejected": Expense is denied, finish the process.
We can wire them like this:
import { BaseNode, Flow, DEFAULT_ACTION } from "../src/pocket";
// Define ReviewExpenseNode
class ReviewExpenseNode extends BaseNode {
  public async prepAsync(sharedState: any): Promise<void> {
    // Prepare expense data
  }
  public async execAsync(_: void): Promise<void> {
    // Execute review logic
  }
  public async postAsync(sharedState: any, prepResult: void, execResult: void): Promise<string> {
    // Example decision logic
    const decision = "approved"; // Replace with actual decision-making
    return decision;
  }
}

// Define PaymentNode
class PaymentNode extends BaseNode {
  public async prepAsync(sharedState: any): Promise<void> {
    // Prepare payment data
  }
  public async execAsync(_: void): Promise<void> {
    // Execute payment processing
  }
  public async postAsync(sharedState: any, prepResult: void, execResult: void): Promise<string> {
    return DEFAULT_ACTION;
  }
}

// Define ReviseExpenseNode
class ReviseExpenseNode extends BaseNode {
  public async prepAsync(sharedState: any): Promise<void> {
    // Prepare revision data
  }
  public async execAsync(_: void): Promise<void> {
    // Execute revision logic
  }
  public async postAsync(sharedState: any, prepResult: void, execResult: void): Promise<string> {
    return "needs_revision";
  }
}

// Define FinishNode
class FinishNode extends BaseNode {
  public async prepAsync(sharedState: any): Promise<void> {
    // Prepare finish data
  }
  public async execAsync(_: void): Promise<void> {
    // Execute finish logic
  }
  public async postAsync(sharedState: any, prepResult: void, execResult: void): Promise<string> {
    return DEFAULT_ACTION;
  }
}

// Instantiate nodes
const reviewExpense = new ReviewExpenseNode();
const payment = new PaymentNode();
const reviseExpense = new ReviseExpenseNode();
const finish = new FinishNode();

// Define the flow connections
reviewExpense.addSuccessor(payment, "approved");
reviewExpense.addSuccessor(reviseExpense, "needs_revision");
reviewExpense.addSuccessor(finish, "rejected");
reviseExpense.addSuccessor(reviewExpense, "needs_revision"); // Loop back for revision
payment.addSuccessor(finish, "default"); // Proceed to finish after payment

// Create the flow starting with reviewExpense
const expenseFlow = new Flow(reviewExpense);

// Initial shared state
const sharedState = {};

// Run the flow
expenseFlow.runAsync(sharedState).then(() => {
  console.log("Expense flow completed successfully.");
}).catch(error => {
  console.error("Expense flow execution failed:", error);
});
Flow Diagram
flowchart TD
reviewExpense[Review Expense] -->|approved| payment[Process Payment]
reviewExpense -->|needs_revision| reviseExpense[Revise Report]
reviewExpense -->|rejected| finish[Finish Process]
reviseExpense --> reviewExpense
payment --> finish
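One way to sanity-check this wiring is to treat it as a plain lookup table and trace which nodes a given series of review decisions would visit. The tracePath helper below is hypothetical and does not use pocket.ts. It assumes the reviewer's decisions are supplied up front, and it falls back to "rejected" once they run out so that the loop always terminates.

```typescript
// Transition table mirroring the addSuccessor() calls above (node -> Action -> next node).
const transitions: Record<string, Record<string, string>> = {
  reviewExpense: { approved: "payment", needs_revision: "reviseExpense", rejected: "finish" },
  reviseExpense: { needs_revision: "reviewExpense" },
  payment: { default: "finish" },
  finish: {},
};

// Hypothetical helper: follows the table, taking each review decision from a scripted list.
function tracePath(reviewDecisions: string[]): string[] {
  const decisions = [...reviewDecisions];
  const path: string[] = [];
  let current: string | undefined = "reviewExpense";
  while (current !== undefined) {
    path.push(current);
    let action = "default";
    // Assumption: once the scripted decisions run out, the review rejects the expense.
    if (current === "reviewExpense") action = decisions.shift() ?? "rejected";
    if (current === "reviseExpense") action = "needs_revision";
    // A missing entry means no successor, which ends the trace.
    current = transitions[current]?.[action];
  }
  return path;
}

console.log(tracePath(["needs_revision", "approved"]).join(" -> "));
// reviewExpense -> reviseExpense -> reviewExpense -> payment -> finish
console.log(tracePath(["rejected"]).join(" -> "));
// reviewExpense -> finish
```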
Running Individual Nodes vs. Running a Flow
- node.runAsync(sharedState): Just runs that node alone (calls prepAsync(), execAsync(), postAsync()), and returns an Action. Use this for debugging or testing a single node.
- flow.runAsync(sharedState): Executes from the start node, follows Actions to the next node, and so on until the flow can't continue (no next node or no next Action). Use this in production to ensure the full pipeline runs correctly.
Warning: node.runAsync(sharedState) does not proceed automatically to the successor and may use incorrect parameters. Always use flow.runAsync(sharedState) in production.
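The difference can be seen in a small self-contained sketch. MiniNode and MiniFlow are hypothetical simplifications written for this comparison (the prep, exec, and post steps are collapsed into a single log entry), so they only illustrate the behavior described above and are not the pocket.ts classes.

```typescript
// Hypothetical mini-implementation for illustration only; not pocket.ts's actual classes.
interface LogState { log: string[] }

class MiniNode {
  readonly name: string;
  readonly successors = new Map<string, MiniNode>();
  constructor(name: string) { this.name = name; }

  addSuccessor(node: MiniNode, action: string = "default"): void {
    this.successors.set(action, node);
  }

  // Runs only this node and returns its Action; successors are never consulted here.
  async runAsync(shared: LogState): Promise<string> {
    shared.log.push(this.name);
    return "default";
  }
}

class MiniFlow {
  readonly start: MiniNode;
  constructor(start: MiniNode) { this.start = start; }

  // Runs the start node, then keeps following Actions until no successor matches.
  async runAsync(shared: LogState): Promise<void> {
    let current: MiniNode | undefined = this.start;
    while (current) {
      const action: string = await current.runAsync(shared);
      current = current.successors.get(action);
    }
  }
}

const nodeOne = new MiniNode("A");
const nodeTwo = new MiniNode("B");
nodeOne.addSuccessor(nodeTwo);

async function compare(): Promise<{ nodeOnly: string[]; viaFlow: string[] }> {
  const nodeOnly: LogState = { log: [] };
  await nodeOne.runAsync(nodeOnly);              // stops after A
  const viaFlow: LogState = { log: [] };
  await new MiniFlow(nodeOne).runAsync(viaFlow); // continues on to B
  return { nodeOnly: nodeOnly.log, viaFlow: viaFlow.log };
}

const comparison = compare();
comparison.then((r) => console.log(r.nodeOnly, r.viaFlow));
```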
3. Nested Flows
A Flow can act like a Node, enabling powerful composition patterns:
// Define sub-flow nodes
const nodeA = new NodeA();
const nodeB = new NodeB();
nodeA.addSuccessor(nodeB);
// Create sub-flow
const subFlow = new Flow(nodeA);
// Connect sub-flow to another node
const nodeC = new NodeC();
subFlow.addSuccessor(nodeC);
// Create parent flow
const parentFlow = new Flow(subFlow);
When parentFlow.runAsync(sharedState) executes:
1. It starts the subFlow.
2. subFlow runs through its nodes (NodeA, then NodeB).
3. After subFlow completes, execution continues to NodeC.
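The composition idea can be sketched by making the flow class a subclass of the node class, so that a flow can sit anywhere a node can. StepNode and StepFlow below are hypothetical mini-classes for illustration, and the choice to have a finished sub-flow report "default" to its parent is an assumption of this sketch rather than documented pocket.ts behavior.

```typescript
// Hypothetical mini-classes illustrating composition; pocket.ts's real classes may differ.
class StepNode {
  readonly name: string;
  readonly successors = new Map<string, StepNode>();
  constructor(name: string) { this.name = name; }

  addSuccessor(node: StepNode, action: string = "default"): void {
    this.successors.set(action, node);
  }

  async runAsync(trace: string[]): Promise<string> {
    trace.push(this.name);
    return "default";
  }
}

// A flow is itself a StepNode, so it can be a successor or the start node of another flow.
class StepFlow extends StepNode {
  readonly start: StepNode;
  constructor(name: string, start: StepNode) {
    super(name);
    this.start = start;
  }

  async runAsync(trace: string[]): Promise<string> {
    let current: StepNode | undefined = this.start;
    while (current) {
      const action: string = await current.runAsync(trace);
      current = current.successors.get(action);
    }
    return "default"; // assumption: a finished sub-flow reports "default" to its parent
  }
}

const stepA = new StepNode("NodeA");
const stepB = new StepNode("NodeB");
const stepC = new StepNode("NodeC");
stepA.addSuccessor(stepB);

const subSketch = new StepFlow("subFlow", stepA);
subSketch.addSuccessor(stepC);
const parentSketch = new StepFlow("parentFlow", subSketch);

const nestedTrace: string[] = [];
const nestedRun = parentSketch.runAsync(nestedTrace);
nestedRun.then(() => console.log(nestedTrace.join(" -> "))); // NodeA -> NodeB -> NodeC
```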
Summary
- Flow: Orchestrates Node execution based on Actions
- Action-based Transitions: Define Node connections using addSuccessor()
- Nested Flows: Flows can be used as Nodes within other Flows
Remember to handle errors appropriately and test your implementations thoroughly.
4. Specialized Flows
In addition to the basic Flow, your framework supports specialized flows like AsyncFlow, BatchFlow, and more, allowing for complex orchestration patterns. These specialized flows can handle asynchronous operations, batch processing, and other advanced scenarios seamlessly within your application.
Example: Nested Flows in Order Processing
Continuing from the previous order processing example, suppose you want to add another layer of processing, such as logging and notification. You can create additional sub-flows or combine existing ones.
import { BaseNode, Flow, DEFAULT_ACTION } from "../src/pocket";
// Define LoggingNode
class LoggingNode extends BaseNode {
  public async prepAsync(sharedState: any): Promise<void> {
    // Prepare logging data
  }
  public async execAsync(_: void): Promise<void> {
    // Execute logging logic
  }
  public async postAsync(sharedState: any, _: void, __: void): Promise<string> {
    console.log(`Order ${sharedState.orderId} processed for ${sharedState.customer}`);
    return DEFAULT_ACTION;
  }
}

// Define NotificationNode
class NotificationNode extends BaseNode {
  public async prepAsync(sharedState: any): Promise<void> {
    // Prepare notification data
  }
  public async execAsync(_: void): Promise<void> {
    // Execute notification logic
  }
  public async postAsync(sharedState: any, _: void, __: void): Promise<string> {
    // Send notification
    return DEFAULT_ACTION;
  }
}

// Instantiate logging and notification nodes
const loggingNode = new LoggingNode();
const notificationNode = new NotificationNode();

// shippingFlow, orderProcessingFlow, and sharedState come from the previous order processing example
// Connect shippingFlow to logging and notification
shippingFlow.addSuccessor(loggingNode, "default");
loggingNode.addSuccessor(notificationNode, "default");

// Update the master flow to include logging and notification
orderProcessingFlow.addSuccessor(loggingNode, "default");

// Run the updated flow
orderProcessingFlow.runAsync(sharedState).then(() => {
  console.log("Order processing with logging and notification completed successfully.");
}).catch(error => {
  console.error("Order processing failed:", error);
});
Flow Diagram with Nested Flows
flowchart LR
subgraph orderProcessingFlow["Order Processing Flow"]
subgraph paymentFlow["Payment Flow"]
A[Validate Payment] -->|process_payment| B[Process Payment]
B -->|payment_confirmation| C[Payment Confirmation]
A -->|reject_payment| D[Handle Rejection]
end
subgraph inventoryFlow["Inventory Flow"]
E[Check Stock] -->|reserve_items| F[Reserve Items]
F -->|update_inventory| G[Update Inventory]
E -->|out_of_stock| H[Handle Out of Stock]
end
subgraph shippingFlow["Shipping Flow"]
I[Create Label] -->|assign_carrier| J[Assign Carrier]
J -->|schedule_pickup| K[Schedule Pickup]
end
subgraph loggingFlow["Logging & Notification Flow"]
L[Logging] -->|default| M[Notification]
end
paymentFlow --> inventoryFlow
inventoryFlow --> shippingFlow
shippingFlow --> loggingFlow
end
Summary
These examples, converted from the Python-based Flow examples to TypeScript, take advantage of TypeScript's static typing and async/await. They demonstrate how to implement Flows, handle Action-based Transitions, and manage Nested Flows within the pocket.ts framework.
Key Points:
- Flow: Orchestrates the execution of Nodes based on Actions returned by each node's postAsync() method.
- Action-based Transitions: Define how Nodes transition to one another based on specific Action strings.
- Nested Flows: Allows flows to act as Nodes within other flows, enabling complex and reusable workflow patterns.
- Running Flows: Use flow.runAsync(sharedState) to execute the entire pipeline, ensuring all transitions and nodes are processed correctly.
Next Steps:
- Implement Actual Logic: Replace placeholder functions like callLLM