From bba97b225ab6a2c1da199bd0b6136668544d5eac Mon Sep 17 00:00:00 2001 From: "promptless[bot]" <179508745+promptless[bot]@users.noreply.github.com> Date: Thu, 5 Feb 2026 18:53:01 +0000 Subject: [PATCH] Add WebSocket streaming tutorial for Serverless --- docs.json | 1 + tutorials/serverless/websocket-streaming.mdx | 420 +++++++++++++++++++ 2 files changed, 421 insertions(+) create mode 100644 tutorials/serverless/websocket-streaming.mdx diff --git a/docs.json b/docs.json index b15a9105..402a6f9b 100644 --- a/docs.json +++ b/docs.json @@ -283,6 +283,7 @@ "group": "Serverless", "pages": [ "tutorials/serverless/run-your-first", + "tutorials/serverless/websocket-streaming", "tutorials/serverless/comfyui", "tutorials/serverless/model-caching-text", "tutorials/serverless/generate-sdxl-turbo", diff --git a/tutorials/serverless/websocket-streaming.mdx b/tutorials/serverless/websocket-streaming.mdx new file mode 100644 index 00000000..16747984 --- /dev/null +++ b/tutorials/serverless/websocket-streaming.mdx @@ -0,0 +1,420 @@ +--- +title: "Stream data with WebSocket-style streaming" +sidebarTitle: "WebSocket streaming" +description: "Build a Serverless endpoint that streams data back to your client using WebSocket-style streaming." +--- + +In this tutorial, you'll build a Serverless endpoint that streams data back to your client using WebSocket-style streaming. This approach works well for workloads that process data incrementally—like image processing or text generation—where you want to return partial results as they become available. + +You'll create a handler that simulates chunked image processing, deploy it to Runpod Serverless, and build a Python client that receives streamed responses in real time. + +## What you'll learn + +- Create a streaming handler function that yields incremental results. +- Deploy a custom worker with streaming enabled. +- Send requests to your endpoint and receive streamed responses. +- Process and display streaming output in a Python client. + +## Requirements + +Before starting, you'll need: + +- A Runpod account with credits. +- A Runpod [API key](/get-started/api-keys). +- Python 3.9+ installed locally. +- Docker installed and configured. +- A Docker Hub account for pushing your worker image. + +## Step 1: Set up your development environment + +Create a project directory and set up a Python virtual environment: + +```bash +mkdir runpod-base64-stream +cd runpod-base64-stream +python -m venv venv +source venv/bin/activate +pip install runpod Pillow +``` + +## Step 2: Create a streaming handler function + +Create a file named `handler.py` with the following code: + +```python handler.py +import runpod +import base64 +import io +from PIL import Image +import time + +def process_image_chunk(chunk_data, chunk_number, total_chunks): + """ + Simulate processing a chunk of image data. + In a real application, you might do actual image processing here. + """ + return { + "chunk_number": chunk_number, + "chunk_size": len(chunk_data), + "processed_at": time.strftime("%H:%M:%S") + } + +def generator_handler(job): + """ + Handler that processes a base64 encoded image in chunks and streams results. 
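+    Yields, in order: a "started" status message with basic image metadata,
+    one progress update per processed chunk, and a final "completed" message.
+    Errors are yielded as a dict with an "error" key instead of being raised.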
+ """ + job_input = job["input"] + + # Get the base64 string from input + base64_string = job_input.get("base64_image") + if not base64_string: + yield {"error": "No base64_image provided in input"} + return + + try: + # Decode base64 string + image_data = base64.b64decode(base64_string) + + # Open image to validate and get info + image = Image.open(io.BytesIO(image_data)) + + # Get image info for initial metadata + yield { + "status": "started", + "image_info": { + "format": image.format, + "size": image.size, + "mode": image.mode + } + } + + # Simulate processing image in chunks + # In a real application, you might process different parts of the image + chunk_size = len(image_data) // 4 # Process in 4 chunks + total_chunks = (len(image_data) + chunk_size - 1) // chunk_size + + for i in range(total_chunks): + start_idx = i * chunk_size + end_idx = min(start_idx + chunk_size, len(image_data)) + chunk = image_data[start_idx:end_idx] + + # Process this chunk + result = process_image_chunk(chunk, i + 1, total_chunks) + + # Add progress information + result["progress"] = f"{i + 1}/{total_chunks}" + result["percent_complete"] = ((i + 1) / total_chunks) * 100 + + # Stream the result for this chunk + yield result + + # Simulate processing time + time.sleep(1) + + # Send final completion message + yield { + "status": "completed", + "total_chunks_processed": total_chunks, + "final_timestamp": time.strftime("%H:%M:%S") + } + + except Exception as e: + yield {"error": str(e)} + +# Start the serverless function with streaming enabled +runpod.serverless.start({ + "handler": generator_handler, + "return_aggregate_stream": True +}) +``` + +This handler: +- Accepts a base64-encoded image in the request input. +- Yields an initial status message with image metadata. +- Processes the image in chunks, yielding progress updates for each chunk. +- Sends a final completion message when done. + +The key to streaming is using `yield` instead of `return`, and setting `return_aggregate_stream` to `True` when starting the serverless function. This makes the streamed results available via the `/stream` endpoint. + +## Step 3: Create a Dockerfile + +Create a `Dockerfile` to package your handler: + +```dockerfile Dockerfile +FROM python:3.9-slim + +WORKDIR /app + +# Install system dependencies for Pillow +RUN apt-get update && apt-get install -y \ + libjpeg-dev \ + zlib1g-dev \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY handler.py . + +CMD [ "python", "-u", "handler.py" ] +``` + +Create a `requirements.txt` file: + +```txt requirements.txt +runpod==1.3.0 +Pillow==9.5.0 +``` + +## Step 4: Build and push your Docker image + +Build and push your image to Docker Hub: + +```bash +docker build --platform linux/amd64 -t YOUR_DOCKERHUB_USERNAME/runpod-base64-stream:latest . +docker push YOUR_DOCKERHUB_USERNAME/runpod-base64-stream:latest +``` + +Replace `YOUR_DOCKERHUB_USERNAME` with your actual Docker Hub username. + +## Step 5: Create a Serverless endpoint + +Deploy your worker to a Serverless endpoint using the Runpod console: + +1. Go to the [Serverless](https://www.runpod.io/console/serverless) section of the Runpod console. +2. Click **New Endpoint**. +3. Click **Import from Docker Registry**. + + + + + +4. In the **Container Image** field, enter your Docker image URL (e.g., `docker.io/YOUR_DOCKERHUB_USERNAME/runpod-base64-stream:latest`). +5. Click **Next**. + + + + + +6. 
Configure your endpoint: + - Enter a name for your endpoint, or use the randomly generated name. + - Make sure **Endpoint Type** is set to **Queue**. + - Under **GPU Configuration**, select **16 GB** GPUs. + - Leave the rest of the settings at their defaults. + + + + + +7. Click **Deploy Endpoint**. + +Once deployed, note the **Endpoint ID** from the endpoint details page—you'll need it in the next step. + +## Step 6: Test the endpoint + +Create a file named `test_endpoint.py` to test your streaming endpoint: + +```python test_endpoint.py +import requests +import json +import time +import base64 +from PIL import Image +import io +import os + +API_KEY = "YOUR_RUNPOD_API_KEY" +ENDPOINT_ID = "YOUR_ENDPOINT_ID" + +# Set up the output directory for saving images +OUTPUT_DIR = os.path.join(os.getcwd(), "output_images") + +# Create the output directory if it doesn't exist +if not os.path.exists(OUTPUT_DIR): + os.makedirs(OUTPUT_DIR) + print(f"Created output directory: {OUTPUT_DIR}") + +def create_test_image(): + """ + Creates a test image and converts it to base64 format. + + Returns: + str: The image encoded as a base64 string + """ + # Create a new 100x100 pixel image with a red background + img = Image.new('RGB', (100, 100), color='red') + + # Create a bytes buffer to hold the image data + img_byte_arr = io.BytesIO() + + # Save the image to the buffer in PNG format + img.save(img_byte_arr, format='PNG') + + # Get the byte data from the buffer + img_byte_arr = img_byte_arr.getvalue() + + # Save a copy of the input image to disk + input_path = os.path.join(OUTPUT_DIR, 'test_image_input.png') + img.save(input_path) + print(f"Saved input test image as: {input_path}") + + # Convert the image bytes to base64 string and return it + return base64.b64encode(img_byte_arr).decode('utf-8') + +def save_base64_image(base64_string, filename): + """ + Converts a base64 string back to an image and saves it to disk. 
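+    The decoded bytes are re-opened with Pillow and written into OUTPUT_DIR as a PNG file.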
+ + Args: + base64_string (str): The image data as a base64 string + filename (str): The name to give the saved file + """ + try: + # Create the full path where the file will be saved + output_path = os.path.join(OUTPUT_DIR, filename) + + # Convert the base64 string back to bytes + image_data = base64.b64decode(base64_string) + + # Create an image from the bytes + image = Image.open(io.BytesIO(image_data)) + + # Save the image as a PNG file + image.save(output_path, 'PNG') + print(f"Saved processed image as: {output_path}") + + return True + except Exception as e: + print(f"Error saving image: {str(e)}") + return False + +# Set up the API endpoint URLs +run_url = f"https://api.runpod.ai/v2/{ENDPOINT_ID}/run" + +# Set up the headers for the API request +headers = { + "Authorization": f"Bearer {API_KEY}", + "Content-Type": "application/json" +} + +# Print a redacted version of the authorization header for debugging +print("Using Authorization header:", headers["Authorization"][:10] + "...") + +# Create the test image and get its base64 representation +base64_image = create_test_image() + +# Create the payload for our API request +payload = { + "input": { + "base64_image": base64_image + } +} + +# Send the initial request to start the job +print("\nSending request to:", run_url) +response = requests.post(run_url, headers=headers, json=payload) + +# Print debug information about the response +print("Status Code:", response.status_code) + +# Check for authentication errors +if response.status_code == 401: + print("\nAuthentication Error: Please check your API key") + exit() + +try: + # Parse the JSON response + job_status = response.json() + job_id = job_status["id"] + print(f"\nStarted job: {job_id}") + + # Set up the streaming URL for getting results + stream_url = f"https://api.runpod.ai/v2/{ENDPOINT_ID}/stream/{job_id}" + + # Keep checking for results until the job is done + while True: + # Get the current status of the job + stream_response = requests.get(stream_url, headers=headers) + stream_data = stream_response.json() + + # Check if the job is completed + if stream_data["status"] == "COMPLETED": + print("\nJob completed!") + break + + # Check if the job is still running and has new data + elif stream_data["status"] == "IN_PROGRESS" and stream_data.get("stream"): + # Process each piece of output data + for output in stream_data["stream"]: + print(f"Received: {json.dumps(output, indent=2)}") + + # If we received a processed image, save it + if "processed_image" in output: + filename = f"output_image_{output.get('chunk_number', 'final')}.png" + save_base64_image(output["processed_image"], filename) + + # Check if the job failed + elif stream_data["status"] == "FAILED": + print("\nJob failed!") + print(stream_data.get("error", "No error message provided")) + break + + # Wait a bit before checking again + time.sleep(0.5) + +except json.JSONDecodeError as e: + print("\nError decoding JSON response:", str(e)) +except KeyError as e: + print("\nError accessing response data:", str(e)) + print("Full response:", job_status) +except Exception as e: + print("\nUnexpected error:", str(e)) +``` + + +Replace `YOUR_RUNPOD_API_KEY` and `YOUR_ENDPOINT_ID` with your actual API key and endpoint ID before running the script. 
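+If you prefer not to hardcode these values, you can read them from environment variables instead, for example with `os.environ.get("RUNPOD_API_KEY")` (the variable name here is only an illustration).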
+ + +Run the test script: + +```bash +python test_endpoint.py +``` + +You should see output similar to this: + +```text +Started job: 123e4567-e89b-12d3-a456-426614174000 +Received: { + "status": "started", + "image_info": { + "format": "PNG", + "size": [100, 100], + "mode": "RGB" + } +} +Received: { + "chunk_number": 1, + "chunk_size": 2500, + "processed_at": "14:30:45", + "progress": "1/4", + "percent_complete": 25.0 +} +... +Received: { + "status": "completed", + "total_chunks_processed": 4, + "final_timestamp": "14:30:48" +} +Job completed! +``` + +The test script sends a base64-encoded image to your endpoint, then polls the `/stream` endpoint to receive incremental results as they become available. + +## Next steps + +Now that you've built a streaming endpoint, explore these related topics: + +- Learn more about [streaming handlers](/serverless/workers/handler-functions#streaming-handlers) and handler types. +- Explore the [/stream operation](/serverless/endpoints/send-requests#stream) for receiving streamed results. +- Try building a [concurrent handler](/serverless/workers/concurrent-handler) to process multiple requests simultaneously.
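+
+If you want to exercise the endpoint without the Python client, the same `/run` and `/stream` operations can be called with `curl`. This is a minimal sketch that reuses the placeholder endpoint ID and API key from the test script; `BASE64_STRING` stands in for your encoded image, and `YOUR_JOB_ID` is the job ID returned by the `/run` call.
+
+```bash
+# Submit a job (replace the placeholders and the base64 string first).
+curl -X POST "https://api.runpod.ai/v2/YOUR_ENDPOINT_ID/run" \
+  -H "Authorization: Bearer YOUR_RUNPOD_API_KEY" \
+  -H "Content-Type: application/json" \
+  -d '{"input": {"base64_image": "BASE64_STRING"}}'
+
+# Poll for streamed results using the job ID from the previous response.
+curl "https://api.runpod.ai/v2/YOUR_ENDPOINT_ID/stream/YOUR_JOB_ID" \
+  -H "Authorization: Bearer YOUR_RUNPOD_API_KEY"
+```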