{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "! pip install pocketflow\n", "! pip install faiss-cpu\n", "! pip install openai" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "

\n", "Cookbook: Pocket Flow + Cursor AI\n", "

" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "

\n", "1. Utility Function\n", "

\n", "\n", "

\n", " Utility functions are helpers for tasks such as calling an LLM, generating embeddings, or calling external APIs. Pocket Flow is deliberately kept minimal and does NOT provide any of these. \n", "

\n", "\n", "

\n", "But don’t worry: you can simply ask Cursor AI to create them for you. \n", "

\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "> Help me implement (1) a `call_llm` function that takes a prompt and returns the response from the OpenAI gpt-4o model, and (2) a `get_embedding` function that takes a text and returns its embedding from the OpenAI text-embedding-ada-002 model." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "from openai import OpenAI\n", "import os\n", "\n", "# Assumption: the API key is stored in the OPENAI_API_KEY environment variable\n", "API_KEY = os.environ.get(\"OPENAI_API_KEY\")\n", "\n", "def call_llm(prompt):\n", "    client = OpenAI(api_key=API_KEY)\n", "    response = client.chat.completions.create(\n", "        model=\"gpt-4o\",\n", "        messages=[{\"role\": \"user\", \"content\": prompt}]\n", "    )\n", "    return response.choices[0].message.content\n", "\n", "def get_embedding(text):\n", "    client = OpenAI(api_key=API_KEY)\n", "    response = client.embeddings.create(\n", "        model=\"text-embedding-ada-002\",\n", "        input=text\n", "    )\n", "    return response.data[0].embedding\n", "\n", "# Example usage:\n", "response = call_llm(\"What's the meaning of life?\")\n", "print(response)\n", "embedding = get_embedding(\"What's the meaning of life?\")\n", "print(embedding)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "

\n", "2. Node\n", "

\n", "\n", " \n", "

\n", " A Node is your smallest unit of work with 3 steps \n", " prep->exec->post:\n", "

\n", "\n", "\n", "
\n", "1. `prep(shared)`\n", "   - Reads and preprocesses data from the shared store.\n", "   - E.g., load a file, query a database, or turn data into a string.\n", "2. `exec(prep_res)`\n", "   - Executes the core logic.\n", "   - E.g., call an LLM, invoke remote APIs, or embed texts.\n", "3. `post(shared, prep_res, exec_res)`\n", "   - Writes data back to the shared store.\n", "
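The three steps can be sketched in plain Python. This is a toy stand-in written for illustration, not Pocket Flow's own code: `ToyNode`, `WordCountNode`, and the `text` / `word_count` keys are hypothetical names, and the sketch assumes that running a node simply calls the three steps in order.

```python
class ToyNode:
    # Hypothetical stand-in for a node: run() chains prep -> exec -> post.
    def prep(self, shared):
        return None

    def exec(self, prep_res):
        return None

    def post(self, shared, prep_res, exec_res):
        return 'default'

    def run(self, shared):
        prep_res = self.prep(shared)      # read from the shared store
        exec_res = self.exec(prep_res)    # core logic, sees only prep_res
        return self.post(shared, prep_res, exec_res)  # write back, return an action


class WordCountNode(ToyNode):
    def prep(self, shared):
        return shared['text']

    def exec(self, text):
        return len(text.split())

    def post(self, shared, prep_res, exec_res):
        shared['word_count'] = exec_res
        return 'default'


shared = {'text': 'startups are counterintuitive'}
action = WordCountNode().run(shared)
print(action, shared['word_count'])
```

Keeping `exec` free of any reference to the shared store is the point of the split: the step that does the expensive work only sees what `prep` hands it.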
\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "> Help me implement a single summarization node that reads data from the shared store, calls an LLM to summarize the text into 50 words, and writes the summary back to the shared store. Then, test it with a shared store that has pre-loaded data from `./data/PaulGrahamEssaysLarge/before.txt`." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Summary: This essay highlights the counterintuitive nature of startups, emphasizing that instincts often lead to mistakes. Key advice includes trusting instincts about people, not needing deep startup knowledge, and focusing on creating products users want. Startups are all-consuming, best pursued after college, and require openness to learning and serendipity.\n" ] } ], "source": [ "from pocketflow import Node\n", "\n", "class SummarizeNode(Node):\n", "    def prep(self, shared):\n", "        # Read data from shared store\n", "        return shared[\"data\"][\"before.txt\"]\n", "    \n", "    def exec(self, text):\n", "        # Call LLM to summarize\n", "        prompt = f\"Summarize this text in 50 words:\\n\\n{text}\"\n", "        return call_llm(prompt)\n", "    \n", "    def post(self, shared, prep_res, exec_res):\n", "        # Store the summary back\n", "        shared[\"summary\"] = exec_res\n", "        # No specific next action needed\n", "        return \"default\"\n", "\n", "# Create test data\n", "shared = {\n", "    \"data\": {},\n", "    \"summary\": None\n", "}\n", "\n", "# Load the file\n", "with open(\"./data/PaulGrahamEssaysLarge/before.txt\", \"r\") as f:\n", "    shared[\"data\"][\"before.txt\"] = f.read()\n", "\n", "# Create and run the node\n", "summarize_node = SummarizeNode()\n", "summarize_node.run(shared)\n", "\n", "# Print the result\n", "print(\"Summary:\", shared[\"summary\"])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " \n", "

\n", " 3. Batch\n", "

\n", "\n", " \n", "

\n", " Batch helps repeat the same work across multiple items. \n", " Instead of calling exec() once, a Batch Node calls \n", " exec() \n", " for each item in the list returned by prep(). \n", "

\n", "

\n", " Think of it as \"item-by-item\" processing:\n", "
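In plain Python, that per-item loop can be sketched as follows. This is a toy stand-in rather than Pocket Flow's implementation: `ToyBatchNode`, `LengthBatchNode`, and the `lengths` key are hypothetical names, and the sketch assumes the results are collected into a list and passed to `post` in the same order as the items.

```python
class ToyBatchNode:
    # Hypothetical stand-in: exec() runs once per item returned by prep().
    def prep(self, shared):
        return []

    def exec(self, item):
        return item

    def post(self, shared, prep_res, exec_res_list):
        return 'default'

    def run(self, shared):
        items = self.prep(shared)
        exec_res_list = [self.exec(item) for item in items]  # item-by-item
        return self.post(shared, items, exec_res_list)


class LengthBatchNode(ToyBatchNode):
    def prep(self, shared):
        # One (filename, text) pair per document
        return list(shared['data'].items())

    def exec(self, item):
        filename, text = item
        return filename, len(text)

    def post(self, shared, prep_res, exec_res_list):
        shared['lengths'] = dict(exec_res_list)
        return 'default'


shared = {'data': {'a.txt': 'hello', 'b.txt': 'hi'}}
LengthBatchNode().run(shared)
print(shared['lengths'])
```

Note that `exec` is written for a single item; only `prep` and `post` deal with the whole collection.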

\n", "\n", " \n", " \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "> Help me implement a batch summarization node that reads the list of data from the shared store, calls an LLM to summarize the text into 50 words, and writes the summary back to the shared store. Then, test it with a shared store that has pre-loaded all text files from `./data/PaulGrahamEssaysLarge/`." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Summaries:\n", "\n", "aord.txt:\n", "The text discusses the critical concern of whether startups are \"default alive\" or \"default dead,\" meaning whether they can reach profitability with existing resources. Many founders are unaware of this status. Addressing this concern early is vital since assumptions about easy fundraising can be misleading. Over-hiring is a common pitfall, emphasizing growth over prudent scaling.\n", "\n", "apple.txt:\n", "Apple's App Store approval process is harming its reputation with developers, damaging their goodwill and causing app delays. The approval system, akin to outdated software publishing, obstructs modern iterative app development. This misalignment with programmers' needs risks alienating talented potential employees and developers essential for Apple's platform success.\n", "\n", "avg.txt:\n", "In 1995, Paul Graham and Robert Morris founded Viaweb, a startup enabling users to create online stores. Using Lisp for its innovative capabilities, they gained a competitive edge due to Lisp's rapid development potential. Viaweb's success highlighted Lisp’s power, challenging conventional language choices and showcasing unconventional advantages in business.\n", "\n", "before.txt:\n", "The text advises potential startup founders to understand the counterintuitive nature of startups, emphasizing trust in instincts about people, focusing on solving user problems, and avoiding the illusion of gaming the system. It suggests gaining broad knowledge, exploring diverse interests, and delaying startup efforts until post-college to maximize potential and personal growth.\n", "\n", "addiction.txt:\n", "The text discusses the accelerating process of technological progress, leading to more addictive forms of various substances and experiences. It warns that this trend will continue, making it harder to distinguish between beneficial and harmful advancements. Society must adapt by developing new customs to manage increasing addiction, while individuals need to find personal strategies to avoid negative impacts.\n" ] } ], "source": [ "from pocketflow import BatchNode\n", "import os\n", "\n", "class BatchSummarizeNode(BatchNode):\n", "    def prep(self, shared):\n", "        # Return list of (filename, content) tuples from shared store\n", "        return [(fn, content) for fn, content in shared[\"data\"].items()]\n", "    \n", "    def exec(self, item):\n", "        # Unpack the filename and content\n", "        filename, text = item\n", "        # Call LLM to summarize\n", "        prompt = f\"Summarize this text in 50 words:\\n\\n{text}\"\n", "        summary = call_llm(prompt)\n", "        return filename, summary\n", "    \n", "    def post(self, shared, prep_res, exec_res_list):\n", "        # Store all summaries in a dict by filename\n", "        shared[\"summaries\"] = {\n", "            filename: summary\n", "            for filename, summary in exec_res_list\n", "        }\n", "        return \"default\"\n", "\n", "# Create test data structure\n", "shared = {\n", "    \"data\": {},\n", "    \"summaries\": {}\n", "}\n", "\n", "# Load all files from the directory\n", "path = \"./data/PaulGrahamEssaysLarge\"\n", "for filename in os.listdir(path):\n", "    with open(os.path.join(path, filename), \"r\") as f:\n", "        shared[\"data\"][filename] = f.read()\n", "\n", "# Create and run the batch node\n", "batch_summarize = BatchSummarizeNode()\n", "batch_summarize.run(shared)\n", "\n", "# Print results\n", "print(\"Summaries:\")\n", "for filename, summary in shared[\"summaries\"].items():\n", "    print(f\"\\n{filename}:\")\n", "    print(summary)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " \n", "

\n", " 4. Flow\n", "

\n", "\n", " \n", "

\n", " Flow connects your Nodes into a graph.\n", "

\n", "\n", " \n", " \n", "\n", " \n", "

\n", " That’s it! You can nest Flows, branch your actions, or keep it simple with a straight chain of Nodes.\n", "
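The idea of following actions through a graph can be sketched in plain Python. This is a toy stand-in, not Pocket Flow's implementation: `ToyNode`, `CheckNode`, `run_flow`, and the `long` / `short` action names are hypothetical, and the sketch assumes each node returns an action string that selects its successor.

```python
class ToyNode:
    # Hypothetical stand-in: each node maps an action string to a successor.
    def __init__(self, name):
        self.name = name
        self.successors = {}

    def next(self, node, action='default'):
        self.successors[action] = node
        return node

    def run(self, shared):
        shared['visited'].append(self.name)
        return 'default'


class CheckNode(ToyNode):
    def run(self, shared):
        # Branch: choose the next node based on the length of the text
        shared['visited'].append(self.name)
        return 'long' if len(shared['text']) > 10 else 'short'


def run_flow(start, shared):
    # Follow the action returned by each node until no successor matches.
    current = start
    while current is not None:
        action = current.run(shared)
        current = current.successors.get(action)


load, check = ToyNode('load'), CheckNode('check')
summarize, echo = ToyNode('summarize'), ToyNode('echo')
load.next(check)               # straight chain
check.next(summarize, 'long')  # branch on the returned action
check.next(echo, 'short')

shared = {'text': 'a fairly long document', 'visited': []}
run_flow(load, shared)
print(shared['visited'])
```

A loop is just an edge that points back to an earlier node, which is how a chatbot can return to its question step after each answer.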

" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "> Help me implement a RAG chatbot that, given a user’s input question, finds the most relevant file based on embeddings and then answers the user's question. Test it with a shared store that has preloaded all text files from `./data/PaulGrahamEssaysLarge/`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "Q: how to find startup idea\n", "A: To find a startup idea, the context advises not to make a conscious effort to think of startup ideas, as this often results in bad and plausible-sounding ideas that can waste time. Instead, it suggests turning your mind into the type that generates startup ideas unconsciously. This can be achieved by:\n", "\n", "1. Learning extensively about things that matter.\n", "2. Working on problems that genuinely interest you.\n", "3. Collaborating with people you like and respect.\n", "\n", "By engaging in these activities, you'll naturally start to encounter ideas that have the potential to become startups, often without initially realizing it. 
The essay emphasizes that many successful startups, like Apple, Yahoo, Google, and Facebook, began as side projects rather than direct pursuits to start a company.\n", "\n", "Source: before.txt\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "/home/zh2408/.venv/lib/python3.9/site-packages/pocketflow/__init__.py:43: UserWarning: Flow ends: 'end' not found in ['answer', 'end']\n", " if not nxt and curr.successors: warnings.warn(f\"Flow ends: '{action}' not found in {list(curr.successors)}\")\n" ] } ], "source": [ "from pocketflow import Node, Flow\n", "import faiss\n", "import numpy as np\n", "import os\n", "\n", "class PrepareEmbeddings(Node):\n", "    def prep(self, shared):\n", "        # Get list of (filename, content) pairs\n", "        return list(shared[\"data\"].items())\n", "    \n", "    def exec(self, items):\n", "        # Create embeddings for each document\n", "        embeddings = []\n", "        filenames = []\n", "        for filename, content in items:\n", "            embedding = get_embedding(content)\n", "            embeddings.append(embedding)\n", "            filenames.append(filename)\n", "        \n", "        # Create FAISS index\n", "        dim = len(embeddings[0])\n", "        index = faiss.IndexFlatL2(dim)\n", "        index.add(np.array(embeddings).astype('float32'))\n", "        \n", "        return index, filenames\n", "    \n", "    def post(self, shared, prep_res, exec_res):\n", "        # Store index and filenames in shared store\n", "        index, filenames = exec_res\n", "        shared[\"search_index\"] = index\n", "        shared[\"filenames\"] = filenames\n", "        return \"default\"\n", "\n", "class FindRelevantDocument(Node):\n", "    def prep(self, shared):\n", "        # Get user question\n", "        question = input(\"Enter your question (or press Enter to quit): \")\n", "        if not question:\n", "            return None\n", "        # Pass the index and filenames along so exec() does not rely on a global `shared`\n", "        return question, shared[\"search_index\"], shared[\"filenames\"]\n", "    \n", "    def exec(self, inputs):\n", "        if inputs is None:\n", "            return None\n", "        \n", "        question, index, filenames = inputs\n", "        # Get question embedding and search\n", "        query_embedding = get_embedding(question)\n", "        \n", "        # Search for most similar document\n", "        D, I = index.search(\n", "            np.array([query_embedding]).astype('float32'),\n", "            k=1\n", "        )\n", "        most_relevant_idx = I[0][0]\n", "        most_relevant_file = filenames[most_relevant_idx]\n", "        \n", "        return question, most_relevant_file\n", "    \n", "    def post(self, shared, prep_res, exec_res):\n", "        if exec_res is None:\n", "            return \"end\"\n", "        \n", "        question, filename = exec_res\n", "        shared[\"current_question\"] = question\n", "        shared[\"relevant_file\"] = filename\n", "        shared[\"context\"] = shared[\"data\"][filename]\n", "        return \"answer\"\n", "\n", "class AnswerQuestion(Node):\n", "    def prep(self, shared):\n", "        return (\n", "            shared[\"current_question\"],\n", "            shared[\"context\"]\n", "        )\n", "    \n", "    def exec(self, inputs):\n", "        question, context = inputs\n", "        prompt = f\"\"\"\n", "Context: {context}\n", "\n", "Question: {question}\n", "\n", "Answer the question based on the context above. If the context doesn't contain relevant information, say so.\n", "Answer:\"\"\"\n", "        return call_llm(prompt)\n", "    \n", "    def post(self, shared, prep_res, exec_res):\n", "        print(f\"\\nQ: {shared['current_question']}\")\n", "        print(f\"A: {exec_res}\")\n", "        print(f\"\\nSource: {shared['relevant_file']}\")\n", "        return \"continue\"  # Loop back for more questions\n", "\n", "# Create test data\n", "shared = {\"data\": {}}\n", "\n", "# Load all files\n", "path = \"./data/PaulGrahamEssaysLarge\"\n", "for filename in os.listdir(path):\n", "    with open(os.path.join(path, filename), \"r\") as f:\n", "        shared[\"data\"][filename] = f.read()\n", "\n", "# Create nodes and flow\n", "prep_embeddings = PrepareEmbeddings()\n", "find_relevant = FindRelevantDocument()\n", "answer = AnswerQuestion()\n", "\n", "# Connect nodes\n", "prep_embeddings >> find_relevant\n", "find_relevant - \"answer\" >> answer\n", "find_relevant - \"end\" >> None\n", "answer - \"continue\" >> find_relevant\n", "\n", "# Create and run flow\n", "rag_flow = Flow(start=prep_embeddings)\n",
"rag_flow.run(shared)" ] } ], "metadata": { "kernelspec": { "display_name": "myvenv", "language": "python", "name": "myvenv" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.2" } }, "nbformat": 4, "nbformat_minor": 2 }