{
 "nbformat": 4,
 "nbformat_minor": 0,
 "metadata": {
  "colab": {
   "provenance": []
  },
  "kernelspec": {
   "name": "python3",
   "display_name": "Python 3"
  },
  "language_info": {
   "name": "python"
  }
 },
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "ki9N8iqRxu0I",
    "outputId": "fd1628a5-d2a4-44a4-89b4-31151d21c8f3"
   },
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "Collecting pocketflow\n",
      " Downloading pocketflow-0.0.1-py3-none-any.whl.metadata (270 bytes)\n",
      "Downloading pocketflow-0.0.1-py3-none-any.whl (3.3 kB)\n",
      "Installing collected packages: pocketflow\n",
      "Successfully installed pocketflow-0.0.1\n"
     ]
    }
   ],
   "source": [
    "# %pip installs into the running kernel's environment.\n",
    "# The version is pinned to the one recorded in this cell's output.\n",
    "%pip install pocketflow==0.0.1"
   ]
  },
  {
   "cell_type": "code",
   "source": [
    "import asyncio\n",
    "import time\n",
    "\n",
    "from pocketflow import AsyncBatchNode, AsyncParallelBatchNode, AsyncFlow\n",
    "\n",
    "####################################\n",
    "# Dummy async function (1s delay by default)\n",
    "####################################\n",
    "async def dummy_llm_summarize(text, delay=1.0):\n",
    "    \"\"\"Simulate an async LLM call by sleeping, then return a fake summary.\n",
    "\n",
    "    Args:\n",
    "        text: Input to summarize; only len(text) is used.\n",
    "        delay: Seconds to sleep before answering (default: 1.0).\n",
    "\n",
    "    Returns:\n",
    "        A string of the form 'Summarized(N chars)', where N is len(text).\n",
    "    \"\"\"\n",
    "    await asyncio.sleep(delay)\n",
    "    return f\"Summarized({len(text)} chars)\"\n",
    "\n",
    "###############################################\n",
    "# 1) AsyncBatchNode (sequential) version\n",
    "###############################################\n",
    "\n",
    "class SummariesAsyncNode(AsyncBatchNode):\n",
    "    \"\"\"\n",
    "    Processes items sequentially in an async manner.\n",
    "    The next item won't start until the previous item has finished.\n",
    "    \"\"\"\n",
    "\n",
    "    async def prep_async(self, shared):\n",
    "        \"\"\"Return the batch: a list of (filename, content) pairs from shared['data'].\"\"\"\n",
    "        return list(shared[\"data\"].items())\n",
    "\n",
    "    async def exec_async(self, item):\n",
    "        \"\"\"Summarize one (filename, content) pair and return (filename, summary).\"\"\"\n",
    "        filename, content = item\n",
    "        print(f\"[Sequential] Summarizing {filename}...\")\n",
    "        summary = await dummy_llm_summarize(content)\n",
    "        return (filename, summary)\n",
    "\n",
    "    async def post_async(self, shared, prep_res, exec_res_list):\n",
    "        \"\"\"Store {filename: summary} in shared['sequential_summaries']; return 'done_sequential'.\"\"\"\n",
    "        # exec_res_list is a list of (filename, summary)\n",
    "        shared[\"sequential_summaries\"] = dict(exec_res_list)\n",
    "        return \"done_sequential\"\n",
    "\n",
    "###############################################\n",
    "# 2) AsyncParallelBatchNode (concurrent) version\n",
    "###############################################\n",
    "\n",
    "class SummariesAsyncParallelNode(AsyncParallelBatchNode):\n",
    "    \"\"\"\n",
    "    Processes items in parallel. Many LLM calls start at once.\n",
    "    \"\"\"\n",
    "\n",
    "    async def prep_async(self, shared):\n",
    "        \"\"\"Return the batch: a list of (filename, content) pairs from shared['data'].\"\"\"\n",
    "        return list(shared[\"data\"].items())\n",
    "\n",
    "    async def exec_async(self, item):\n",
    "        \"\"\"Summarize one (filename, content) pair and return (filename, summary).\"\"\"\n",
    "        filename, content = item\n",
    "        print(f\"[Parallel] Summarizing {filename}...\")\n",
    "        summary = await dummy_llm_summarize(content)\n",
    "        return (filename, summary)\n",
    "\n",
    "    async def post_async(self, shared, prep_res, exec_res_list):\n",
    "        \"\"\"Store {filename: summary} in shared['parallel_summaries']; return 'done_parallel'.\"\"\"\n",
    "        # exec_res_list is a list of (filename, summary)\n",
    "        shared[\"parallel_summaries\"] = dict(exec_res_list)\n",
    "        return \"done_parallel\"\n",
    "\n",
    "###############################################\n",
    "# Demo comparing the two approaches\n",
    "###############################################\n",
    "\n",
    "async def main():\n",
    "    \"\"\"Run the sequential and the parallel flow on the same inputs, then print results and timings.\"\"\"\n",
    "    # We'll use the same data for both flows\n",
    "    shared_data = {\n",
    "        \"data\": {\n",
    "            \"file1.txt\": \"Hello world 1\",\n",
    "            \"file2.txt\": \"Hello world 2\",\n",
    "            \"file3.txt\": \"Hello world 3\",\n",
    "        }\n",
    "    }\n",
    "\n",
    "    # 1) Run the sequential version\n",
    "    seq_node = SummariesAsyncNode()\n",
    "    seq_flow = AsyncFlow(start=seq_node)\n",
    "\n",
    "    print(\"\\n=== Running Sequential (AsyncBatchNode) ===\")\n",
    "    # perf_counter() is a monotonic clock meant for measuring intervals;\n",
    "    # time.time() is wall-clock time and can jump if the system clock is adjusted.\n",
    "    seq_start = time.perf_counter()\n",
    "    await seq_flow.run_async(shared_data)\n",
    "    seq_elapsed = time.perf_counter() - seq_start\n",
    "\n",
    "    # 2) Run the parallel version\n",
    "    par_node = SummariesAsyncParallelNode()\n",
    "    par_flow = AsyncFlow(start=par_node)\n",
    "\n",
    "    print(\"\\n=== Running Parallel (AsyncParallelBatchNode) ===\")\n",
    "    par_start = time.perf_counter()\n",
    "    await par_flow.run_async(shared_data)\n",
    "    par_elapsed = time.perf_counter() - par_start\n",
    "\n",
    "    # Show results and times\n",
    "    print(\"\\n--- Results ---\")\n",
    "    print(f\"Sequential Summaries: {shared_data.get('sequential_summaries')}\")\n",
    "    print(f\"Parallel Summaries: {shared_data.get('parallel_summaries')}\")\n",
    "\n",
    "    print(f\"Sequential took: {seq_elapsed:.2f} seconds\")\n",
    "    print(f\"Parallel took: {par_elapsed:.2f} seconds\")\n"
   ],
   "metadata": {
    "id": "mHZpGv8txy4L"
   },
   "execution_count": 3,
   "outputs": []
  },
  {
   "cell_type": "code",
   "source": [
    "# Notebooks already run an event loop, so top-level await works here.\n",
    "# In a plain .py script (no event loop running yet) use instead:\n",
    "#     asyncio.run(main())\n",
    "await main()"
   ],
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "zfnhW3f-0W6o",
    "outputId": "3737e2e5-5cae-4c6b-a894-e880cf338d1f"
   },
   "execution_count": 5,
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "\n",
      "=== Running Sequential (AsyncBatchNode) ===\n",
      "[Sequential] Summarizing file1.txt...\n",
      "[Sequential] Summarizing file2.txt...\n",
      "[Sequential] Summarizing file3.txt...\n",
      "\n",
      "=== Running Parallel (AsyncParallelBatchNode) ===\n",
      "[Parallel] Summarizing file1.txt...\n",
      "[Parallel] Summarizing file2.txt...\n",
      "[Parallel] Summarizing file3.txt...\n",
      "\n",
      "--- Results ---\n",
      "Sequential Summaries: {'file1.txt': 'Summarized(13 chars)', 'file2.txt': 'Summarized(13 chars)', 'file3.txt': 'Summarized(13 chars)'}\n",
      "Parallel Summaries: {'file1.txt': 'Summarized(13 chars)', 'file2.txt': 'Summarized(13 chars)', 'file3.txt': 'Summarized(13 chars)'}\n",
      "Sequential took: 3.00 seconds\n",
      "Parallel took: 1.00 seconds\n"
     ]
    }
   ]
  },
  {
   "cell_type": "code",
   "source": [],
   "metadata": {
    "id": "ystwa74D0Z_k"
   },
   "execution_count": null,
   "outputs": []
  }
 ]
}