From bd6136ed54f8dbbd1d3487ab886d64201fa67b40 Mon Sep 17 00:00:00 2001
From: zachary62
Date: Tue, 11 Mar 2025 16:16:05 -0400
Subject: [PATCH] Add parallel execution cookbook example

---
 cookbook/parallel_exp.ipynb | 204 ++++++++++++++++++++++++++++++++++++
 1 file changed, 204 insertions(+)
 create mode 100644 cookbook/parallel_exp.ipynb

diff --git a/cookbook/parallel_exp.ipynb b/cookbook/parallel_exp.ipynb
new file mode 100644
index 0000000..751e7bc
--- /dev/null
+++ b/cookbook/parallel_exp.ipynb
@@ -0,0 +1,204 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": []
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "code",
+      "execution_count": 1,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "ki9N8iqRxu0I",
+        "outputId": "fd1628a5-d2a4-44a4-89b4-31151d21c8f3"
+      },
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "Collecting pocketflow\n",
+            "  Downloading pocketflow-0.0.1-py3-none-any.whl.metadata (270 bytes)\n",
+            "Downloading pocketflow-0.0.1-py3-none-any.whl (3.3 kB)\n",
+            "Installing collected packages: pocketflow\n",
+            "Successfully installed pocketflow-0.0.1\n"
+          ]
+        }
+      ],
+      "source": [
+        "pip install pocketflow"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "import asyncio\n",
+        "import time\n",
+        "\n",
+        "from pocketflow import AsyncBatchNode, AsyncParallelBatchNode, AsyncFlow\n",
+        "\n",
+        "####################################\n",
+        "# Dummy async function (1s delay)\n",
+        "####################################\n",
+        "async def dummy_llm_summarize(text):\n",
+        "    \"\"\"Simulates an async LLM call that takes 1 second.\"\"\"\n",
+        "    await asyncio.sleep(1)\n",
+        "    return f\"Summarized({len(text)} chars)\"\n",
+        "\n",
+        "###############################################\n",
+        "# 1) AsyncBatchNode (sequential) version\n",
+        "###############################################\n",
+        "\n",
+        "class SummariesAsyncNode(AsyncBatchNode):\n",
+        "    \"\"\"\n",
+        "    Processes items sequentially in an async manner.\n",
+        "    The next item won't start until the previous item has finished.\n",
+        "    \"\"\"\n",
+        "\n",
+        "    async def prep_async(self, shared):\n",
+        "        # Return a list of items to process.\n",
+        "        # Each item is (filename, content).\n",
+        "        return list(shared[\"data\"].items())\n",
+        "\n",
+        "    async def exec_async(self, item):\n",
+        "        filename, content = item\n",
+        "        print(f\"[Sequential] Summarizing {filename}...\")\n",
+        "        summary = await dummy_llm_summarize(content)\n",
+        "        return (filename, summary)\n",
+        "\n",
+        "    async def post_async(self, shared, prep_res, exec_res_list):\n",
+        "        # exec_res_list is a list of (filename, summary)\n",
+        "        shared[\"sequential_summaries\"] = dict(exec_res_list)\n",
+        "        return \"done_sequential\"\n",
+        "\n",
+        "###############################################\n",
+        "# 2) AsyncParallelBatchNode (concurrent) version\n",
+        "###############################################\n",
+        "\n",
+        "class SummariesAsyncParallelNode(AsyncParallelBatchNode):\n",
+        "    \"\"\"\n",
+        "    Processes items in parallel. Many LLM calls start at once.\n",
+        "    \"\"\"\n",
+        "\n",
+        "    async def prep_async(self, shared):\n",
+        "        return list(shared[\"data\"].items())\n",
+        "\n",
+        "    async def exec_async(self, item):\n",
+        "        filename, content = item\n",
+        "        print(f\"[Parallel] Summarizing {filename}...\")\n",
+        "        summary = await dummy_llm_summarize(content)\n",
+        "        return (filename, summary)\n",
+        "\n",
+        "    async def post_async(self, shared, prep_res, exec_res_list):\n",
+        "        shared[\"parallel_summaries\"] = dict(exec_res_list)\n",
+        "        return \"done_parallel\"\n",
+        "\n",
+        "###############################################\n",
+        "# Demo comparing the two approaches\n",
+        "###############################################\n",
+        "\n",
+        "async def main():\n",
+        "    # We'll use the same data for both flows\n",
+        "    shared_data = {\n",
+        "        \"data\": {\n",
+        "            \"file1.txt\": \"Hello world 1\",\n",
+        "            \"file2.txt\": \"Hello world 2\",\n",
+        "            \"file3.txt\": \"Hello world 3\",\n",
+        "        }\n",
+        "    }\n",
+        "\n",
+        "    # 1) Run the sequential version\n",
+        "    seq_node = SummariesAsyncNode()\n",
+        "    seq_flow = AsyncFlow(start=seq_node)\n",
+        "\n",
+        "    print(\"\\n=== Running Sequential (AsyncBatchNode) ===\")\n",
+        "    t0 = time.time()\n",
+        "    await seq_flow.run_async(shared_data)\n",
+        "    t1 = time.time()\n",
+        "\n",
+        "    # 2) Run the parallel version\n",
+        "    par_node = SummariesAsyncParallelNode()\n",
+        "    par_flow = AsyncFlow(start=par_node)\n",
+        "\n",
+        "    print(\"\\n=== Running Parallel (AsyncParallelBatchNode) ===\")\n",
+        "    t2 = time.time()\n",
+        "    await par_flow.run_async(shared_data)\n",
+        "    t3 = time.time()\n",
+        "\n",
+        "    # Show times\n",
+        "    print(\"\\n--- Results ---\")\n",
+        "    print(f\"Sequential Summaries: {shared_data.get('sequential_summaries')}\")\n",
+        "    print(f\"Parallel Summaries: {shared_data.get('parallel_summaries')}\")\n",
+        "\n",
+        "    print(f\"Sequential took: {t1 - t0:.2f} seconds\")\n",
+        "    print(f\"Parallel took: {t3 - t2:.2f} seconds\")\n"
+      ],
+      "metadata": {
+        "id": "mHZpGv8txy4L"
+      },
+      "execution_count": 3,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# In a plain Python script, use:\n",
+        "# asyncio.run(main())\n",
+        "await main()"
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "zfnhW3f-0W6o",
+        "outputId": "3737e2e5-5cae-4c6b-a894-e880cf338d1f"
+      },
+      "execution_count": 5,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "\n",
+            "=== Running Sequential (AsyncBatchNode) ===\n",
+            "[Sequential] Summarizing file1.txt...\n",
+            "[Sequential] Summarizing file2.txt...\n",
+            "[Sequential] Summarizing file3.txt...\n",
+            "\n",
+            "=== Running Parallel (AsyncParallelBatchNode) ===\n",
+            "[Parallel] Summarizing file1.txt...\n",
+            "[Parallel] Summarizing file2.txt...\n",
+            "[Parallel] Summarizing file3.txt...\n",
+            "\n",
+            "--- Results ---\n",
+            "Sequential Summaries: {'file1.txt': 'Summarized(13 chars)', 'file2.txt': 'Summarized(13 chars)', 'file3.txt': 'Summarized(13 chars)'}\n",
+            "Parallel Summaries: {'file1.txt': 'Summarized(13 chars)', 'file2.txt': 'Summarized(13 chars)', 'file3.txt': 'Summarized(13 chars)'}\n",
+            "Sequential took: 3.00 seconds\n",
+            "Parallel took: 1.00 seconds\n"
+          ]
+        }
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [],
+      "metadata": {
+        "id": "ystwa74D0Z_k"
+      },
+      "execution_count": null,
+      "outputs": []
+    }
+  ]
+}
\ No newline at end of file
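
A note on the timing gap recorded in the notebook output (this note is not part of the patch): the sequential AsyncBatchNode awaits each dummy_llm_summarize call before starting the next, so three 1-second calls take about 3 seconds, while the parallel AsyncParallelBatchNode starts all three before awaiting them, so the batch finishes in about 1 second. The sketch below reproduces the same contrast with plain asyncio only; it is a minimal illustration, not PocketFlow code, and it assumes asyncio.gather-style scheduling for the parallel case (the standard mechanism), since the patch does not show AsyncParallelBatchNode's internals. The helper names slow_call, run_sequential, run_parallel, and demo are made up for the example.

import asyncio
import time

async def slow_call(i):
    # Stands in for a 1-second LLM call, like dummy_llm_summarize above.
    await asyncio.sleep(1)
    return f"result {i}"

async def run_sequential(n):
    # Await each call before starting the next: total time is roughly n seconds.
    return [await slow_call(i) for i in range(n)]

async def run_parallel(n):
    # Schedule all calls at once with asyncio.gather: total time is roughly 1 second.
    return await asyncio.gather(*(slow_call(i) for i in range(n)))

async def demo():
    t0 = time.time()
    await run_sequential(3)
    t1 = time.time()
    await run_parallel(3)
    t2 = time.time()
    print(f"sequential: {t1 - t0:.2f}s, parallel: {t2 - t1:.2f}s")

asyncio.run(demo())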