From 795c1320182d29812e0fdcd2c4a736c10cb57b9b Mon Sep 17 00:00:00 2001
From: zachary62
Date: Thu, 20 Mar 2025 16:28:19 -0400
Subject: [PATCH] add streaming

---
 cookbook/parallel_exp.ipynb                 | 104 ------------------
 cookbook/pocketflow-llm-streaming/README.md |  46 ++++++++
 cookbook/pocketflow-llm-streaming/main.py   |  52 +++++++++
 cookbook/pocketflow-llm-streaming/utils.py  |  58 ++++++++++
 .../requirements.txt                        |   1 -
 5 files changed, 156 insertions(+), 105 deletions(-)
 delete mode 100644 cookbook/parallel_exp.ipynb
 create mode 100644 cookbook/pocketflow-llm-streaming/README.md
 create mode 100644 cookbook/pocketflow-llm-streaming/main.py
 create mode 100644 cookbook/pocketflow-llm-streaming/utils.py
 delete mode 100644 cookbook/pocketflow-parallel-batch/requirements.txt

diff --git a/cookbook/parallel_exp.ipynb b/cookbook/parallel_exp.ipynb
deleted file mode 100644
index 8d9cf3d..0000000
--- a/cookbook/parallel_exp.ipynb
+++ /dev/null
@@ -1,104 +0,0 @@
-{
- "cells": [
-  {
-   "cell_type": "code",
-   "execution_count": 1,
-   "metadata": {
-    "colab": {
-     "base_uri": "https://localhost:8080/"
-    },
-    "id": "ki9N8iqRxu0I",
-    "outputId": "fd1628a5-d2a4-44a4-89b4-31151d21c8f3"
-   },
-   "outputs": [
-    {
-     "name": "stdout",
-     "output_type": "stream",
-     "text": [
-      "Collecting pocketflow\n",
-      "  Downloading pocketflow-0.0.1-py3-none-any.whl.metadata (270 bytes)\n",
-      "Downloading pocketflow-0.0.1-py3-none-any.whl (3.3 kB)\n",
-      "Installing collected packages: pocketflow\n",
-      "Successfully installed pocketflow-0.0.1\n"
-     ]
-    }
-   ],
-   "source": [
-    "pip install pocketflow"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 3,
-   "metadata": {
-    "id": "mHZpGv8txy4L"
-   },
-   "outputs": [],
-   "source": []
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 5,
-   "metadata": {
-    "colab": {
-     "base_uri": "https://localhost:8080/"
-    },
-    "id": "zfnhW3f-0W6o",
-    "outputId": "3737e2e5-5cae-4c6b-a894-e880cf338d1f"
-   },
-   "outputs": [
-    {
-     "name": "stdout",
-     "output_type": "stream",
-     "text": [
-      "\n",
-      "=== Running Sequential (AsyncBatchNode) ===\n",
-      "[Sequential] Summarizing file1.txt...\n",
-      "[Sequential] Summarizing file2.txt...\n",
-      "[Sequential] Summarizing file3.txt...\n",
-      "\n",
-      "=== Running Parallel (AsyncParallelBatchNode) ===\n",
-      "[Parallel] Summarizing file1.txt...\n",
-      "[Parallel] Summarizing file2.txt...\n",
-      "[Parallel] Summarizing file3.txt...\n",
-      "\n",
-      "--- Results ---\n",
-      "Sequential Summaries: {'file1.txt': 'Summarized(13 chars)', 'file2.txt': 'Summarized(13 chars)', 'file3.txt': 'Summarized(13 chars)'}\n",
-      "Parallel Summaries: {'file1.txt': 'Summarized(13 chars)', 'file2.txt': 'Summarized(13 chars)', 'file3.txt': 'Summarized(13 chars)'}\n",
-      "Sequential took: 3.00 seconds\n",
-      "Parallel took: 1.00 seconds\n"
-     ]
-    }
-   ],
-   "source": [
-    "# if in a notebook\n",
-    "await main()\n",
-    "\n",
-    "asyncio.run(main())"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "id": "ystwa74D0Z_k"
-   },
-   "outputs": [],
-   "source": []
-  }
- ],
- "metadata": {
-  "colab": {
-   "provenance": []
-  },
-  "kernelspec": {
-   "display_name": "Python 3",
-   "name": "python3"
-  },
-  "language_info": {
-   "name": "python"
-  }
- },
- "nbformat": 4,
- "nbformat_minor": 0
-}
diff --git a/cookbook/pocketflow-llm-streaming/README.md b/cookbook/pocketflow-llm-streaming/README.md
new file mode 100644
index 0000000..c38c4e4
--- /dev/null
+++ b/cookbook/pocketflow-llm-streaming/README.md
@@ -0,0 +1,46 @@
+# LLM Streaming and Interruption
+
+Demonstrates real-time LLM response streaming with user-interrupt capability.
+
+## Features
+
+- Real-time display of LLM responses as they're generated
+- Interrupt streaming at any time by pressing ENTER
+
+## Run It
+
+```bash
+pip install -r requirements.txt
+python main.py
+```
+
+## How It Works
+
+`StreamNode` does the following:
+1. Creates an interrupt-listener thread
+2. Fetches content chunks from the LLM
+3. Displays chunks in real time
+4. Handles user interruption (see the sketch below)
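+
+The core of `exec` is just a consume-and-check loop. A minimal sketch (see
+`main.py` for the full node):
+
+```python
+for chunk in chunks:                # from fake_stream_llm() or stream_llm()
+    if interrupt_event.is_set():    # set when the user presses ENTER
+        break                       # stop streaming immediately
+    content = chunk.choices[0].delta.content
+    if content is not None:
+        print(content, end="", flush=True)
+```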
+
+## API Key
+
+By default, the demo uses fake streaming responses. To use real OpenAI streaming:
+
+1. Edit `main.py` to replace `fake_stream_llm` with `stream_llm`:
+```python
+# Change this line:
+chunks = fake_stream_llm(prompt)
+# To this:
+chunks = stream_llm(prompt)
+```
+
+2. Make sure your OpenAI API key is set:
+```bash
+export OPENAI_API_KEY="your-api-key-here"
+```
+
+## Files
+
+- `main.py`: StreamNode implementation
+- `utils.py`: Real and fake LLM streaming functions
+
\ No newline at end of file
diff --git a/cookbook/pocketflow-llm-streaming/main.py b/cookbook/pocketflow-llm-streaming/main.py
new file mode 100644
index 0000000..756962c
--- /dev/null
+++ b/cookbook/pocketflow-llm-streaming/main.py
@@ -0,0 +1,52 @@
+import time
+import threading
+from pocketflow import Node, Flow
+from utils import fake_stream_llm
+
+class StreamNode(Node):
+    def __init__(self, max_retries=1, wait=0):
+        super().__init__(max_retries=max_retries, wait=wait)
+
+    def prep(self, shared):
+        # Create interrupt event
+        interrupt_event = threading.Event()
+
+        # Listen for a user interrupt (daemon, so a pending input() can't block exit)
+        def wait_for_interrupt():
+            input("Press ENTER at any time to interrupt streaming...\n")
+            interrupt_event.set()
+        listener_thread = threading.Thread(target=wait_for_interrupt, daemon=True)
+        listener_thread.start()
+
+        # Get prompt from shared store
+        prompt = shared["prompt"]
+        # Get chunks from the LLM function
+        chunks = fake_stream_llm(prompt)
+        return chunks, interrupt_event, listener_thread
+
+    def exec(self, prep_res):
+        chunks, interrupt_event, listener_thread = prep_res
+        for chunk in chunks:
+            if interrupt_event.is_set():
+                print("\nUser interrupted streaming.")
+                break
+
+            if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content is not None:
+                chunk_content = chunk.choices[0].delta.content
+                print(chunk_content, end="", flush=True)
+            time.sleep(0.1)  # simulate network latency
+        return interrupt_event, listener_thread
+
+    def post(self, shared, prep_res, exec_res):
+        interrupt_event, listener_thread = exec_res
+        # Signal the listener; join with a timeout since input() may still be blocking
+        interrupt_event.set()
+        listener_thread.join(timeout=0.1)
+        return "default"
+
+# Usage:
+node = StreamNode()
+flow = Flow(start=node)
+
+shared = {"prompt": "What's the meaning of life?"}
+flow.run(shared)
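+
+# To stream from the real OpenAI API instead of the canned chunks, swap the
+# generator in prep() (sketch; assumes OPENAI_API_KEY is set, see the README):
+#   from utils import stream_llm
+#   chunks = stream_llm(prompt)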
diff --git a/cookbook/pocketflow-llm-streaming/utils.py b/cookbook/pocketflow-llm-streaming/utils.py
new file mode 100644
index 0000000..956d99c
--- /dev/null
+++ b/cookbook/pocketflow-llm-streaming/utils.py
@@ -0,0 +1,58 @@
+from openai import OpenAI
+import os
+
+def stream_llm(prompt):
+    client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "your-api-key"))
+
+    # Make a streaming chat completion request
+    response = client.chat.completions.create(
+        model="gpt-4o",
+        messages=[
+            {"role": "user", "content": prompt}
+        ],
+        temperature=0.7,
+        stream=True  # Enable streaming
+    )
+    return response
+
+def fake_stream_llm(prompt, predefined_text="This is a fake response. Today is a sunny day. The sun is shining. The birds are singing. The flowers are blooming. The bees are buzzing. The wind is blowing. The clouds are drifting. The sky is blue. The grass is green. The trees are tall. The water is clear. The fish are swimming. The sun is shining. The birds are singing. The flowers are blooming. The bees are buzzing. The wind is blowing. The clouds are drifting. The sky is blue. The grass is green. The trees are tall. The water is clear. The fish are swimming."):
+    """
+    Returns a list of simple objects that mimic the structure of OpenAI
+    streaming chunks. The prompt is ignored; predefined_text is chunked instead.
+    """
+    # Split text into small chunks
+    chunk_size = 10
+    chunks = []
+
+    # A simple namespace class used to build the nested chunk structure
+    class SimpleObject:
+        def __init__(self, **kwargs):
+            for key, value in kwargs.items():
+                setattr(self, key, value)
+
+    # Build the chunks
+    for i in range(0, len(predefined_text), chunk_size):
+        text_chunk = predefined_text[i:i+chunk_size]
+
+        # Mimic chunk.choices[0].delta.content
+        delta = SimpleObject(content=text_chunk)
+        choice = SimpleObject(delta=delta)
+        chunk = SimpleObject(choices=[choice])
+
+        chunks.append(chunk)
+
+    return chunks
+
+if __name__ == "__main__":
+    print("## Testing streaming LLM")
+    prompt = "What's the meaning of life?"
+    print(f"## Prompt: {prompt}")
+    # response = fake_stream_llm(prompt)
+    response = stream_llm(prompt)
+    print("## Response:")
+    for chunk in response:
+        if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content is not None:
+            chunk_content = chunk.choices[0].delta.content
+            # Print the incoming text without a newline (simulate real-time streaming)
+            print(chunk_content, end="", flush=True)
+
diff --git a/cookbook/pocketflow-parallel-batch/requirements.txt b/cookbook/pocketflow-parallel-batch/requirements.txt
deleted file mode 100644
index fcb64c3..0000000
--- a/cookbook/pocketflow-parallel-batch/requirements.txt
+++ /dev/null
@@ -1 +0,0 @@
-pocketflow>=0.0.1
\ No newline at end of file