Agentic RAG: Building Self-Correcting Retrieval Pipelines with Live Web Data
Standard RAG has a fundamental limitation: it is only as good as its last index update.
You build a vector store from your documentation, knowledge base, or scraped web content. You index it. The RAG pipeline retrieves relevant chunks and feeds them to your LLM. This works well — until the world changes. Prices shift, APIs update their docs, companies merge, research findings get revised. Your vector index sits there, faithfully returning stale answers.
Agentic RAG, and specifically Corrective RAG (CRAG), solves this by treating retrieval quality as a decision, not a given. Instead of blindly returning whatever the vector search finds, a CRAG pipeline evaluates the relevance and freshness of retrieved content. If the evaluation fails — relevance is too low, confidence is too weak, or the retrieved content is demonstrably outdated — the agent falls back to live web scraping to fetch current information.
This tutorial builds a complete CRAG pipeline using LangGraph for orchestration and KnowledgeSDK for live web data retrieval.
The Architecture
CRAG adds a "relevance grader" between retrieval and generation:
User Query
│
▼
[Vector Store Retrieval]
│
▼
[Relevance Grader] ──── score < threshold ────► [Live Web Search + Scrape]
│ │
└── score >= threshold ◄──────────────────────────────┘
│
▼
[Knowledge Refinement] (filter or expand retrieved docs)
│
▼
[Generation] → Answer with citations
The key decisions the system makes:
- Are the retrieved documents relevant? Score each retrieved chunk on a 0–1 scale against the query.
- Are the documents fresh? Check metadata — when was this content last scraped?
- If not relevant or stale → fetch live. Query a search engine or scrape specific URLs for current information.
- Re-rank and generate. Combine static and live content, then generate a grounded answer.
This adds a grading step on top of naive RAG, but it is far more reliable for production applications where information currency matters.
Setup
pip install langgraph langchain-openai langchain-community langchain-text-splitters chromadb knowledgesdk
export KNOWLEDGESDK_API_KEY="knowledgesdk_live_your_key_here"
export OPENAI_API_KEY="sk-your-openai-key"
Part 1: The Static Vector Store
We start with a simple Chroma vector store as our base index:
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from datetime import datetime, timedelta
import random
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# Simulated knowledge base with timestamps
# In production, these come from your scraping/indexing pipeline
sample_docs = [
Document(
page_content="""KnowledgeSDK pricing as of Q4 2025:
Free tier: 1,000 requests/month
Starter: $29/month for 50,000 requests
Pro: $99/month for 250,000 requests""",
metadata={
"source": "https://knowledgesdk.com/pricing",
"scraped_at": (datetime.now() - timedelta(days=90)).isoformat(), # 90 days old
"title": "KnowledgeSDK Pricing"
}
),
Document(
page_content="""LangGraph is an orchestration framework for building stateful,
multi-actor AI agent applications. As of 2026, LangGraph has 34.5 million
monthly downloads and is the dominant agent framework for production deployments.""",
metadata={
"source": "https://langchain.com/langgraph",
"scraped_at": (datetime.now() - timedelta(days=5)).isoformat(), # 5 days old (fresh)
"title": "LangGraph Overview"
}
),
Document(
page_content="""Claude 3.5 Sonnet was released in June 2024 and offered
significant improvements in coding, reasoning, and instruction following
compared to Claude 3 Sonnet.""",
metadata={
"source": "https://anthropic.com/news",
"scraped_at": (datetime.now() - timedelta(days=400)).isoformat(), # 400 days old (very stale)
"title": "Claude 3.5 Sonnet Release"
}
),
]
# Add to vector store
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
docs = splitter.split_documents(sample_docs)
vectorstore = Chroma.from_documents(docs, embeddings, collection_name="knowledge_base")
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
print(f"Indexed {len(docs)} chunks")
Part 2: Define the CRAG State
from typing import TypedDict, List, Optional
from langchain_core.documents import Document
class CRAGState(TypedDict):
query: str
retrieved_docs: List[Document]
relevance_scores: List[float]
needs_web_search: bool
web_docs: List[Document]
final_docs: List[Document]
answer: str
sources: List[str]
confidence: float
Part 3: Retrieval Node
async def retrieve_from_index(state: CRAGState) -> dict:
"""Retrieve documents from the vector store."""
docs = retriever.invoke(state["query"])
print(f"Retrieved {len(docs)} documents from index")
return {"retrieved_docs": docs}
Part 4: Relevance Grader Node
This is the heart of CRAG — the document evaluator that decides whether retrieved content is good enough:
from langchain_openai import ChatOpenAI
import json
from datetime import datetime
# Ask for JSON output so the json.loads calls below parse reliably
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0,
                 model_kwargs={"response_format": {"type": "json_object"}})

STALENESS_THRESHOLD_DAYS = 30  # Documents older than 30 days are treated as potentially stale
async def grade_documents(state: CRAGState) -> dict:
"""
Grade each retrieved document for:
1. Relevance to the query
2. Freshness (age of the content)
"""
query = state["query"]
docs = state["retrieved_docs"]
scores = []
needs_web_search = False
for doc in docs:
# Check freshness
scraped_at_str = doc.metadata.get("scraped_at", "")
is_stale = False
if scraped_at_str:
scraped_at = datetime.fromisoformat(scraped_at_str)
age_days = (datetime.now() - scraped_at).days
is_stale = age_days > STALENESS_THRESHOLD_DAYS
# Grade relevance
response = await llm.ainvoke([
{
"role": "system",
"content": """Grade the relevance of a document to a query.
Return JSON: {"score": 0.85, "reasoning": "...", "is_relevant": true}
Score 0-1 where 1 is perfectly relevant."""
},
{
"role": "user",
"content": f"""Query: {query}
Document title: {doc.metadata.get('title', 'Unknown')}
Document content: {doc.page_content}
Document age: {doc.metadata.get('scraped_at', 'Unknown')}
Is this document relevant to the query?"""
}
])
grading = json.loads(response.content)
score = grading["score"]
# Penalize stale documents
        if is_stale:
            # age_days was already computed during the freshness check above
            staleness_penalty = min(0.3, age_days / 365 * 0.3)  # Max 30% penalty
adjusted_score = score * (1 - staleness_penalty)
print(f"Document '{doc.metadata.get('title')}': relevance={score:.2f}, age={age_days}d, adjusted={adjusted_score:.2f}")
score = adjusted_score
else:
print(f"Document '{doc.metadata.get('title')}': relevance={score:.2f}, fresh")
scores.append(score)
    # Decision: trigger a web search if even the best document scores low, or overall relevance is weak
max_score = max(scores) if scores else 0
avg_score = sum(scores) / len(scores) if scores else 0
needs_web_search = max_score < 0.6 or avg_score < 0.4
print(f"Max relevance: {max_score:.2f}, Avg: {avg_score:.2f}, Needs web search: {needs_web_search}")
return {
"relevance_scores": scores,
"needs_web_search": needs_web_search,
}
Part 5: Live Web Search and Scrape Node
When the grader decides retrieved content is insufficient, we fall back to live web scraping:
import os
import asyncio

import knowledgesdk

ks_client = knowledgesdk.AsyncClient(api_key=os.environ["KNOWLEDGESDK_API_KEY"])
async def fetch_from_web(state: CRAGState) -> dict:
"""
Fall back to live web scraping when index content is insufficient.
Uses KnowledgeSDK to:
1. Generate search queries
2. Scrape top results
3. Return fresh documents
"""
query = state["query"]
# Step 1: Generate targeted search queries
query_response = await llm.ainvoke([
{
"role": "system",
"content": """Generate 3 specific search queries to find current information.
Return JSON: {"queries": ["query1", "query2", "query3"]}"""
},
{"role": "user", "content": f"Research question: {query}"}
])
    try:
        queries = json.loads(query_response.content)["queries"]
    except (json.JSONDecodeError, KeyError):
        queries = [query]  # Fall back to the original question
# Step 2: Use KnowledgeSDK semantic search if we have indexed content
# (This searches our already-scraped knowledge base)
web_docs = []
try:
search_results = await ks_client.search(query=query, limit=5)
for result in search_results:
if result.score > 0.7: # Only high-confidence matches
doc = Document(
page_content=result.content,
metadata={
"source": result.url,
"title": result.title,
"scraped_at": datetime.now().isoformat(),
"retrieval_method": "semantic_search",
"score": result.score,
}
)
web_docs.append(doc)
except Exception as e:
print(f"Semantic search failed: {e}")
# Step 3: Scrape specific high-value URLs for fresh content
# In production, these URLs come from a search API (Tavily, Brave, etc.)
    # For this example, we derive them from the generated search queries
    target_urls = await get_search_urls(queries)
    scrape_tasks = [ks_client.scrape(url=url) for url in target_urls[:5]]
    scrape_results = await asyncio.gather(*scrape_tasks, return_exceptions=True)
    for url, result in zip(target_urls[:5], scrape_results):
if isinstance(result, Exception):
print(f"Failed to scrape {url}: {result}")
continue
doc = Document(
page_content=result.markdown[:3000], # Cap to avoid token overflow
metadata={
"source": url,
"title": result.title,
"scraped_at": datetime.now().isoformat(),
"retrieval_method": "live_scrape",
}
)
web_docs.append(doc)
print(f"Fetched {len(web_docs)} web documents")
return {"web_docs": web_docs}
async def get_search_urls(queries: list[str]) -> list[str]:
    """
    In production: call Tavily, Brave Search, or Google Search API with each query.
    For this demo: the LLM suggests plausible URLs to scrape.
    """
    response = await llm.ainvoke([
        {
            "role": "system",
            "content": """Return JSON with 5 URLs that likely contain current information about these topics.
{"urls": ["https://...", "https://...", ...]}
Use real, authoritative domains."""
        },
        {"role": "user", "content": "Find current information about:\n" + "\n".join(queries)}
    ])
    return json.loads(response.content).get("urls", [])
Part 6: Knowledge Refinement Node
After retrieval (from index or web), we refine and combine the best documents:
async def refine_knowledge(state: CRAGState) -> dict:
"""
Combine and rank static + web documents.
Filter out truly irrelevant content.
"""
# Combine all available docs
static_docs = state["retrieved_docs"]
web_docs = state.get("web_docs", [])
scores = state.get("relevance_scores", [])
# Keep static docs with good scores
good_static_docs = [
doc for doc, score in zip(static_docs, scores)
if score >= 0.5
]
# Combine with web docs (web docs are always considered fresh)
all_docs = good_static_docs + web_docs
# Deduplicate by source
seen_sources = set()
unique_docs = []
for doc in all_docs:
source = doc.metadata.get("source", "")
if source not in seen_sources:
seen_sources.add(source)
unique_docs.append(doc)
    # Cap at 8 documents to avoid context overflow.
    # In production you could re-rank the combined set first (see the sketch below).
    final_docs = unique_docs[:8]
return {"final_docs": final_docs}
Part 7: Generation Node
async def generate_answer(state: CRAGState) -> dict:
"""Generate a grounded answer from the refined document set."""
docs = state["final_docs"]
query = state["query"]
if not docs:
return {
"answer": "I was unable to find sufficient information to answer this question reliably.",
"sources": [],
"confidence": 0.0,
}
# Format context for the LLM
context = "\n\n---\n\n".join([
f"**Source:** {doc.metadata.get('source', 'Unknown')}\n"
f"**Title:** {doc.metadata.get('title', 'Unknown')}\n"
f"**Retrieved via:** {doc.metadata.get('retrieval_method', 'index')}\n\n"
f"{doc.page_content}"
for doc in docs
])
response = await llm.ainvoke([
{
"role": "system",
"content": """You are a helpful research assistant.
Answer the question using ONLY the provided sources.
Always cite your sources with [Source: URL] notation.
If sources conflict, note the discrepancy and favor more recent information.
Return JSON: {"answer": "...", "confidence": 0.9}"""
},
{
"role": "user",
"content": f"""Question: {query}
Sources:
{context}
Provide a comprehensive, cited answer."""
}
])
result = json.loads(response.content)
sources = [doc.metadata.get("source", "") for doc in docs]
return {
"answer": result["answer"],
"sources": sources,
"confidence": result.get("confidence", 0.8),
}
Part 8: Assemble the CRAG Graph
from langgraph.graph import StateGraph, START, END
def route_based_on_relevance(state: CRAGState) -> str:
"""Route to web search if index retrieval was insufficient."""
if state.get("needs_web_search", False):
return "fetch_web"
return "refine"
# Build the graph
builder = StateGraph(CRAGState)
builder.add_node("retrieve", retrieve_from_index)
builder.add_node("grade", grade_documents)
builder.add_node("fetch_web", fetch_from_web)
builder.add_node("refine", refine_knowledge)
builder.add_node("generate", generate_answer)
builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "grade")
builder.add_conditional_edges(
"grade",
route_based_on_relevance,
{
"fetch_web": "fetch_web",
"refine": "refine",
}
)
builder.add_edge("fetch_web", "refine")
builder.add_edge("refine", "generate")
builder.add_edge("generate", END)
graph = builder.compile()
print("CRAG pipeline compiled successfully")
Part 9: Run the Pipeline
import asyncio
async def ask(question: str) -> dict:
"""Run a question through the CRAG pipeline."""
print(f"\n{'='*60}")
print(f"Question: {question}")
print('='*60)
initial_state: CRAGState = {
"query": question,
"retrieved_docs": [],
"relevance_scores": [],
"needs_web_search": False,
"web_docs": [],
"final_docs": [],
"answer": "",
"sources": [],
"confidence": 0.0,
}
final_state = await graph.ainvoke(initial_state)
print(f"\nAnswer (confidence: {final_state['confidence']:.0%}):")
print(final_state["answer"])
print(f"\nSources used: {len(final_state['sources'])}")
for source in final_state["sources"]:
print(f" - {source}")
return final_state
# Test cases
asyncio.run(ask("What is LangGraph and how many downloads does it have?"))
# → Retrieved from fresh index (5 days old), no web search needed
asyncio.run(ask("What is the current pricing for KnowledgeSDK?"))
# → Index doc is 90 days old, triggers web scrape for current pricing
asyncio.run(ask("What is the latest Claude model from Anthropic?"))
# → Index doc is 400 days old, very stale, triggers web scrape
Node.js Implementation
import { StateGraph, Annotation, START, END } from "@langchain/langgraph";
import { ChatOpenAI, OpenAIEmbeddings } from "@langchain/openai";
import { Chroma } from "@langchain/community/vectorstores/chroma";
import KnowledgeSDK from "@knowledgesdk/node";
import { Document } from "@langchain/core/documents";
const llm = new ChatOpenAI({ model: "gpt-4o-mini", temperature: 0 });
const ksClient = new KnowledgeSDK({ apiKey: process.env.KNOWLEDGESDK_API_KEY! });
// Same Chroma collection as the Python index; the "retrieve" node below queries it
const vectorstore = new Chroma(new OpenAIEmbeddings({ model: "text-embedding-3-small" }), {
  collectionName: "knowledge_base",
});
const CRAGAnnotation = Annotation.Root({
query: Annotation<string>(),
retrievedDocs: Annotation<Document[]>(),
relevanceScores: Annotation<number[]>(),
needsWebSearch: Annotation<boolean>(),
webDocs: Annotation<Document[]>(),
finalDocs: Annotation<Document[]>(),
answer: Annotation<string>(),
sources: Annotation<string[]>(),
confidence: Annotation<number>(),
});
// Grade documents node
async function gradeDocuments(state: typeof CRAGAnnotation.State) {
const scores: number[] = [];
let needsWebSearch = false;
for (const doc of state.retrievedDocs) {
const scrapedAt = doc.metadata.scraped_at
? new Date(doc.metadata.scraped_at)
: null;
const ageDays = scrapedAt
? (Date.now() - scrapedAt.getTime()) / (1000 * 60 * 60 * 24)
: 999;
const response = await llm.invoke([
{
role: "system",
content: 'Grade relevance 0-1. Return JSON: {"score": 0.8}',
},
{
role: "user",
content: `Query: ${state.query}\nDocument: ${doc.pageContent.slice(0, 500)}`,
},
]);
let score = JSON.parse(response.content as string).score;
// Penalize stale content
if (ageDays > 30) {
score *= 1 - Math.min(0.3, (ageDays / 365) * 0.3);
}
scores.push(score);
}
const maxScore = Math.max(...scores, 0);
needsWebSearch = maxScore < 0.6;
console.log(`Max relevance: ${maxScore.toFixed(2)}, Needs web search: ${needsWebSearch}`);
return { relevanceScores: scores, needsWebSearch };
}
// Live web fetch node
async function fetchFromWeb(state: typeof CRAGAnnotation.State) {
const webDocs: Document[] = [];
// Semantic search via KnowledgeSDK
try {
const searchResults = await ksClient.search({
query: state.query,
limit: 5,
});
for (const result of searchResults) {
if (result.score > 0.7) {
webDocs.push(
new Document({
pageContent: result.content,
metadata: {
source: result.url,
title: result.title,
scraped_at: new Date().toISOString(),
retrieval_method: "semantic_search",
},
})
);
}
}
} catch (e) {
console.error("Semantic search failed:", e);
}
// Live scrape fallback
const urlsResponse = await llm.invoke([
{ role: "system", content: 'Return JSON: {"urls": ["url1", "url2"]}' },
{ role: "user", content: `Find info about: ${state.query}` },
]);
const { urls } = JSON.parse(urlsResponse.content as string);
const scrapeResults = await Promise.allSettled(
urls.slice(0, 5).map((url: string) => ksClient.scrape({ url }))
);
for (let i = 0; i < scrapeResults.length; i++) {
const result = scrapeResults[i];
if (result.status === "fulfilled") {
webDocs.push(
new Document({
pageContent: result.value.markdown.slice(0, 3000),
metadata: {
source: urls[i],
title: result.value.title,
scraped_at: new Date().toISOString(),
retrieval_method: "live_scrape",
},
})
);
}
}
console.log(`Fetched ${webDocs.length} web documents`);
return { webDocs };
}
// Generate answer node
async function generateAnswer(state: typeof CRAGAnnotation.State) {
const docs = state.finalDocs.length > 0 ? state.finalDocs : state.webDocs;
const context = docs
.map((d) => `Source: ${d.metadata.source}\n\n${d.pageContent}`)
.join("\n\n---\n\n");
const response = await llm.invoke([
{
role: "system",
content:
'Answer using only provided sources with citations. Return JSON: {"answer": "...", "confidence": 0.9}',
},
{ role: "user", content: `Question: ${state.query}\n\nSources:\n${context}` },
]);
const result = JSON.parse(response.content as string);
return {
answer: result.answer,
sources: docs.map((d) => d.metadata.source as string),
confidence: result.confidence || 0.8,
};
}
// Build graph
const builder = new StateGraph(CRAGAnnotation)
.addNode("retrieve", async (state) => ({
retrievedDocs: await vectorstore.similaritySearch(state.query, 5),
}))
.addNode("grade", gradeDocuments)
.addNode("fetch_web", fetchFromWeb)
.addNode("refine", async (state) => {
const goodStatic = state.retrievedDocs.filter(
(_, i) => (state.relevanceScores[i] || 0) >= 0.5
);
const allDocs = [...goodStatic, ...state.webDocs];
return { finalDocs: allDocs.slice(0, 8) };
})
.addNode("generate", generateAnswer)
.addEdge(START, "retrieve")
.addEdge("retrieve", "grade")
.addConditionalEdges("grade", (state) => (state.needsWebSearch ? "fetch_web" : "refine"), {
fetch_web: "fetch_web",
refine: "refine",
})
.addEdge("fetch_web", "refine")
.addEdge("refine", "generate")
.addEdge("generate", END);
const cragGraph = builder.compile();
// Usage
async function askQuestion(question: string) {
const result = await cragGraph.invoke({
query: question,
retrievedDocs: [],
relevanceScores: [],
needsWebSearch: false,
webDocs: [],
finalDocs: [],
answer: "",
sources: [],
confidence: 0,
});
console.log("Answer:", result.answer);
console.log("Confidence:", result.confidence);
console.log("Sources:", result.sources);
return result;
}
Production Enhancements
Automatic Index Refresh with Webhooks
Instead of reactively detecting stale content at query time, proactively keep your index fresh using KnowledgeSDK webhooks:
import os
import knowledgesdk

client = knowledgesdk.Client(api_key=os.environ["KNOWLEDGESDK_API_KEY"])
# Register pages to monitor for changes
client.webhooks.create(
url="https://yourapp.com/webhooks/content-changed",
events=["page.changed"],
watch_urls=[
"https://docs.example.com/api",
"https://competitor.com/pricing",
"https://industry-source.com/reports",
]
)
# Your webhook handler re-indexes changed pages automatically
# This keeps your vector store fresh without manual polling
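The handler on your side just needs to re-scrape the changed page and swap its chunks in the vector store. Here is a minimal sketch, assuming a FastAPI app, the splitter and vectorstore from Part 1, the async ks_client from Part 5, and that the webhook body carries the changed page's URL in a url field (check KnowledgeSDK's webhook docs for the real payload shape):
from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/webhooks/content-changed")
async def content_changed(request: Request):
    payload = await request.json()
    url = payload.get("url")  # Assumed payload field; adjust to the actual webhook schema
    if not url:
        return {"status": "ignored"}

    # Re-scrape the changed page and rebuild its chunks
    result = await ks_client.scrape(url=url)
    fresh_doc = Document(
        page_content=result.markdown,
        metadata={"source": url, "title": result.title,
                  "scraped_at": datetime.now().isoformat()},
    )
    chunks = splitter.split_documents([fresh_doc])

    # Drop stale chunks for this source via the underlying chromadb collection, then re-index
    vectorstore._collection.delete(where={"source": url})
    vectorstore.add_documents(chunks)
    return {"status": "reindexed", "chunks": len(chunks)}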
Confidence Thresholds by Query Type
Different query types require different confidence thresholds:
CONFIDENCE_THRESHOLDS = {
"pricing": 0.9, # Price data must be very fresh
"technical": 0.7, # Tech docs change less frequently
"factual": 0.8, # Facts need high confidence
"general": 0.5, # General knowledge is more stable
}
async def classify_query(query: str) -> str:
    """Classify the query type so the right confidence threshold can be applied."""
    response = await llm.ainvoke([
        {"role": "system", "content": 'Classify the query as one of: pricing, technical, factual, general. '
                                      'Return JSON: {"type": "pricing"}'},
        {"role": "user", "content": query},
    ])
    return json.loads(response.content).get("type", "general")
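Wiring this into the pipeline means comparing the grader's scores against the type-specific threshold instead of the fixed 0.6 used in Part 4. A rough sketch of an adjusted grading node follows; it reuses grade_documents and classify_query from above, and the node name and exact policy are just one reasonable choice:
async def grade_with_dynamic_threshold(state: CRAGState) -> dict:
    """Variant of grade_documents that adapts the web-search trigger to the query type."""
    query_type = await classify_query(state["query"])
    threshold = CONFIDENCE_THRESHOLDS.get(query_type, 0.6)

    graded = await grade_documents(state)  # Per-document relevance scores, as before
    scores = graded["relevance_scores"]
    max_score = max(scores) if scores else 0.0

    # e.g. a pricing query needs a 0.9 best score to skip the live fetch; a general one only 0.5
    graded["needs_web_search"] = max_score < threshold
    return graded
Register it in place of the original grading node (builder.add_node("grade", grade_with_dynamic_threshold)) and the rest of the graph stays unchanged.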
Why CRAG Outperforms Naive RAG
| Scenario | Naive RAG | CRAG |
|---|---|---|
| Fresh indexed content | Correct | Correct |
| Stale indexed content | Wrong (confidently) | Falls back, fetches live data |
| Topic not in index | Hallucination | Falls back, fetches live data |
| Conflicting sources | Picks one (randomly) | Notes conflict, favors recent |
| Real-time pricing/data | Outdated answer | Live scrape returns current data |
The cost of CRAG over naive RAG is a few extra LLM calls for the relevance grader (one per retrieved chunk, roughly $0.0002 each at gpt-4o-mini prices), plus occasional live scraping when the grader fires. For most production applications, that is trivially small compared to the cost of returning wrong answers.
Conclusion
Static RAG pipelines are a liability when information currency matters. Agentic CRAG transforms retrieval from a deterministic lookup into an adaptive, self-correcting process.
The pattern is straightforward: retrieve from your index, grade the relevance and freshness of what you find, and fall back to live web scraping when the grade is insufficient. KnowledgeSDK makes the fallback layer fast, clean, and reliable — returning LLM-ready markdown that slots directly into your context window without additional processing.
The result is a RAG pipeline that stays accurate over time, without requiring constant manual re-indexing or accepting the risk of stale answers.
Ready to build a self-correcting RAG pipeline? Start with KnowledgeSDK — the live web data layer your agentic RAG needs. 1,000 free requests per month, no credit card required.