para
This commit is contained in:
parent
07b68556a1
commit
bd6136ed54
|
|
@ -0,0 +1,204 @@
|
|||
{
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 0,
|
||||
"metadata": {
|
||||
"colab": {
|
||||
"provenance": []
|
||||
},
|
||||
"kernelspec": {
|
||||
"name": "python3",
|
||||
"display_name": "Python 3"
|
||||
},
|
||||
"language_info": {
|
||||
"name": "python"
|
||||
}
|
||||
},
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
"metadata": {
|
||||
"colab": {
|
||||
"base_uri": "https://localhost:8080/"
|
||||
},
|
||||
"id": "ki9N8iqRxu0I",
|
||||
"outputId": "fd1628a5-d2a4-44a4-89b4-31151d21c8f3"
|
||||
},
|
||||
"outputs": [
|
||||
{
|
||||
"output_type": "stream",
|
||||
"name": "stdout",
|
||||
"text": [
|
||||
"Collecting pocketflow\n",
|
||||
" Downloading pocketflow-0.0.1-py3-none-any.whl.metadata (270 bytes)\n",
|
||||
"Downloading pocketflow-0.0.1-py3-none-any.whl (3.3 kB)\n",
|
||||
"Installing collected packages: pocketflow\n",
|
||||
"Successfully installed pocketflow-0.0.1\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"pip install pocketflow"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"source": [
|
||||
"import asyncio\n",
|
||||
"import time\n",
|
||||
"\n",
|
||||
"from pocketflow import AsyncBatchNode, AsyncParallelBatchNode, AsyncFlow\n",
|
||||
"\n",
|
||||
"####################################\n",
|
||||
"# Dummy async function (1s delay)\n",
|
||||
"####################################\n",
|
||||
"async def dummy_llm_summarize(text):\n",
|
||||
" \"\"\"Simulates an async LLM call that takes 1 second.\"\"\"\n",
|
||||
" await asyncio.sleep(1)\n",
|
||||
" return f\"Summarized({len(text)} chars)\"\n",
|
||||
"\n",
|
||||
"###############################################\n",
|
||||
"# 1) AsyncBatchNode (sequential) version\n",
|
||||
"###############################################\n",
|
||||
"\n",
|
||||
"class SummariesAsyncNode(AsyncBatchNode):\n",
|
||||
" \"\"\"\n",
|
||||
" Processes items sequentially in an async manner.\n",
|
||||
" The next item won't start until the previous item has finished.\n",
|
||||
" \"\"\"\n",
|
||||
"\n",
|
||||
" async def prep_async(self, shared):\n",
|
||||
" # Return a list of items to process.\n",
|
||||
" # Each item is (filename, content).\n",
|
||||
" return list(shared[\"data\"].items())\n",
|
||||
"\n",
|
||||
" async def exec_async(self, item):\n",
|
||||
" filename, content = item\n",
|
||||
" print(f\"[Sequential] Summarizing {filename}...\")\n",
|
||||
" summary = await dummy_llm_summarize(content)\n",
|
||||
" return (filename, summary)\n",
|
||||
"\n",
|
||||
" async def post_async(self, shared, prep_res, exec_res_list):\n",
|
||||
" # exec_res_list is a list of (filename, summary)\n",
|
||||
" shared[\"sequential_summaries\"] = dict(exec_res_list)\n",
|
||||
" return \"done_sequential\"\n",
|
||||
"\n",
|
||||
"###############################################\n",
|
||||
"# 2) AsyncParallelBatchNode (concurrent) version\n",
|
||||
"###############################################\n",
|
||||
"\n",
|
||||
"class SummariesAsyncParallelNode(AsyncParallelBatchNode):\n",
|
||||
" \"\"\"\n",
|
||||
" Processes items in parallel. Many LLM calls start at once.\n",
|
||||
" \"\"\"\n",
|
||||
"\n",
|
||||
" async def prep_async(self, shared):\n",
|
||||
" return list(shared[\"data\"].items())\n",
|
||||
"\n",
|
||||
" async def exec_async(self, item):\n",
|
||||
" filename, content = item\n",
|
||||
" print(f\"[Parallel] Summarizing {filename}...\")\n",
|
||||
" summary = await dummy_llm_summarize(content)\n",
|
||||
" return (filename, summary)\n",
|
||||
"\n",
|
||||
" async def post_async(self, shared, prep_res, exec_res_list):\n",
|
||||
" shared[\"parallel_summaries\"] = dict(exec_res_list)\n",
|
||||
" return \"done_parallel\"\n",
|
||||
"\n",
|
||||
"###############################################\n",
|
||||
"# Demo comparing the two approaches\n",
|
||||
"###############################################\n",
|
||||
"\n",
|
||||
"async def main():\n",
|
||||
" # We'll use the same data for both flows\n",
|
||||
" shared_data = {\n",
|
||||
" \"data\": {\n",
|
||||
" \"file1.txt\": \"Hello world 1\",\n",
|
||||
" \"file2.txt\": \"Hello world 2\",\n",
|
||||
" \"file3.txt\": \"Hello world 3\",\n",
|
||||
" }\n",
|
||||
" }\n",
|
||||
"\n",
|
||||
" # 1) Run the sequential version\n",
|
||||
" seq_node = SummariesAsyncNode()\n",
|
||||
" seq_flow = AsyncFlow(start=seq_node)\n",
|
||||
"\n",
|
||||
" print(\"\\n=== Running Sequential (AsyncBatchNode) ===\")\n",
|
||||
" t0 = time.time()\n",
|
||||
" await seq_flow.run_async(shared_data)\n",
|
||||
" t1 = time.time()\n",
|
||||
"\n",
|
||||
" # 2) Run the parallel version\n",
|
||||
" par_node = SummariesAsyncParallelNode()\n",
|
||||
" par_flow = AsyncFlow(start=par_node)\n",
|
||||
"\n",
|
||||
" print(\"\\n=== Running Parallel (AsyncParallelBatchNode) ===\")\n",
|
||||
" t2 = time.time()\n",
|
||||
" await par_flow.run_async(shared_data)\n",
|
||||
" t3 = time.time()\n",
|
||||
"\n",
|
||||
" # Show times\n",
|
||||
" print(\"\\n--- Results ---\")\n",
|
||||
" print(f\"Sequential Summaries: {shared_data.get('sequential_summaries')}\")\n",
|
||||
" print(f\"Parallel Summaries: {shared_data.get('parallel_summaries')}\")\n",
|
||||
"\n",
|
||||
" print(f\"Sequential took: {t1 - t0:.2f} seconds\")\n",
|
||||
" print(f\"Parallel took: {t3 - t2:.2f} seconds\")\n"
|
||||
],
|
||||
"metadata": {
|
||||
"id": "mHZpGv8txy4L"
|
||||
},
|
||||
"execution_count": 3,
|
||||
"outputs": []
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"source": [
|
||||
"# if in a py project\n",
|
||||
"# asyncio.run(main())\n",
|
||||
"await main()"
|
||||
],
|
||||
"metadata": {
|
||||
"colab": {
|
||||
"base_uri": "https://localhost:8080/"
|
||||
},
|
||||
"id": "zfnhW3f-0W6o",
|
||||
"outputId": "3737e2e5-5cae-4c6b-a894-e880cf338d1f"
|
||||
},
|
||||
"execution_count": 5,
|
||||
"outputs": [
|
||||
{
|
||||
"output_type": "stream",
|
||||
"name": "stdout",
|
||||
"text": [
|
||||
"\n",
|
||||
"=== Running Sequential (AsyncBatchNode) ===\n",
|
||||
"[Sequential] Summarizing file1.txt...\n",
|
||||
"[Sequential] Summarizing file2.txt...\n",
|
||||
"[Sequential] Summarizing file3.txt...\n",
|
||||
"\n",
|
||||
"=== Running Parallel (AsyncParallelBatchNode) ===\n",
|
||||
"[Parallel] Summarizing file1.txt...\n",
|
||||
"[Parallel] Summarizing file2.txt...\n",
|
||||
"[Parallel] Summarizing file3.txt...\n",
|
||||
"\n",
|
||||
"--- Results ---\n",
|
||||
"Sequential Summaries: {'file1.txt': 'Summarized(13 chars)', 'file2.txt': 'Summarized(13 chars)', 'file3.txt': 'Summarized(13 chars)'}\n",
|
||||
"Parallel Summaries: {'file1.txt': 'Summarized(13 chars)', 'file2.txt': 'Summarized(13 chars)', 'file3.txt': 'Summarized(13 chars)'}\n",
|
||||
"Sequential took: 3.00 seconds\n",
|
||||
"Parallel took: 1.00 seconds\n"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"source": [],
|
||||
"metadata": {
|
||||
"id": "ystwa74D0Z_k"
|
||||
},
|
||||
"execution_count": null,
|
||||
"outputs": []
|
||||
}
|
||||
]
|
||||
}
|
||||
Loading…
Reference in New Issue