add streaming

zachary62 2025-03-20 16:28:19 -04:00
parent bf143672c9
commit 795c132018
5 changed files with 156 additions and 105 deletions

View File

@@ -1,104 +0,0 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "ki9N8iqRxu0I",
"outputId": "fd1628a5-d2a4-44a4-89b4-31151d21c8f3"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Collecting pocketflow\n",
" Downloading pocketflow-0.0.1-py3-none-any.whl.metadata (270 bytes)\n",
"Downloading pocketflow-0.0.1-py3-none-any.whl (3.3 kB)\n",
"Installing collected packages: pocketflow\n",
"Successfully installed pocketflow-0.0.1\n"
]
}
],
"source": [
"pip install pocketflow"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"id": "mHZpGv8txy4L"
},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "zfnhW3f-0W6o",
"outputId": "3737e2e5-5cae-4c6b-a894-e880cf338d1f"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"=== Running Sequential (AsyncBatchNode) ===\n",
"[Sequential] Summarizing file1.txt...\n",
"[Sequential] Summarizing file2.txt...\n",
"[Sequential] Summarizing file3.txt...\n",
"\n",
"=== Running Parallel (AsyncParallelBatchNode) ===\n",
"[Parallel] Summarizing file1.txt...\n",
"[Parallel] Summarizing file2.txt...\n",
"[Parallel] Summarizing file3.txt...\n",
"\n",
"--- Results ---\n",
"Sequential Summaries: {'file1.txt': 'Summarized(13 chars)', 'file2.txt': 'Summarized(13 chars)', 'file3.txt': 'Summarized(13 chars)'}\n",
"Parallel Summaries: {'file1.txt': 'Summarized(13 chars)', 'file2.txt': 'Summarized(13 chars)', 'file3.txt': 'Summarized(13 chars)'}\n",
"Sequential took: 3.00 seconds\n",
"Parallel took: 1.00 seconds\n"
]
}
],
"source": [
"# if in a notebook\n",
"await main()\n",
"\n",
"asyncio.run(main())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "ystwa74D0Z_k"
},
"outputs": [],
"source": []
}
],
"metadata": {
"colab": {
"provenance": []
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}

View File

@@ -0,0 +1,46 @@
# LLM Streaming and Interruption
Demonstrates real-time LLM response streaming with user interrupt capability.
## Features
- Real-time display of LLM responses as they're generated
- User can interrupt streaming at any time by pressing ENTER
## Run It
```bash
pip install -r requirements.txt
python main.py
```
## How It Works
StreamNode (sketched below):
1. Creates an interrupt listener thread
2. Fetches content chunks from the LLM
3. Displays chunks in real time
4. Handles user interruption
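
For orientation, here is a minimal sketch of that interrupt pattern on its own, outside of a PocketFlow node. The helper name `stream_with_interrupt` is illustrative, and `chunks` stands in for whatever `fake_stream_llm`/`stream_llm` returns; for brevity the sketch assumes plain string chunks and a daemon listener thread, whereas `main.py` parses OpenAI-style chunk objects and joins the listener in `post`:
```python
import threading

def stream_with_interrupt(chunks):
    interrupt_event = threading.Event()

    # Listener thread: pressing ENTER sets the interrupt event
    def wait_for_interrupt():
        input("Press ENTER at any time to interrupt streaming...\n")
        interrupt_event.set()

    threading.Thread(target=wait_for_interrupt, daemon=True).start()

    # Display chunks until the stream ends or the user interrupts
    for chunk in chunks:
        if interrupt_event.is_set():
            print("\nUser interrupted streaming.")
            break
        print(chunk, end="", flush=True)
```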
## API Key
By default, the demo uses fake streaming responses. To use real OpenAI streaming:
1. Edit main.py to replace the fake_stream_llm call with stream_llm (and update the import from utils accordingly):
```python
# Change this line:
chunks = fake_stream_llm(prompt)
# To this:
chunks = stream_llm(prompt)
```
2. Make sure your OpenAI API key is set:
```bash
export OPENAI_API_KEY="your-api-key-here"
```
## Files
- `main.py`: StreamNode implementation
- `utils.py`: Real and fake LLM streaming functions

View File

@@ -0,0 +1,52 @@
import time
import threading

from pocketflow import Node, Flow
from utils import fake_stream_llm


class StreamNode(Node):
    def __init__(self, max_retries=1, wait=0):
        super().__init__(max_retries=max_retries, wait=wait)

    def prep(self, shared):
        # Create interrupt event
        interrupt_event = threading.Event()

        # Start a thread to listen for user interrupt
        def wait_for_interrupt():
            input("Press ENTER at any time to interrupt streaming...\n")
            interrupt_event.set()

        listener_thread = threading.Thread(target=wait_for_interrupt)
        listener_thread.start()

        # Get prompt from shared store
        prompt = shared["prompt"]

        # Get chunks from LLM function
        chunks = fake_stream_llm(prompt)

        return chunks, interrupt_event, listener_thread

    def exec(self, prep_res):
        chunks, interrupt_event, listener_thread = prep_res
        for chunk in chunks:
            if interrupt_event.is_set():
                print("User interrupted streaming.")
                break
            if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content is not None:
                chunk_content = chunk.choices[0].delta.content
                print(chunk_content, end="", flush=True)
            time.sleep(0.1)  # simulate latency
        return interrupt_event, listener_thread

    def post(self, shared, prep_res, exec_res):
        interrupt_event, listener_thread = exec_res
        # Join the interrupt listener so it doesn't linger.
        # Note: input() only returns on ENTER, so if streaming finished
        # without an interrupt, join() waits for one final key press.
        interrupt_event.set()
        listener_thread.join()
        return "default"


# Usage:
node = StreamNode()
flow = Flow(start=node)

shared = {"prompt": "What's the meaning of life?"}
flow.run(shared)

View File

@@ -0,0 +1,58 @@
from openai import OpenAI
import os


def stream_llm(prompt):
    client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "your-api-key"))

    # Make a streaming chat completion request
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": prompt}
        ],
        temperature=0.7,
        stream=True  # Enable streaming
    )

    return response


def fake_stream_llm(prompt, predefined_text="This is a fake response. Today is a sunny day. The sun is shining. The birds are singing. The flowers are blooming. The bees are buzzing. The wind is blowing. The clouds are drifting. The sky is blue. The grass is green. The trees are tall. The water is clear. The fish are swimming. The sun is shining. The birds are singing. The flowers are blooming. The bees are buzzing. The wind is blowing. The clouds are drifting. The sky is blue. The grass is green. The trees are tall. The water is clear. The fish are swimming."):
    """
    Returns a list of simple objects that mimic the structure needed
    for OpenAI streaming responses.
    """
    # Split text into small chunks
    chunk_size = 10
    chunks = []

    # Create the chunks using a simple class outside the nested structure
    class SimpleObject:
        def __init__(self, **kwargs):
            for key, value in kwargs.items():
                setattr(self, key, value)

    # Build the chunks
    for i in range(0, len(predefined_text), chunk_size):
        text_chunk = predefined_text[i:i + chunk_size]

        # Create the nested structure using simple objects
        delta = SimpleObject(content=text_chunk)
        choice = SimpleObject(delta=delta)
        chunk = SimpleObject(choices=[choice])
        chunks.append(chunk)

    return chunks


if __name__ == "__main__":
    print("## Testing streaming LLM")
    prompt = "What's the meaning of life?"
    print(f"## Prompt: {prompt}")

    # response = fake_stream_llm(prompt)
    response = stream_llm(prompt)

    print("## Response: ")
    for chunk in response:
        if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content is not None:
            chunk_content = chunk.choices[0].delta.content
            # Print the incoming text without a newline (simulate real-time streaming)
            print(chunk_content, end="", flush=True)

View File

@@ -1 +0,0 @@
pocketflow>=0.0.1