knowledgesdk.com/blog/agentic-rag-web-data
tutorial · March 20, 2026 · 15 min read

Agentic RAG: Building Self-Correcting Retrieval Pipelines with Live Web Data

Learn how to build a CRAG (Corrective RAG) pipeline that falls back to live web scraping when your vector index is stale. Full Python code with LangGraph and KnowledgeSDK.


Standard RAG has a fundamental limitation: it is only as good as its last index update.

You build a vector store from your documentation, knowledge base, or scraped web content. You index it. The RAG pipeline retrieves relevant chunks and feeds them to your LLM. This works well — until the world changes. Prices shift, APIs update their docs, companies merge, research findings get revised. Your vector index sits there, faithfully returning stale answers.

Agentic RAG, and specifically Corrective RAG (CRAG), solves this by treating retrieval quality as a decision, not a given. Instead of blindly returning whatever the vector search finds, a CRAG pipeline evaluates the relevance and freshness of retrieved content. If the evaluation fails — relevance is too low, confidence is too weak, or the retrieved content is demonstrably outdated — the agent falls back to live web scraping to fetch current information.

This tutorial builds a complete CRAG pipeline using LangGraph for orchestration and KnowledgeSDK for live web data retrieval.


The Architecture

CRAG adds a "relevance grader" between retrieval and generation:

User Query
    │
    ▼
[Vector Store Retrieval]
    │
    ▼
[Relevance Grader] ──── score < threshold ────► [Live Web Search + Scrape]
    │                                                      │
    └── score >= threshold ◄──────────────────────────────┘
    │
    ▼
[Knowledge Refinement] (filter or expand retrieved docs)
    │
    ▼
[Generation] → Answer with citations

The key decisions the system makes:

  1. Are the retrieved documents relevant? Score each retrieved chunk on a 0–1 scale against the query.
  2. Are the documents fresh? Check metadata — when was this content last scraped?
  3. If no/stale → fetch live. Query a search engine or scrape specific URLs for current information.
  4. Re-rank and generate. Combine static and live content, then generate a grounded answer.

This is more complex than naive RAG, but far more reliable for production applications where information currency matters.
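
The routing decision at the core of steps 1–3 can be sketched as a pure function. This is an illustrative helper (the names and thresholds are hypothetical, not part of the pipeline built below):

```python
# Illustrative sketch of the CRAG routing decision (hypothetical helper,
# not part of the pipeline code below).
def should_fetch_live(relevance_scores: list[float],
                      ages_days: list[int],
                      min_best: float = 0.6,
                      max_age_days: int = 30) -> bool:
    """Return True when retrieved docs are too weak or too old to trust."""
    if not relevance_scores:
        return True  # nothing retrieved: always go to the web
    best = max(relevance_scores)
    all_stale = all(age > max_age_days for age in ages_days)
    return best < min_best or all_stale

# A strong but uniformly stale result set still triggers a live fetch.
```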


Setup

pip install langgraph langchain langchain-openai knowledgesdk langchain-community chromadb
export KNOWLEDGESDK_API_KEY="knowledgesdk_live_your_key_here"
export OPENAI_API_KEY="sk-your-openai-key"

Part 1: The Static Vector Store

We start with a simple Chroma vector store as our base index:

from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from datetime import datetime, timedelta

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# Simulated knowledge base with timestamps
# In production, these come from your scraping/indexing pipeline
sample_docs = [
    Document(
        page_content="""KnowledgeSDK pricing as of Q4 2025:
        Free tier: 1,000 requests/month
        Starter: $29/month for 50,000 requests
        Pro: $99/month for 250,000 requests""",
        metadata={
            "source": "https://knowledgesdk.com/pricing",
            "scraped_at": (datetime.now() - timedelta(days=90)).isoformat(),  # 90 days old
            "title": "KnowledgeSDK Pricing"
        }
    ),
    Document(
        page_content="""LangGraph is an orchestration framework for building stateful,
        multi-actor AI agent applications. As of 2026, LangGraph has 34.5 million
        monthly downloads and is the dominant agent framework for production deployments.""",
        metadata={
            "source": "https://langchain.com/langgraph",
            "scraped_at": (datetime.now() - timedelta(days=5)).isoformat(),  # 5 days old (fresh)
            "title": "LangGraph Overview"
        }
    ),
    Document(
        page_content="""Claude 3.5 Sonnet was released in June 2024 and offered
        significant improvements in coding, reasoning, and instruction following
        compared to Claude 3 Sonnet.""",
        metadata={
            "source": "https://anthropic.com/news",
            "scraped_at": (datetime.now() - timedelta(days=400)).isoformat(),  # 400 days old (very stale)
            "title": "Claude 3.5 Sonnet Release"
        }
    ),
]

# Add to vector store
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
docs = splitter.split_documents(sample_docs)
vectorstore = Chroma.from_documents(docs, embeddings, collection_name="knowledge_base")
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

print(f"Indexed {len(docs)} chunks")

Part 2: Define the CRAG State

from typing import TypedDict, List, Optional
from langchain_core.documents import Document

class CRAGState(TypedDict):
    query: str
    retrieved_docs: List[Document]
    relevance_scores: List[float]
    needs_web_search: bool
    web_docs: List[Document]
    final_docs: List[Document]
    answer: str
    sources: List[str]
    confidence: float
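
Each LangGraph node returns only the keys it changes; with plain annotations like these, LangGraph merges that partial dict into the shared state, last write wins. Conceptually:

```python
# Conceptual illustration of LangGraph's default state-merge behavior:
# a node returns a partial update, and those channel values are overwritten.
state = {"query": "latest pricing?", "needs_web_search": False, "answer": ""}
node_output = {"needs_web_search": True}  # what a grading node might return

state = {**state, **node_output}  # last-value-wins merge
```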

Part 3: Retrieval Node

async def retrieve_from_index(state: CRAGState) -> dict:
    """Retrieve documents from the vector store."""
    docs = await retriever.ainvoke(state["query"])

    print(f"Retrieved {len(docs)} documents from index")
    return {"retrieved_docs": docs}

Part 4: Relevance Grader Node

This is the heart of CRAG — the document evaluator that decides whether retrieved content is good enough:

from langchain_openai import ChatOpenAI
import json
from datetime import datetime

# JSON mode keeps the json.loads calls below from choking on markdown fences
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
    model_kwargs={"response_format": {"type": "json_object"}},
)

STALENESS_THRESHOLD_DAYS = 30  # Documents older than 30 days considered potentially stale

async def grade_documents(state: CRAGState) -> dict:
    """
    Grade each retrieved document for:
    1. Relevance to the query
    2. Freshness (age of the content)
    """
    query = state["query"]
    docs = state["retrieved_docs"]
    scores = []
    needs_web_search = False

    for doc in docs:
        # Check freshness
        scraped_at_str = doc.metadata.get("scraped_at", "")
        is_stale = False

        if scraped_at_str:
            scraped_at = datetime.fromisoformat(scraped_at_str)
            age_days = (datetime.now() - scraped_at).days
            is_stale = age_days > STALENESS_THRESHOLD_DAYS

        # Grade relevance
        response = await llm.ainvoke([
            {
                "role": "system",
                "content": """Grade the relevance of a document to a query.
                Return JSON: {"score": 0.85, "reasoning": "...", "is_relevant": true}
                Score 0-1 where 1 is perfectly relevant."""
            },
            {
                "role": "user",
                "content": f"""Query: {query}

Document title: {doc.metadata.get('title', 'Unknown')}
Document content: {doc.page_content}
Document age: {doc.metadata.get('scraped_at', 'Unknown')}

Is this document relevant to the query?"""
            }
        ])

        grading = json.loads(response.content)
        score = grading["score"]

        # Penalize stale documents
        if is_stale:
            age_days = (datetime.now() - datetime.fromisoformat(scraped_at_str)).days
            staleness_penalty = min(0.3, age_days / 365 * 0.3)  # Max 30% penalty
            adjusted_score = score * (1 - staleness_penalty)
            print(f"Document '{doc.metadata.get('title')}': relevance={score:.2f}, age={age_days}d, adjusted={adjusted_score:.2f}")
            score = adjusted_score
        else:
            print(f"Document '{doc.metadata.get('title')}': relevance={score:.2f}, fresh")

        scores.append(score)

    # Decision: if best document score is below threshold, trigger web search
    max_score = max(scores) if scores else 0
    avg_score = sum(scores) / len(scores) if scores else 0

    needs_web_search = max_score < 0.6 or avg_score < 0.4

    print(f"Max relevance: {max_score:.2f}, Avg: {avg_score:.2f}, Needs web search: {needs_web_search}")

    return {
        "relevance_scores": scores,
        "needs_web_search": needs_web_search,
    }
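
The staleness penalty above is linear in age and capped at 30%. Pulled out as a standalone function (same arithmetic as in grade_documents), the effect is easy to see:

```python
def apply_staleness_penalty(score: float, age_days: int) -> float:
    """Linear staleness penalty, capped at 30% (mirrors grade_documents)."""
    penalty = min(0.3, age_days / 365 * 0.3)
    return score * (1 - penalty)

# A 0.9-relevance document loses more the older it gets:
print(apply_staleness_penalty(0.9, 90))   # ≈ 0.833 (90 days: ~7.4% penalty)
print(apply_staleness_penalty(0.9, 365))  # ≈ 0.63  (a year old: full 30% penalty)
```

Anything older than a year takes the same flat 30% hit, which keeps very old documents from being penalized into uselessness when they are the only source available.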

Part 5: Live Web Search and Scrape Node

When the grader decides retrieved content is insufficient, we fall back to live web scraping:

import knowledgesdk
import asyncio
import os

ks_client = knowledgesdk.AsyncClient(api_key=os.environ["KNOWLEDGESDK_API_KEY"])

async def fetch_from_web(state: CRAGState) -> dict:
    """
    Fall back to live web scraping when index content is insufficient.
    Uses KnowledgeSDK to:
    1. Run a semantic search over already-scraped content
    2. Scrape high-value URLs for fresh pages
    3. Return fresh documents
    """
    query = state["query"]
    web_docs = []

    # Step 1: Use KnowledgeSDK semantic search if we have indexed content
    # (This searches our already-scraped knowledge base)
    try:
        search_results = await ks_client.search(query=query, limit=5)
        for result in search_results:
            if result.score > 0.7:  # Only high-confidence matches
                doc = Document(
                    page_content=result.content,
                    metadata={
                        "source": result.url,
                        "title": result.title,
                        "scraped_at": datetime.now().isoformat(),
                        "retrieval_method": "semantic_search",
                        "score": result.score,
                    }
                )
                web_docs.append(doc)
    except Exception as e:
        print(f"Semantic search failed: {e}")

    # Step 2: Scrape specific high-value URLs for fresh content
    # In production, these URLs come from a search API (Tavily, Brave, etc.)
    # For this example, we derive them from the query context
    target_urls = await get_search_urls(query)

    scrape_tasks = [ks_client.scrape(url=url) for url in target_urls[:5]]
    scrape_results = await asyncio.gather(*scrape_tasks, return_exceptions=True)

    for url, result in zip(target_urls, scrape_results):
        if isinstance(result, Exception):
            print(f"Failed to scrape {url}: {result}")
            continue

        doc = Document(
            page_content=result.markdown[:3000],  # Cap to avoid token overflow
            metadata={
                "source": url,
                "title": result.title,
                "scraped_at": datetime.now().isoformat(),
                "retrieval_method": "live_scrape",
            }
        )
        web_docs.append(doc)

    print(f"Fetched {len(web_docs)} web documents")
    return {"web_docs": web_docs}
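
The asyncio.gather call above fires all scrapes at once; under API rate limits you may want bounded concurrency instead. A sketch using a semaphore, with a stand-in coroutine in place of ks_client.scrape (the real client call is assumed, not shown):

```python
import asyncio

async def scrape_with_limit(urls: list[str], fetch, max_concurrent: int = 3):
    """Scrape URLs with at most `max_concurrent` requests in flight."""
    sem = asyncio.Semaphore(max_concurrent)

    async def bounded(url: str):
        async with sem:
            return await fetch(url)

    # return_exceptions=True so one failed URL doesn't sink the batch
    return await asyncio.gather(*(bounded(u) for u in urls),
                                return_exceptions=True)

# Demo with a stand-in fetcher (swap in ks_client.scrape in practice)
async def fake_fetch(url: str) -> str:
    await asyncio.sleep(0)
    return f"content:{url}"

results = asyncio.run(
    scrape_with_limit(["https://a.example", "https://b.example"], fake_fetch)
)
```

gather preserves input order, so results still line up with the URL list for the zip step above.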


async def get_search_urls(query: str) -> list[str]:
    """
    In production: call Tavily, Brave Search, or Google Search API.
    For this demo: LLM generates plausible URLs.
    """
    response = await llm.ainvoke([
        {
            "role": "system",
            "content": """Return JSON with 5 URLs that likely contain current information about this topic.
            {"urls": ["https://...", "https://...", ...]}
            Use real, authoritative domains."""
        },
        {"role": "user", "content": f"Find current information about: {query}"}
    ])
    return json.loads(response.content).get("urls", [])

Part 6: Knowledge Refinement Node

After retrieval (from index or web), we refine and combine the best documents:

async def refine_knowledge(state: CRAGState) -> dict:
    """
    Combine and rank static + web documents.
    Filter out truly irrelevant content.
    """
    # Combine all available docs
    static_docs = state["retrieved_docs"]
    web_docs = state.get("web_docs", [])
    scores = state.get("relevance_scores", [])

    # Keep static docs with good scores
    good_static_docs = [
        doc for doc, score in zip(static_docs, scores)
        if score >= 0.5
    ]

    # Combine with web docs (web docs are always considered fresh)
    all_docs = good_static_docs + web_docs

    # Deduplicate by source
    seen_sources = set()
    unique_docs = []
    for doc in all_docs:
        source = doc.metadata.get("source", "")
        if source not in seen_sources:
            seen_sources.add(source)
            unique_docs.append(doc)

    # Cap at 8 documents to avoid context overflow
    # (a production system would re-grade and re-rank the combined set here)
    final_docs = unique_docs[:8]

    return {"final_docs": final_docs}
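
If you do want the re-ranking mentioned above, a cheap heuristic is to sort by the stored relevance score, treating live-scraped docs (which carry no grade yet) as moderately relevant. A sketch over plain dicts standing in for Document objects:

```python
def rank_docs(docs: list[dict], default_score: float = 0.6) -> list[dict]:
    """Sort by metadata['score'] descending; ungraded docs get a default."""
    return sorted(docs,
                  key=lambda d: d["metadata"].get("score", default_score),
                  reverse=True)

docs = [
    {"id": "stale", "metadata": {"score": 0.4}},
    {"id": "live", "metadata": {}},            # fresh scrape, no grade yet
    {"id": "best", "metadata": {"score": 0.9}},
]
print([d["id"] for d in rank_docs(docs)])  # → ['best', 'live', 'stale']
```

The default of 0.6 is an assumption: it slots ungraded live content above weak index hits without letting it outrank strongly graded ones.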

Part 7: Generation Node

async def generate_answer(state: CRAGState) -> dict:
    """Generate a grounded answer from the refined document set."""

    docs = state["final_docs"]
    query = state["query"]

    if not docs:
        return {
            "answer": "I was unable to find sufficient information to answer this question reliably.",
            "sources": [],
            "confidence": 0.0,
        }

    # Format context for the LLM
    context = "\n\n---\n\n".join([
        f"**Source:** {doc.metadata.get('source', 'Unknown')}\n"
        f"**Title:** {doc.metadata.get('title', 'Unknown')}\n"
        f"**Retrieved via:** {doc.metadata.get('retrieval_method', 'index')}\n\n"
        f"{doc.page_content}"
        for doc in docs
    ])

    response = await llm.ainvoke([
        {
            "role": "system",
            "content": """You are a helpful research assistant.
            Answer the question using ONLY the provided sources.
            Always cite your sources with [Source: URL] notation.
            If sources conflict, note the discrepancy and favor more recent information.
            Return JSON: {"answer": "...", "confidence": 0.9}"""
        },
        {
            "role": "user",
            "content": f"""Question: {query}

Sources:
{context}

Provide a comprehensive, cited answer."""
        }
    ])

    result = json.loads(response.content)
    sources = [doc.metadata.get("source", "") for doc in docs]

    return {
        "answer": result["answer"],
        "sources": sources,
        "confidence": result.get("confidence", 0.8),
    }

Part 8: Assemble the CRAG Graph

from langgraph.graph import StateGraph, START, END

def route_based_on_relevance(state: CRAGState) -> str:
    """Route to web search if index retrieval was insufficient."""
    if state.get("needs_web_search", False):
        return "fetch_web"
    return "refine"

# Build the graph
builder = StateGraph(CRAGState)

builder.add_node("retrieve", retrieve_from_index)
builder.add_node("grade", grade_documents)
builder.add_node("fetch_web", fetch_from_web)
builder.add_node("refine", refine_knowledge)
builder.add_node("generate", generate_answer)

builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "grade")
builder.add_conditional_edges(
    "grade",
    route_based_on_relevance,
    {
        "fetch_web": "fetch_web",
        "refine": "refine",
    }
)
builder.add_edge("fetch_web", "refine")
builder.add_edge("refine", "generate")
builder.add_edge("generate", END)

graph = builder.compile()
print("CRAG pipeline compiled successfully")

Part 9: Run the Pipeline

import asyncio

async def ask(question: str) -> dict:
    """Run a question through the CRAG pipeline."""

    print(f"\n{'='*60}")
    print(f"Question: {question}")
    print('='*60)

    initial_state: CRAGState = {
        "query": question,
        "retrieved_docs": [],
        "relevance_scores": [],
        "needs_web_search": False,
        "web_docs": [],
        "final_docs": [],
        "answer": "",
        "sources": [],
        "confidence": 0.0,
    }

    final_state = await graph.ainvoke(initial_state)

    print(f"\nAnswer (confidence: {final_state['confidence']:.0%}):")
    print(final_state["answer"])
    print(f"\nSources used: {len(final_state['sources'])}")
    for source in final_state["sources"]:
        print(f"  - {source}")

    return final_state


# Test cases
asyncio.run(ask("What is LangGraph and how many downloads does it have?"))
# → Retrieved from fresh index (5 days old), no web search needed

asyncio.run(ask("What is the current pricing for KnowledgeSDK?"))
# → Index doc is 90 days old, triggers web scrape for current pricing

asyncio.run(ask("What is the latest Claude model from Anthropic?"))
# → Index doc is 400 days old, very stale, triggers web scrape

Node.js Implementation

import { StateGraph, Annotation, START, END } from "@langchain/langgraph";
import { ChatOpenAI } from "@langchain/openai";
import KnowledgeSDK from "@knowledgesdk/node";
import { Document } from "@langchain/core/documents";

const llm = new ChatOpenAI({ model: "gpt-4o-mini", temperature: 0 });
const ksClient = new KnowledgeSDK({ apiKey: process.env.KNOWLEDGESDK_API_KEY! });

const CRAGAnnotation = Annotation.Root({
  query: Annotation<string>(),
  retrievedDocs: Annotation<Document[]>(),
  relevanceScores: Annotation<number[]>(),
  needsWebSearch: Annotation<boolean>(),
  webDocs: Annotation<Document[]>(),
  finalDocs: Annotation<Document[]>(),
  answer: Annotation<string>(),
  sources: Annotation<string[]>(),
  confidence: Annotation<number>(),
});

// Grade documents node
async function gradeDocuments(state: typeof CRAGAnnotation.State) {
  const scores: number[] = [];
  let needsWebSearch = false;

  for (const doc of state.retrievedDocs) {
    const scrapedAt = doc.metadata.scraped_at
      ? new Date(doc.metadata.scraped_at)
      : null;
    const ageDays = scrapedAt
      ? (Date.now() - scrapedAt.getTime()) / (1000 * 60 * 60 * 24)
      : 999;

    const response = await llm.invoke([
      {
        role: "system",
        content: 'Grade relevance 0-1. Return JSON: {"score": 0.8}',
      },
      {
        role: "user",
        content: `Query: ${state.query}\nDocument: ${doc.pageContent.slice(0, 500)}`,
      },
    ]);

    let score = JSON.parse(response.content as string).score;

    // Penalize stale content
    if (ageDays > 30) {
      score *= 1 - Math.min(0.3, (ageDays / 365) * 0.3);
    }

    scores.push(score);
  }

  const maxScore = Math.max(...scores, 0);
  needsWebSearch = maxScore < 0.6;

  console.log(`Max relevance: ${maxScore.toFixed(2)}, Needs web search: ${needsWebSearch}`);

  return { relevanceScores: scores, needsWebSearch };
}

// Live web fetch node
async function fetchFromWeb(state: typeof CRAGAnnotation.State) {
  const webDocs: Document[] = [];

  // Semantic search via KnowledgeSDK
  try {
    const searchResults = await ksClient.search({
      query: state.query,
      limit: 5,
    });

    for (const result of searchResults) {
      if (result.score > 0.7) {
        webDocs.push(
          new Document({
            pageContent: result.content,
            metadata: {
              source: result.url,
              title: result.title,
              scraped_at: new Date().toISOString(),
              retrieval_method: "semantic_search",
            },
          })
        );
      }
    }
  } catch (e) {
    console.error("Semantic search failed:", e);
  }

  // Live scrape fallback
  const urlsResponse = await llm.invoke([
    { role: "system", content: 'Return JSON: {"urls": ["url1", "url2"]}' },
    { role: "user", content: `Find info about: ${state.query}` },
  ]);

  const { urls } = JSON.parse(urlsResponse.content as string);

  const scrapeResults = await Promise.allSettled(
    urls.slice(0, 5).map((url: string) => ksClient.scrape({ url }))
  );

  for (let i = 0; i < scrapeResults.length; i++) {
    const result = scrapeResults[i];
    if (result.status === "fulfilled") {
      webDocs.push(
        new Document({
          pageContent: result.value.markdown.slice(0, 3000),
          metadata: {
            source: urls[i],
            title: result.value.title,
            scraped_at: new Date().toISOString(),
            retrieval_method: "live_scrape",
          },
        })
      );
    }
  }

  console.log(`Fetched ${webDocs.length} web documents`);
  return { webDocs };
}

// Generate answer node
async function generateAnswer(state: typeof CRAGAnnotation.State) {
  const docs = state.finalDocs.length > 0 ? state.finalDocs : state.webDocs;

  const context = docs
    .map((d) => `Source: ${d.metadata.source}\n\n${d.pageContent}`)
    .join("\n\n---\n\n");

  const response = await llm.invoke([
    {
      role: "system",
      content:
        'Answer using only provided sources with citations. Return JSON: {"answer": "...", "confidence": 0.9}',
    },
    { role: "user", content: `Question: ${state.query}\n\nSources:\n${context}` },
  ]);

  const result = JSON.parse(response.content as string);
  return {
    answer: result.answer,
    sources: docs.map((d) => d.metadata.source as string),
    confidence: result.confidence || 0.8,
  };
}

// Build graph (assumes a `vectorstore` initialized elsewhere, e.g. a Chroma
// instance mirroring the Python section)
const builder = new StateGraph(CRAGAnnotation)
  .addNode("retrieve", async (state) => ({
    retrievedDocs: await vectorstore.similaritySearch(state.query, 5),
  }))
  .addNode("grade", gradeDocuments)
  .addNode("fetch_web", fetchFromWeb)
  .addNode("refine", async (state) => {
    const goodStatic = state.retrievedDocs.filter(
      (_, i) => (state.relevanceScores[i] || 0) >= 0.5
    );
    const allDocs = [...goodStatic, ...state.webDocs];
    return { finalDocs: allDocs.slice(0, 8) };
  })
  .addNode("generate", generateAnswer)
  .addEdge(START, "retrieve")
  .addEdge("retrieve", "grade")
  .addConditionalEdges("grade", (state) => (state.needsWebSearch ? "fetch_web" : "refine"), {
    fetch_web: "fetch_web",
    refine: "refine",
  })
  .addEdge("fetch_web", "refine")
  .addEdge("refine", "generate")
  .addEdge("generate", END);

const cragGraph = builder.compile();

// Usage
async function askQuestion(question: string) {
  const result = await cragGraph.invoke({
    query: question,
    retrievedDocs: [],
    relevanceScores: [],
    needsWebSearch: false,
    webDocs: [],
    finalDocs: [],
    answer: "",
    sources: [],
    confidence: 0,
  });

  console.log("Answer:", result.answer);
  console.log("Confidence:", result.confidence);
  console.log("Sources:", result.sources);
  return result;
}

Production Enhancements

Automatic Index Refresh with Webhooks

Instead of reactively detecting stale content at query time, proactively keep your index fresh using KnowledgeSDK webhooks:

import knowledgesdk
import os

client = knowledgesdk.Client(api_key=os.environ["KNOWLEDGESDK_API_KEY"])

# Register pages to monitor for changes
client.webhooks.create(
    url="https://yourapp.com/webhooks/content-changed",
    events=["page.changed"],
    watch_urls=[
        "https://docs.example.com/api",
        "https://competitor.com/pricing",
        "https://industry-source.com/reports",
    ]
)

# Your webhook handler re-indexes changed pages automatically
# This keeps your vector store fresh without manual polling
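
The handler itself only needs to map the webhook payload back to a re-index job. The sketch below assumes a hypothetical payload shape (`{"event": "page.changed", "url": ...}`); check the KnowledgeSDK webhook docs for the real event schema:

```python
# Hypothetical webhook payload shape; consult the KnowledgeSDK docs for the
# real event schema before relying on these field names.
def handle_webhook(payload: dict, reindex_queue: list) -> bool:
    """Queue a changed page for re-indexing; return True if queued."""
    if payload.get("event") != "page.changed":
        return False
    url = payload.get("url")
    if not url:
        return False
    reindex_queue.append(url)  # a worker re-scrapes and re-embeds this URL
    return True

queue: list[str] = []
handle_webhook({"event": "page.changed",
                "url": "https://competitor.com/pricing"}, queue)
print(queue)  # → ['https://competitor.com/pricing']
```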

Confidence Thresholds by Query Type

Different query types require different confidence thresholds:

CONFIDENCE_THRESHOLDS = {
    "pricing": 0.9,       # Price data must be very fresh
    "technical": 0.7,     # Tech docs change less frequently
    "factual": 0.8,       # Facts need high confidence
    "general": 0.5,       # General knowledge is more stable
}

async def classify_query(query: str) -> str:
    """Classify the query type to select the appropriate threshold."""
    response = await llm.ainvoke([
        {
            "role": "system",
            "content": """Classify the query as one of: pricing, technical, factual, general.
            Return JSON: {"type": "pricing"}"""
        },
        {"role": "user", "content": query}
    ])
    return json.loads(response.content).get("type", "general")
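
Wiring the classification into the routing decision is then a dictionary lookup (sketch; the thresholds are re-stated from the table above for self-containment):

```python
CONFIDENCE_THRESHOLDS = {
    "pricing": 0.9, "technical": 0.7, "factual": 0.8, "general": 0.5,
}

def needs_web_search_for(query_type: str, max_score: float) -> bool:
    """Trigger a live fetch when the best doc misses the per-type bar."""
    threshold = CONFIDENCE_THRESHOLDS.get(query_type, 0.5)
    return max_score < threshold

print(needs_web_search_for("pricing", 0.8))  # → True  (pricing demands 0.9)
print(needs_web_search_for("general", 0.8))  # → False
```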

Why CRAG Outperforms Naive RAG

| Scenario | Naive RAG | CRAG |
| --- | --- | --- |
| Fresh indexed content | Correct | Correct |
| Stale indexed content | Wrong (confidently) | Falls back, fetches live data |
| Topic not in index | Hallucination | Falls back, fetches live data |
| Conflicting sources | Picks one (randomly) | Notes conflict, favors recent |
| Real-time pricing/data | Outdated answer | Live scrape returns current data |

The cost of CRAG over naive RAG is an additional LLM call for the relevance grader (roughly $0.0002 per query at gpt-4o-mini prices) and occasional live scraping when the grader fires. For most production applications, that cost is trivially small compared to the cost of returning wrong answers.
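
The per-query figure is easy to sanity-check. The token counts and per-token prices below are assumptions (ballpark gpt-4o-mini list prices), not measured values:

```python
# Assumed values: ~1,200 input tokens per grading call (query + chunk +
# grading prompt), ~50 output tokens, at illustrative gpt-4o-mini prices.
INPUT_PRICE_PER_M = 0.15    # $ per 1M input tokens (assumption)
OUTPUT_PRICE_PER_M = 0.60   # $ per 1M output tokens (assumption)

input_tokens, output_tokens = 1200, 50
cost = (input_tokens / 1e6) * INPUT_PRICE_PER_M \
     + (output_tokens / 1e6) * OUTPUT_PRICE_PER_M
print(f"${cost:.5f} per grading call")  # on the order of $0.0002
```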


Conclusion

Static RAG pipelines are a liability when information currency matters. Agentic CRAG transforms retrieval from a deterministic lookup into an adaptive, self-correcting process.

The pattern is straightforward: retrieve from your index, grade the relevance and freshness of what you find, and fall back to live web scraping when the grade is insufficient. KnowledgeSDK makes the fallback layer fast, clean, and reliable — returning LLM-ready markdown that slots directly into your context window without additional processing.

The result is a RAG pipeline that stays accurate over time, without requiring constant manual re-indexing or accepting the risk of stale answers.


Ready to build a self-correcting RAG pipeline? Start with KnowledgeSDK — the live web data layer your agentic RAG needs. 1,000 free requests per month, no credit card required.

