If your CLI commands in Windsurf stop executing or hang on macOS, the issue is usually caused by zsh conflicts inside Cascade. The quick fix is to make Windsurf use bash instead of zsh for its internal terminal.
MCP (Model Context Protocol) is a way to give AI assistants access to your application’s data and functionality. Think of it as creating “tools” that an AI can use to help users.
Simple Analogy:
Your app is like a toolbox 🧰
MCP tools are like individual tools (hammer, screwdriver, etc.)
The AI is like a smart assistant that knows which tool to use
🎯 What We’ll Build
A simple “Hello World” MCP server that:
Has a basic API endpoint
Provides data to an AI assistant
Lets the AI answer questions about your app
Time to complete: 15 minutes ⏱️
📋 Prerequisites
You need:
✅ Node.js 18+ installed
✅ Basic knowledge of JavaScript/TypeScript
✅ A text editor (VS Code recommended)
✅ Terminal/Command line access
🏗️ Step-by-Step Tutorial
Step 1: Create a New Next.js Project
# Create a new Next.js app
npx create-next-app@latest my-mcp-app
# When prompted, choose:
# ✅ TypeScript: Yes
# ✅ ESLint: Yes
# ✅ Tailwind CSS: Yes
# ✅ src/ directory: No
# ✅ App Router: Yes (IMPORTANT!)
# ✅ Turbopack: No
# ✅ Import alias: Yes (@/*)

# Navigate to the project
cd my-mcp-app
What just happened?
Created a new Next.js project with App Router
App Router is needed for MCP (uses /app directory)
Step 2: Create the API Directory
# Create the API directory structure
mkdir -p app/api/hello
Step 3: Create Your First API Endpoint
Create app/api/hello/route.ts:
// app/api/hello/route.ts
import { NextResponse } from 'next/server';

export async function GET() {
  // This is your data that the AI can access
  const data = {
    message: "Hello from MCP!",
    timestamp: new Date().toISOString(),
    tips: [
      "MCP lets AI access your app data",
      "You can create multiple endpoints",
      "AI can call these endpoints automatically"
    ]
  };

  return NextResponse.json(data);
}
What does this do?
Creates an API endpoint at /api/hello
Returns JSON data that includes a message and tips
This data will be available to the AI
Step 4: Test Your Endpoint
Start the development server:
npm run dev
Open your browser and visit:
http://localhost:3000/api/hello
You should see:
{
  "message": "Hello from MCP!",
  "timestamp": "2025-10-25T00:00:00.000Z",
  "tips": [
    "MCP lets AI access your app data",
    "You can create multiple endpoints",
    "AI can call these endpoints automatically"
  ]
}
✅ Success! Your first MCP endpoint is working!
Step 5: Install AI SDK
Now let’s add AI capabilities:
# Install Vercel AI SDK and OpenAI
npm install ai @ai-sdk/openai zod
What are these packages?
ai – Vercel AI SDK for building AI apps
@ai-sdk/openai – OpenAI integration
zod – Schema validation (for tool parameters)
Step 6: Create MCP Tools
Create a file to define your MCP tools:
# Create lib directory
mkdir -p lib
Create lib/mcp-tools.ts:
// lib/mcp-tools.ts
import { tool } from 'ai';
import { z } from 'zod';

// Define your MCP tools
export const mcpTools = {
  // Tool 1: Get hello message
  get_hello_message: tool({
    description: 'Get a hello message from the server',
    parameters: z.object({}), // No parameters needed
    execute: async () => {
      // Fetch data from your API
      const response = await fetch('http://localhost:3000/api/hello');
      const data = await response.json();
      return data;
    },
  }),

  // Tool 2: Get current time
  get_current_time: tool({
    description: 'Get the current server time',
    parameters: z.object({}),
    execute: async () => {
      return {
        time: new Date().toLocaleTimeString(),
        date: new Date().toLocaleDateString(),
        timezone: Intl.DateTimeFormat().resolvedOptions().timeZone,
      };
    },
  }),
};
What does this do?
Defines 2 tools that the AI can use
get_hello_message – Fetches data from your API
get_current_time – Returns current time
Each tool has a description (tells AI what it does)
Each tool has parameters (inputs the AI can provide)
Each tool has an execute function (what it actually does)
Step 7: Create AI Chat Endpoint
Create app/api/chat/route.ts:
// app/api/chat/route.ts
import { openai } from '@ai-sdk/openai';
import { streamText } from 'ai';
import { mcpTools } from '@/lib/mcp-tools';

export async function POST(request: Request) {
  // Get the user's message
  const { messages } = await request.json();

  // Call OpenAI with MCP tools
  const result = await streamText({
    model: openai('gpt-4o-mini'),
    messages,
    tools: mcpTools, // Give AI access to your tools
    maxSteps: 5, // Allow AI to use multiple tools
  });

  // Stream the response back to the user
  return result.toDataStreamResponse();
}
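To chat with these tools from the browser, you also need a simple front-end. The sketch below is one minimal way to build it; it assumes the AI SDK's React bindings are available (the @ai-sdk/react package and its useChat hook, which posts to /api/chat by default — install it with npm install @ai-sdk/react if needed). The exact import path and message shape vary between AI SDK versions.
Create app/page.tsx (replacing the default home page):
// app/page.tsx (sketch — in older AI SDK versions the hook lives in 'ai/react')
'use client';

import { useChat } from '@ai-sdk/react';

export default function Chat() {
  // useChat keeps the message history and streams replies from /api/chat
  const { messages, input, handleInputChange, handleSubmit } = useChat();

  return (
    <main className="mx-auto max-w-xl p-4">
      {messages.map((m) => (
        <p key={m.id}>
          <strong>{m.role === 'user' ? 'You: ' : 'AI: '}</strong>
          {m.content}
        </p>
      ))}
      <form onSubmit={handleSubmit}>
        <input
          className="w-full border p-2"
          value={input}
          onChange={handleInputChange}
          placeholder="Ask something..."
        />
      </form>
    </main>
  );
}
Make sure OPENAI_API_KEY is set (for example in .env.local), since the @ai-sdk/openai provider reads it from the environment, then run npm run dev and open http://localhost:3000.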
Step 8: Test Your AI Assistant
Ask a few questions and watch the AI pick the right tool.
Question 1:
What's the hello message?
AI Response:
The hello message is "Hello from MCP!"
Here are some tips:
- MCP lets AI access your app data
- You can create multiple endpoints
- AI can call these endpoints automatically
🔧 Used tools: get_hello_message
Question 2:
What time is it?
AI Response:
The current time is 12:34:56 PM
Date: October 25, 2025
Timezone: America/New_York
🔧 Used tools: get_current_time
🎉 Congratulations! Your MCP server is working!
🎨 Visual Flow
The user asks "What's the hello message?" → the chat UI sends POST /api/chat → the chat route passes the message plus the tool definitions to OpenAI → OpenAI decides to use get_hello_message → the MCP tool calls GET /api/hello and gets back {message, tips} → OpenAI turns that data into a natural-language answer → the answer streams back to the chat UI as Server-Sent Events → the user sees the formatted response.
The same pattern works for any endpoint. For example, imagine a /api/users endpoint wrapped in a get_users tool:
User: "Who are the users and how old are they?"
AI: (uses get_users tool automatically)
→ Calls /api/users
→ Gets data
→ Processes it
→ Responds naturally
AI: "There are 2 users:
- Alice is 30 years old
- Bob is 25 years old"
Key Differences Table
| Aspect | API Endpoint | MCP Tool |
| --- | --- | --- |
| What it is | HTTP route | Function wrapper |
| Who calls it | Anyone (HTTP) | Only AI |
| How to call | fetch(), curl | AI decides automatically |
| Response | Raw JSON | Processed by AI |
| Purpose | Serve data | Give AI capabilities |
| Reusable | Yes, by anyone | Only by AI |
Why Do You Need BOTH?
API Endpoint:
✅ The actual data source
✅ Can be used by other parts of your app
✅ Can be tested independently
✅ Follows REST conventions
✅ Can be cached and secured
MCP Tool:
✅ Tells AI what the endpoint does
✅ Provides context (description)
✅ Defines parameters AI can use
✅ Allows AI to use it intelligently
✅ Makes your app AI-powered
Restaurant Analogy
API Endpoint = Kitchen
Has the actual food (data)
Anyone can order from it
Returns raw dishes
MCP Tool = Waiter
Knows what’s available (description)
Takes your order (parameters)
Gets food from kitchen (calls API)
Serves it nicely (formats response)
AI = Smart Waiter
Understands what you want
Knows which dishes to recommend
Can combine multiple dishes
Explains the menu in your language
The Complete Flow
The user asks "What's the hello message?" → OpenAI reads the tool descriptions and decides to use get_hello_message → the tool's execute() runs and sends GET /api/hello → the endpoint fetches your data and returns it → the JSON response flows back through the tool as structured data → OpenAI processes and formats it → the user reads "The hello message is ...".
Can You Skip the API Endpoint?
Short answer: Yes, but not recommended!
// ❌ MCP Tool without API (works but not ideal)
get_data: tool({
  execute: async () => {
    // Directly query database
    const data = await db.query('SELECT * FROM users');
    return data;
  },
})

// ✅ Better: MCP Tool + API (recommended)
get_data: tool({
  execute: async () => {
    // Call your API
    const res = await fetch('http://localhost:3000/api/users');
    return await res.json();
  },
})
Why separate API is better:
✅ Reusable by other parts of your app
✅ Can be tested independently
✅ Can be called by non-AI clients
✅ Easier to maintain and debug
✅ Can add authentication/caching
When to Use What
Use Just API Endpoint When:
Building a regular web app
Need data for your frontend
Other services need to call it
No AI involved
Use API + MCP Tool When:
Want AI to access the data
Building AI chat features
Need intelligent data retrieval
Want natural language interface
Summary: The Relationship
API Endpoint (data source)
↓
MCP Tool (AI access layer)
↓
OpenAI (intelligence)
↓
User (natural language)
Think of it this way:
API Endpoint = What you have (data)
MCP Tool = How AI accesses it (wrapper)
Together = AI-powered application 🚀
📚 Key Takeaways
MCP = Tools for AI
You create tools (functions)
AI decides when to use them
AI can use multiple tools to answer questions
Three Main Parts:
API endpoints (your data)
MCP tools (how AI accesses data)
Chat endpoint (connects AI to tools)
Simple Pattern:
User asks question → AI reads question → AI calls your MCP tools → Tools fetch data → AI generates answer → User sees response
You’ve built your first MCP server! You now understand:
✅ What MCP is and why it's useful
✅ How to create API endpoints
✅ How to define MCP tools
✅ How to connect AI to your tools
✅ How to build a chat interface
Next Challenge: Try building a tool that:
Reads data from a database
Calls an external API
Performs calculations
Analyzes files
💡 Pro Tips
Start Simple
Begin with one tool
Add more as you learn
Test each tool individually
Good Tool Descriptions
// ❌ Bad
description: 'Gets data'
// ✅ Good
description: 'Get user profile information including name, email, and registration date'
Use Parameters
// Let AI provide inputs
parameters: z.object({
  userId: z.number().describe('The ID of the user to fetch'),
  includeOrders: z.boolean().describe('Whether to include order history'),
})
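Putting the last two tips together, here is a sketch of what a fuller, parameterised tool could look like. The get_user tool name, the /api/users/[id] route, and the orders query parameter are hypothetical examples, not part of the project built above:
// lib/mcp-tools.ts (illustrative sketch — get_user and /api/users/[id] are hypothetical)
import { tool } from 'ai';
import { z } from 'zod';

export const get_user = tool({
  description:
    'Get user profile information including name, email, and registration date',
  parameters: z.object({
    userId: z.number().describe('The ID of the user to fetch'),
    includeOrders: z.boolean().describe('Whether to include order history'),
  }),
  execute: async ({ userId, includeOrders }) => {
    // The AI fills in userId and includeOrders; the tool just forwards them to your API
    const res = await fetch(
      `http://localhost:3000/api/users/${userId}?orders=${includeOrders}`
    );
    return await res.json();
  },
});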
NAME CPU(cores) MEMORY(bytes)
app-xxxxxxxxxx-xxxxx 1500m 587Mi ← CPU at limit!
app-xxxxxxxxxx-xxxxx 1m 524Mi ← Crashed pod
app-xxxxxxxxxx-xxxxx 1m 526Mi ← Crashed pod
Problem: PHP pods crashing due to worker exhaustion and short health check timeouts.
Solution: Increased PHP-FPM workers from 5 to 20 and health check timeout from 10s to 30s.
Result: All 22 pods stable, no crashes, 4x capacity increase.
Key Learning: Always calculate worker count based on available memory, not arbitrary numbers. The formula (RAM - Base) / 40MB ensures you stay within limits while maximizing capacity.
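As a quick worked example of that formula (the ~225Mi of base overhead here is an assumed figure, purely for illustration): with a 1Gi (1024Mi) pod limit, (1024Mi − 225Mi) / 40MB ≈ 20 workers, which matches the value this cluster ended up running.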
Author’s Note: This solution was implemented on a production Kubernetes cluster running a PHP application. The fix eliminated all CrashLoopBackOff issues and improved stability from 45% to 100% pod availability.
📊 Current Production Metrics Analysis
Actual Memory Usage (Right Now):
Average Memory per Pod: 665Mi
Memory Range: 544Mi – 616Mi
Memory Limit: 1024Mi (1Gi)
Utilization: 65% average (53–60% range)
Current Configuration:
Pods: 22 total
Workers per pod: 20
CPU Limit: 1500m
Average CPU Usage: 935m (62% utilization)
Restarts: 0-4 (minimal, stable)
🔍 Is 20 Workers Optimal? Let’s Calculate:
Method 1: Reverse Engineering from Memory Usage
Current Memory Usage: 665Mi average
Memory Limit: 1024Mi
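Working backwards from those numbers (assuming roughly 265Mi of that usage is fixed overhead from the PHP-FPM master process, opcache, and the base image rather than the workers themselves): (665Mi − 265Mi) / 20 workers ≈ 20MB per worker, which lines up with the 18–20MB per-worker figure noted below.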
⚠️ 30 Workers (Too Aggressive):
- Memory: 83% (risky, little headroom)
- CPU: Would need more (1500m not enough)
- Risk: Traffic spikes could OOMKill
- Not recommended
❌ 15 Workers (Too Conservative):
- Memory: 54% (underutilized)
- Would crash under current 263+ concurrent requests
- Wasted resources
🎯 Final Recommendation: KEEP 20 WORKERS
Reasons:
Memory Utilization: 65% – Perfect balance (not too high, not too low)
CPU Utilization: 62% – Efficient, room to grow
35% Headroom – Enough buffer for traffic spikes
Stable: 0-4 restarts – Proves it’s working well
Actual per-worker memory: 18-20MB – More efficient than estimated
Could We Optimize Further?
Option A: Increase to 25 workers (Moderate)
pm.max_children = 25
Memory would be: ~750Mi (73%)
Headroom: ~274Mi (27%)
Risk: Medium (less buffer for spikes)
Benefit: 25% more capacity
Verdict: Only if you see consistent high load
Option B: Keep 20 workers (Recommended)
pm.max_children = 20  # Current
Memory: 665Mi (65%) ✅
Headroom: 358Mi (35%) ✅
Risk: Low ✅
Stability: Proven ✅
Verdict: OPTIMAL – Don’t change!
📈 When to Reconsider:
Monitor these metrics and adjust only if workers start maxing out:
# Check if workers are maxed out
kubectl exec <pod> -c php-php -- curl -s http://localhost:9000/fpm-status
from google import genai
from google.genai import types
from google.genai.types import Content, Part
from playwright.sync_api import sync_playwright
import time

# Initialize the Gemini client
client = genai.Client()

# Screen dimensions
SCREEN_WIDTH = 1440
SCREEN_HEIGHT = 900

def denormalize_x(x: int, screen_width: int) -> int:
    """Convert normalized x coordinate (0-1000) to actual pixel coordinate."""
    return int(x / 1000 * screen_width)

def denormalize_y(y: int, screen_height: int) -> int:
    """Convert normalized y coordinate (0-1000) to actual pixel coordinate."""
    return int(y / 1000 * screen_height)

def execute_function_calls(candidate, page, screen_width, screen_height):
    """Execute the actions suggested by the model."""
    results = []
    function_calls = []
    for part in candidate.content.parts:
        if part.function_call:
            function_calls.append(part.function_call)

    for function_call in function_calls:
        action_result = {}
        fname = function_call.name
        args = function_call.args
        print(f" -> Executing: {fname}")
        try:
            if fname == "open_web_browser":
                pass  # Already open
            elif fname == "click_at":
                actual_x = denormalize_x(args["x"], screen_width)
                actual_y = denormalize_y(args["y"], screen_height)
                page.mouse.click(actual_x, actual_y)
            elif fname == "type_text_at":
                actual_x = denormalize_x(args["x"], screen_width)
                actual_y = denormalize_y(args["y"], screen_height)
                text = args["text"]
                press_enter = args.get("press_enter", False)
                page.mouse.click(actual_x, actual_y)
                page.keyboard.press("Meta+A")
                page.keyboard.press("Backspace")
                page.keyboard.type(text)
                if press_enter:
                    page.keyboard.press("Enter")
            page.wait_for_load_state(timeout=5000)
            time.sleep(1)
        except Exception as e:
            print(f"Error executing {fname}: {e}")
            action_result = {"error": str(e)}
        results.append((fname, action_result))
    return results

def get_function_responses(page, results):
    """Capture screenshot and URL after actions."""
    screenshot_bytes = page.screenshot(type="png")
    current_url = page.url
    function_responses = []
    for name, result in results:
        response_data = {"url": current_url}
        response_data.update(result)
        function_responses.append(
            types.FunctionResponse(
                name=name,
                response=response_data,
                parts=[types.FunctionResponsePart(
                    inline_data=types.FunctionResponseBlob(
                        mime_type="image/png",
                        data=screenshot_bytes))
                ]
            )
        )
    return function_responses

# Main program
print("Initialising browser...")
playwright = sync_playwright().start()
browser = playwright.chromium.launch(headless=False)
context = browser.new_context(viewport={"width": SCREEN_WIDTH, "height": SCREEN_HEIGHT})
page = context.new_page()

try:
    # Go to initial page
    page.goto("https://tinyurl.com/pet-care-signup")

    # Configure the model with Computer Use tool
    config = types.GenerateContentConfig(
        tools=[types.Tool(computer_use=types.ComputerUse(
            environment=types.Environment.ENVIRONMENT_BROWSER
        ))],
    )

    # Take initial screenshot
    initial_screenshot = page.screenshot(type="png")

    USER_PROMPT = """
    From https://tinyurl.com/pet-care-signup,
    get all details for any pet with a California residency.
    Output all the information you find in a clear, readable format.
    """
    print(f"Goal: {USER_PROMPT}")

    contents = [
        Content(role="user", parts=[
            Part(text=USER_PROMPT),
            Part.from_bytes(data=initial_screenshot, mime_type='image/png')
        ])
    ]

    # Agent loop - maximum 5 turns
    for i in range(5):
        print(f"\n--- Turn {i+1} ---")
        print("Thinking...")
        response = client.models.generate_content(
            model='gemini-2.5-computer-use-preview-10-2025',
            contents=contents,
            config=config,
        )
        candidate = response.candidates[0]
        contents.append(candidate.content)

        # Check if there are function calls to execute
        has_function_calls = any(part.function_call for part in candidate.content.parts)
        if not has_function_calls:
            text_response = " ".join([part.text for part in candidate.content.parts if part.text])
            print("Agent finished:", text_response)
            break

        print("Executing actions...")
        results = execute_function_calls(candidate, page, SCREEN_WIDTH, SCREEN_HEIGHT)
        print("Capturing state...")
        function_responses = get_function_responses(page, results)
        contents.append(
            Content(role="user", parts=[Part(function_response=fr) for fr in function_responses])
        )
finally:
    print("\nClosing browser...")
    browser.close()
    playwright.stop()
    print("Done!")
from google import genai
from google.genai import types
from google.genai.types import Content, Part
from playwright.sync_api import sync_playwright
import time

# Initialize the Gemini client
client = genai.Client()

# Screen dimensions
SCREEN_WIDTH = 1440
SCREEN_HEIGHT = 900

def denormalize_x(x: int, screen_width: int) -> int:
    """Convert normalized x coordinate (0-1000) to actual pixel coordinate."""
    return int(x / 1000 * screen_width)

def denormalize_y(y: int, screen_height: int) -> int:
    """Convert normalized y coordinate (0-1000) to actual pixel coordinate."""
    return int(y / 1000 * screen_height)

def execute_function_calls(candidate, page, screen_width, screen_height):
    """Execute the actions suggested by the model."""
    results = []
    function_calls = []
    for part in candidate.content.parts:
        if part.function_call:
            function_calls.append(part.function_call)

    for function_call in function_calls:
        action_result = {}
        fname = function_call.name
        args = function_call.args
        print(f" -> Executing: {fname}")
        try:
            if fname == "open_web_browser":
                pass  # Already open
            elif fname == "click_at":
                actual_x = denormalize_x(args["x"], screen_width)
                actual_y = denormalize_y(args["y"], screen_height)
                page.mouse.click(actual_x, actual_y)
            elif fname == "type_text_at":
                actual_x = denormalize_x(args["x"], screen_width)
                actual_y = denormalize_y(args["y"], screen_height)
                text = args["text"]
                press_enter = args.get("press_enter", False)
                page.mouse.click(actual_x, actual_y)
                page.keyboard.press("Meta+A")
                page.keyboard.press("Backspace")
                page.keyboard.type(text)
                if press_enter:
                    page.keyboard.press("Enter")
            elif fname == "drag_and_drop":
                start_x = denormalize_x(args["x"], screen_width)
                start_y = denormalize_y(args["y"], screen_height)
                dest_x = denormalize_x(args["destination_x"], screen_width)
                dest_y = denormalize_y(args["destination_y"], screen_height)
                # Perform drag and drop
                page.mouse.move(start_x, start_y)
                page.mouse.down()
                page.mouse.move(dest_x, dest_y)
                page.mouse.up()
            page.wait_for_load_state(timeout=5000)
            time.sleep(1)
        except Exception as e:
            print(f"Error executing {fname}: {e}")
            action_result = {"error": str(e)}
        results.append((fname, action_result))
    return results

def get_function_responses(page, results):
    """Capture screenshot and URL after actions."""
    screenshot_bytes = page.screenshot(type="png")
    current_url = page.url
    function_responses = []
    for name, result in results:
        response_data = {"url": current_url}
        response_data.update(result)
        function_responses.append(
            types.FunctionResponse(
                name=name,
                response=response_data,
                parts=[types.FunctionResponsePart(
                    inline_data=types.FunctionResponseBlob(
                        mime_type="image/png",
                        data=screenshot_bytes))
                ]
            )
        )
    return function_responses

# Main program
print("Initialising browser...")
playwright = sync_playwright().start()
browser = playwright.chromium.launch(headless=False)
context = browser.new_context(viewport={"width": SCREEN_WIDTH, "height": SCREEN_HEIGHT})
page = context.new_page()

try:
    # Go to initial page
    page.goto("https://sticky-note-jam.web.app")

    # Configure the model with Computer Use tool
    config = types.GenerateContentConfig(
        tools=[types.Tool(computer_use=types.ComputerUse(
            environment=types.Environment.ENVIRONMENT_BROWSER
        ))],
    )

    # Take initial screenshot
    initial_screenshot = page.screenshot(type="png")

    USER_PROMPT = """
    My art club brainstormed tasks ahead of our fair.
    The board is chaotic and I need your help organising the tasks into some categories I created.
    Go to sticky-note-jam.web.app and
    ensure notes are clearly in the right sections.
    Drag them there if not.
    In your output, describe what the initial stage looked like and
    what the final stage looks like after organisation.
    """
    print(f"Goal: {USER_PROMPT}")

    contents = [
        Content(role="user", parts=[
            Part(text=USER_PROMPT),
            Part.from_bytes(data=initial_screenshot, mime_type='image/png')
        ])
    ]

    # Agent loop - maximum 10 turns (more turns for drag operations)
    for i in range(10):
        print(f"\n--- Turn {i+1} ---")
        print("Thinking...")
        response = client.models.generate_content(
            model='gemini-2.5-computer-use-preview-10-2025',
            contents=contents,
            config=config,
        )
        candidate = response.candidates[0]
        contents.append(candidate.content)

        # Check if there are function calls to execute
        has_function_calls = any(part.function_call for part in candidate.content.parts)
        if not has_function_calls:
            text_response = " ".join([part.text for part in candidate.content.parts if part.text])
            print("Agent finished:", text_response)
            break

        print("Executing actions...")
        results = execute_function_calls(candidate, page, SCREEN_WIDTH, SCREEN_HEIGHT)
        print("Capturing state...")
        function_responses = get_function_responses(page, results)
        contents.append(
            Content(role="user", parts=[Part(function_response=fr) for fr in function_responses])
        )
finally:
    print("\nClosing browser...")
    browser.close()
    playwright.stop()
    print("Done!")
# Install Claude Code
npm install -g @anthropic-ai/claude-code
# Install Claude Agents SDK
pip install claude-agent-sdk
# Set API Key
export ANTHROPIC_API_KEY=your_api_key_here
Basic
import asyncio
from claude_agent_sdk import query

async def main():
    async for message in query(prompt="Hello, how are you?"):
        print(message)

asyncio.run(main())
Built-in Tools
import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions
from rich import print

async def main():
    options = ClaudeAgentOptions(
        allowed_tools=["Read", "Write"],
        permission_mode="acceptEdits"
    )
    async for msg in query(
        prompt="Create a file called greeting.txt with 'Hello Mervin Praison!'",
        options=options
    ):
        print(msg)

asyncio.run(main())
Custom Tools
import asyncio
from typing import Any
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions, tool, create_sdk_mcp_server
from rich import print

@tool("greet", "Greet a user", {"name": str})
async def greet(args: dict[str, Any]) -> dict[str, Any]:
    return {
        "content": [{
            "type": "text",
            "text": f"Hello, {args['name']}!"
        }]
    }

server = create_sdk_mcp_server(
    name="my-tools",
    version="1.0.0",
    tools=[greet]
)

async def main():
    options = ClaudeAgentOptions(
        mcp_servers={"tools": server},
        allowed_tools=["mcp__tools__greet"]
    )
    async with ClaudeSDKClient(options=options) as client:
        await client.query("Greet Mervin Praison")
        async for msg in client.receive_response():
            print(msg)

asyncio.run(main())
Claude Agent Options
import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions
from rich import print

async def main():
    options = ClaudeAgentOptions(
        system_prompt="You are an expert Python developer",
        permission_mode='acceptEdits',
        cwd="/Users/praison/cc"
    )
    async for message in query(
        prompt="Create a Python web server in my current directory",
        options=options
    ):
        print(message)

asyncio.run(main())
from google import genai
from PIL import Image
from io import BytesIO

client = genai.Client()

prompt = "Add a Cap to the person's head"
image = Image.open('mervinpraison.jpeg')

response = client.models.generate_content(
    model="gemini-2.5-flash-image-preview",
    contents=[prompt, image],
)

for part in response.candidates[0].content.parts:
    if part.text is not None:
        print(part.text)
    elif part.inline_data is not None:
        image = Image.open(BytesIO(part.inline_data.data))
        image.save("generated_image.png")
        print("Generated image saved as 'generated_image.png'")
UI.py
import gradio as gr
from google import genai
from PIL import Image
from io import BytesIO

client = genai.Client()

def edit_image(image, prompt):
    response = client.models.generate_content(
        model="gemini-2.5-flash-image-preview",
        contents=[prompt, image],
    )
    if response.candidates[0].finish_reason.name == 'PROHIBITED_CONTENT':
        return None, "Content blocked by safety filters"
    elif response.candidates[0].content is None:
        return None, f"No content generated: {response.candidates[0].finish_reason.name}"
    for part in response.candidates[0].content.parts:
        if part.inline_data is not None:
            return Image.open(BytesIO(part.inline_data.data)), "Image generated successfully"
    return None, "No image found in response"

iface = gr.Interface(
    fn=edit_image,
    inputs=[
        gr.Image(type="pil", label="Upload Image"),
        gr.Textbox(label="Edit Prompt", value="Add a Cap to the person's head")
    ],
    outputs=[
        gr.Image(label="Edited Image"),
        gr.Textbox(label="Status")
    ],
    title="Image Editor"
)

iface.launch()
from poml import poml
import requests, json
from rich import print

# 1) Load and render POML file
messages = poml("financial_analysis.poml", chat=True)

# 2) Combine messages into a single prompt
full_prompt = "\n".join(
    ["\n".join(str(c).strip() for c in m["content"]) if isinstance(m.get("content"), list) else str(m["content"]).strip()
     for m in messages if m.get("content")]
)
print("\n--- Full Prompt ---\n")
print(full_prompt)

# 3) Call Ollama Model
resp = requests.post(
    "http://localhost:11434/api/generate",
    json={"model": "qwen2.5vl:latest", "prompt": full_prompt, "stream": False},
)
data = resp.json()
print("\n--- Model Response ---\n")
print(data.get("response") or json.dumps(data, indent=2))
import langextract as lx
import textwrap

# 1. Define the prompt and extraction rules
prompt = textwrap.dedent("""\
    Extract characters, emotions, and relationships in order of appearance.
    Use exact text for extractions. Do not paraphrase or overlap entities.
    Provide meaningful attributes for each entity to add context.""")

# 2. Provide a high-quality example to guide the model
examples = [
    lx.data.ExampleData(
        text="ROMEO. But soft! What light through yonder window breaks? It is the east, and Juliet is the sun.",
        extractions=[
            lx.data.Extraction(
                extraction_class="character",
                extraction_text="ROMEO",
                attributes={"emotional_state": "wonder"}
            ),
            lx.data.Extraction(
                extraction_class="emotion",
                extraction_text="But soft!",
                attributes={"feeling": "gentle awe"}
            ),
            lx.data.Extraction(
                extraction_class="relationship",
                extraction_text="Juliet is the sun",
                attributes={"type": "metaphor"}
            ),
        ]
    )
]

# The input text to be processed
input_text = "Lady Juliet gazed longingly at the stars, her heart aching for Romeo"

# Run the extraction
result = lx.extract(
    text_or_documents=input_text,
    prompt_description=prompt,
    examples=examples,
    model_id="gemini-2.5-flash",
)

# Save the results to a JSONL file
lx.io.save_annotated_documents([result], output_name="extraction_results.jsonl")

# Generate the visualization from the file
html_content = lx.visualize("test_output/extraction_results.jsonl")
with open("test_output/visualization.html", "w") as f:
    f.write(html_content)
Advanced
import langextract as lx
import textwrap
from collections import Counter, defaultdict

# Define comprehensive prompt and examples for complex literary text
prompt = textwrap.dedent("""\
    Extract characters, emotions, and relationships from the given text.
    Provide meaningful attributes for every entity to add context and depth.
    Important: Use exact text from the input for extraction_text. Do not paraphrase.
    Extract entities in order of appearance with no overlapping text spans.
    Note: In play scripts, speaker names appear in ALL-CAPS followed by a period.""")

examples = [
    lx.data.ExampleData(
        text=textwrap.dedent("""\
            ROMEO. But soft! What light through yonder window breaks?
            It is the east, and Juliet is the sun.
            JULIET. O Romeo, Romeo! Wherefore art thou Romeo?"""),
        extractions=[
            lx.data.Extraction(
                extraction_class="character",
                extraction_text="ROMEO",
                attributes={"emotional_state": "wonder"}
            ),
            lx.data.Extraction(
                extraction_class="emotion",
                extraction_text="But soft!",
                attributes={"feeling": "gentle awe", "character": "Romeo"}
            ),
            lx.data.Extraction(
                extraction_class="relationship",
                extraction_text="Juliet is the sun",
                attributes={"type": "metaphor", "character_1": "Romeo", "character_2": "Juliet"}
            ),
            lx.data.Extraction(
                extraction_class="character",
                extraction_text="JULIET",
                attributes={"emotional_state": "yearning"}
            ),
            lx.data.Extraction(
                extraction_class="emotion",
                extraction_text="Wherefore art thou Romeo?",
                attributes={"feeling": "longing question", "character": "Juliet"}
            ),
        ]
    )
]

# Process Romeo & Juliet directly from Project Gutenberg
print("Downloading and processing Romeo and Juliet from Project Gutenberg...")
result = lx.extract(
    text_or_documents="https://www.gutenberg.org/files/1513/1513-0.txt",
    prompt_description=prompt,
    examples=examples,
    model_id="gemini-2.5-flash",
    extraction_passes=3,    # Multiple passes for improved recall
    max_workers=20,         # Parallel processing for speed
    max_char_buffer=1000    # Smaller contexts for better accuracy
)
print(f"Extracted {len(result.extractions)} entities from {len(result.text):,} characters")

# Save and visualize the results
lx.io.save_annotated_documents([result], output_name="romeo_juliet_extractions.jsonl")

# Generate the interactive visualization
html_content = lx.visualize("test_output/romeo_juliet_extractions.jsonl")
with open("test_output/romeo_juliet_visualization.html", "w") as f:
    f.write(html_content)
print("Interactive visualization saved to romeo_juliet_visualization.html")

# Analyze character mentions
characters = {}
for e in result.extractions:
    if e.extraction_class == "character":
        char_name = e.extraction_text
        if char_name not in characters:
            characters[char_name] = {"count": 0, "attributes": set()}
        characters[char_name]["count"] += 1
        if e.attributes:
            for attr_key, attr_val in e.attributes.items():
                characters[char_name]["attributes"].add(f"{attr_key}: {attr_val}")

# Print character summary
print(f"\nCHARACTER SUMMARY ({len(characters)} unique characters)")
print("=" * 60)
sorted_chars = sorted(characters.items(), key=lambda x: x[1]["count"], reverse=True)
for char_name, char_data in sorted_chars[:10]:  # Top 10 characters
    attrs_preview = list(char_data["attributes"])[:3]
    attrs_str = f" ({', '.join(attrs_preview)})" if attrs_preview else ""
    print(f"{char_name}: {char_data['count']} mentions{attrs_str}")

# Entity type breakdown
entity_counts = Counter(e.extraction_class for e in result.extractions)
print(f"\nENTITY TYPE BREAKDOWN")
print("=" * 60)
for entity_type, count in entity_counts.most_common():
    percentage = (count / len(result.extractions)) * 100
    print(f"{entity_type}: {count} ({percentage:.1f}%)")