Why Vector Search Alone Falls Short
Standard RAG works by embedding documents into a vector space and retrieving the top-k chunks most semantically similar to a query. For isolated factual lookups such as "What is the refund policy?" or "Which API endpoint handles authentication?", this approach performs well. But real enterprise knowledge rarely lives in isolated paragraphs, and vector-only retrieval falls short in four situations:
- Multi-hop questions. "Which engineers worked on projects that used the same database vendor as the one affected by the outage last quarter?" requires following relationships across multiple documents. No single chunk contains the full answer; vector similarity alone cannot chain across entities.
- Implicit relationships. A document corpus accumulates implicit structure: people belong to teams, teams own services, services depend on databases. This graph exists in the text but is invisible to a flat vector index.
- Global summaries. "What are the main architectural themes across all our design documents?" requires synthesising hundreds of documents. The top-k chunks retrieved by ANN search are a biased local sample, not a representative global view.
- Semantic drift at retrieval time. Embedding similarity rewards topical and lexical overlap; it does not encode causal or taxonomic relationships. A document about "database connection pooling" may score lower than a tangentially related paragraph about "connection limits" even when the former is the authoritative source.
GraphRAG addresses these gaps by extracting a structured knowledge graph from the document corpus and using it to augment — not replace — vector retrieval. The result is a hybrid system that can answer both local lookups and global synthesis questions.
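To make the multi-hop case concrete, here is a minimal sketch over a toy in-memory graph. The entity names, predicates, and helper functions are invented for illustration and do not come from any GraphRAG library; the point is only that the answer is assembled by chaining edges, which a top-k similarity lookup over isolated chunks cannot do.

```python
from collections import defaultdict

# Toy (subject, predicate, object) triples; every name here is hypothetical.
TRIPLES = [
    ("alice", "WORKS_ON", "billing-service"),
    ("bob", "WORKS_ON", "search-service"),
    ("carol", "WORKS_ON", "reports-service"),
    ("billing-service", "USES_VENDOR", "acme-db"),
    ("reports-service", "USES_VENDOR", "acme-db"),
    ("search-service", "USES_VENDOR", "other-db"),
    ("outage-q3", "AFFECTED", "billing-service"),
]

def build_index(triples):
    """Index edges by (node, predicate) in both directions."""
    out_edges, in_edges = defaultdict(set), defaultdict(set)
    for subj, pred, obj in triples:
        out_edges[(subj, pred)].add(obj)
        in_edges[(obj, pred)].add(subj)
    return out_edges, in_edges

def engineers_sharing_vendor_with(outage, triples):
    """Chain four hops: outage -> services -> vendors -> services -> engineers."""
    out_edges, in_edges = build_index(triples)
    affected = out_edges[(outage, "AFFECTED")]
    vendors = set().union(*(out_edges[(s, "USES_VENDOR")] for s in affected))
    services = set().union(*(in_edges[(v, "USES_VENDOR")] for v in vendors))
    engineers = set().union(*(in_edges[(s, "WORKS_ON")] for s in services))
    return sorted(engineers)

print(engineers_sharing_vendor_with("outage-q3", TRIPLES))  # ['alice', 'carol']
```

No single triple mentions both "carol" and "outage-q3"; the connection only appears once the vendor hop is followed.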
The GraphRAG Architecture
Microsoft's GraphRAG paper (2024) introduced a two-phase pipeline: an offline indexing phase that builds the knowledge graph from raw text, and an online query phase that selects between local and global retrieval strategies depending on query type.
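The query phase needs some way to decide whether a question is local or global. The text does not prescribe one, so the following is only a sketch under an assumed keyword heuristic; the cue list and the `route_query` name are hypothetical, and an LLM-based classifier would be a reasonable alternative.

```python
import re

# Hypothetical cue phrases that suggest a corpus-wide question; tune per domain.
GLOBAL_CUES = re.compile(
    r"\b(overall|across all|main themes?|summari[sz]e|trends?|in general)\b",
    re.IGNORECASE,
)

def route_query(query: str) -> str:
    """Return "global" for broad synthesis questions, otherwise "local"."""
    return "global" if GLOBAL_CUES.search(query) else "local"
```

A router like this is cheap enough to run on every request, and misrouted queries can be logged to refine the cue list over time.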
Indexing Pipeline Overview
# GraphRAG indexing pipeline — four stages
#
# Stage 1: ENTITY EXTRACTION
# Input: raw documents (PDFs, markdown, HTML, plain text)
# Process: LLM or NLP model extracts named entities (people, orgs,
# technologies, concepts) and their textual descriptions
# Output: entity list with descriptions and source document references
#
# Stage 2: RELATIONSHIP EXTRACTION
# Input: same documents + entity list from Stage 1
# Process: LLM extracts (subject, predicate, object) triples
# describing how entities relate to each other
# Output: edge list with relationship types, weights, and descriptions
#
# Stage 3: COMMUNITY DETECTION
# Input: entity graph (nodes + edges from Stages 1–2)
# Process: Leiden algorithm partitions the graph into communities
# (dense subgraphs of closely related entities)
# Output: hierarchical community assignments at multiple resolutions
#
# Stage 4: COMMUNITY SUMMARIZATION
# Input: entities and relationships within each community
# Process: LLM synthesises a textual summary of each community's
# key entities, themes, and inter-entity relationships
# Output: community summary documents indexed in a vector store
#
# Query phase: LOCAL vs GLOBAL
# LOCAL queries → hybrid ANN vector search + Cypher graph traversal
# GLOBAL queries → map-reduce over community summaries
Entity and Relationship Extraction
The first step is converting unstructured text into a structured graph. For high-volume corpora, a two-tier approach works well: use spaCy's transformer NER model for coarse entity detection, then an LLM for relationship extraction and entity disambiguation. This keeps token costs low while preserving accuracy.
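Part of the disambiguation work can be done before any LLM call by normalising surface forms. The sketch below is one possible approach, not the article's own implementation: `normalise_name` and `group_duplicates` are hypothetical helpers, and stripping every non-alphanumeric character is an assumption that may over-merge in some domains.

```python
import re
import unicodedata
from collections import defaultdict

def normalise_name(name: str) -> str:
    """Lowercase, strip accents, and drop everything except letters and digits."""
    text = unicodedata.normalize("NFKD", name)
    text = "".join(ch for ch in text if not unicodedata.combining(ch))
    return re.sub(r"[^a-z0-9]+", "", text.lower())

def group_duplicates(names: list[str]) -> dict[str, list[str]]:
    """Map each normalised key to the surface forms that share it."""
    groups: dict[str, list[str]] = defaultdict(list)
    for name in names:
        groups[normalise_name(name)].append(name)
    return dict(groups)

groups = group_duplicates(["PostgreSQL", "postgresql", "Postgre-SQL", "Neo4j", "neo4j "])
# groups["postgresql"] holds the three PostgreSQL spellings
```

Normalisation cannot link aliases that share no characters, such as "PG" and "PostgreSQL"; those still need a semantic comparison.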
Entity Extraction with spaCy + LLM Disambiguation
# pip install spacy anthropic
# python -m spacy download en_core_web_trf
import spacy
import anthropic
import json
from dataclasses import dataclass, field
nlp = spacy.load("en_core_web_trf")
client = anthropic.Anthropic()
@dataclass
class Entity:
    name: str
    type: str  # PERSON, ORG, TECHNOLOGY, CONCEPT, etc.
    description: str
    source_docs: list[str] = field(default_factory=list)

@dataclass
class Relationship:
    source: str  # entity name
    target: str  # entity name
    predicate: str  # e.g. "USES", "OWNS", "DEPENDS_ON", "LEADS"
    description: str
    weight: float = 1.0
ENTITY_PROMPT = """Extract all named entities from the following text.
For each entity, provide:
- name: canonical name (normalised, e.g. "PostgreSQL" not "postgres" or "PG")
- type: one of PERSON, ORGANIZATION, TECHNOLOGY, CONCEPT, PRODUCT, LOCATION
- description: 1-2 sentence description based on how it appears in this text
Text:
{text}
Return ONLY a JSON array of objects with keys: name, type, description.
Merge duplicate entities (same real-world referent, different surface forms)."""
RELATIONSHIP_PROMPT = """Given the following text and entity list, extract all
relationships between entities as (source, predicate, target, description) tuples.
Entities: {entities}
Text:
{text}
Use active-voice predicates like: USES, OWNS, MANAGES, DEPENDS_ON, INTEGRATES_WITH,
REPLACES, SUCCEEDS, LEADS, PART_OF, INSTANCE_OF, CAUSES, MONITORS.
Return ONLY a JSON array with keys: source, predicate, target, description, weight (0.1–1.0).
Only include relationships explicitly or strongly implied by the text."""
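def extract_entities(text: str, doc_id: str) -> list[Entity]:
    # Fast NER pass with spaCy; its hits are appended to the LLM prompt as hints
    doc = nlp(text[:50000])  # cap input to bound transformer NER time and memory
    spacy_ents = sorted({ent.text for ent in doc.ents if ent.label_ in
                         {"PERSON", "ORG", "GPE", "PRODUCT", "WORK_OF_ART"}})
    prompt = ENTITY_PROMPT.format(text=text[:8000])  # only the first 8,000 chars reach the LLM
    if spacy_ents:
        prompt += ("\n\nCandidate entities from a fast NER pass "
                   "(keep only those supported by the text): " + json.dumps(spacy_ents))
    response = client.messages.create(
        model="claude-haiku-4-5",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}],
    )
    # json.loads raises if the model wraps the array in prose or a code fence
    raw = json.loads(response.content[0].text)
    return [
        Entity(
            name=e["name"],
            type=e["type"],
            description=e["description"],
            source_docs=[doc_id],
        )
        for e in raw
    ]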
def extract_relationships(text: str, entities: list[Entity]) -> list[Relationship]:
    entity_names = [e.name for e in entities]
    known = set(entity_names)  # O(1) membership checks when filtering below
    response = client.messages.create(
        model="claude-haiku-4-5",
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": RELATIONSHIP_PROMPT.format(
                entities=json.dumps(entity_names),
                text=text[:8000],
            ),
        }],
    )
    raw = json.loads(response.content[0].text)
    return [
        Relationship(
            source=r["source"],
            target=r["target"],
            predicate=r["predicate"],
            description=r["description"],
            weight=float(r.get("weight", 1.0)),
        )
        for r in raw
        # Drop edges whose endpoints the entity pass did not produce
        if r["source"] in known and r["target"] in known
    ]
Note
[...] asyncio or a task queue. Use a separate deduplication pass to merge entities that refer to the same real-world object across documents: simple string normalisation handles ~80% of cases, and the remaining 20% can be resolved with an embedding-similarity merge step.
Building the Property Graph in Neo4j
Neo4j's Cypher query language is well-suited for property graphs: nodes represent entities, relationships carry typed edges, and both can hold arbitrary properties. Neo4j 5.x adds native vector indexing, enabling ANN search directly in the graph without a separate vector store.
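For large ingests, sending one query per entity adds a network round trip each time. A common Neo4j pattern is to pass a list of rows as a parameter and `UNWIND` it server-side. The sketch below assumes a standard `neo4j` driver session; `chunked`, `upsert_batched`, and the batch size of 500 are illustrative choices rather than anything the article specifies.

```python
from itertools import islice
from typing import Iterable, Iterator

# Batched upsert: one round trip per batch instead of one per entity.
BATCH_UPSERT = """
UNWIND $rows AS row
MERGE (e:Entity {name: row.name})
SET e.type = row.type, e.description = row.description
"""

def chunked(rows: Iterable[dict], size: int) -> Iterator[list[dict]]:
    """Yield successive lists of at most `size` rows."""
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch

def upsert_batched(session, rows: Iterable[dict], size: int = 500) -> int:
    """Send rows to Neo4j in batches and return how many rows were sent."""
    sent = 0
    for batch in chunked(rows, size):
        session.run(BATCH_UPSERT, rows=batch)
        sent += len(batch)
    return sent
```

Each row is a plain dict with `name`, `type`, and `description` keys, so the `Entity` dataclass can be converted with `dataclasses.asdict` before batching.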
Schema Setup and MERGE-based Upserts
# pip install neo4j anthropic sentence-transformers
from neo4j import GraphDatabase
from sentence_transformers import SentenceTransformer
import numpy as np
NEO4J_URI = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "your-password" # use env var in production
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
embedder = SentenceTransformer("all-MiniLM-L6-v2") # 384-dim, fast
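def setup_schema(session) -> None:
    """Create indexes and constraints once at startup."""
    # Uniqueness constraint: prevents duplicate entity nodes
    session.run("""
        CREATE CONSTRAINT entity_name IF NOT EXISTS
        FOR (e:Entity) REQUIRE e.name IS UNIQUE
    """)
    # Full-text index for keyword search
    session.run("""
        CREATE FULLTEXT INDEX entity_description IF NOT EXISTS
        FOR (e:Entity) ON EACH [e.name, e.description]
    """)
    # Vector index for ANN search over entities. Vector indexes arrived in
    # Neo4j 5.11; the CREATE VECTOR INDEX command may need a later 5.x
    # release, so check your server version.
    session.run("""
        CREATE VECTOR INDEX entity_embeddings IF NOT EXISTS
        FOR (e:Entity) ON e.embedding
        OPTIONS {indexConfig: {
            `vector.dimensions`: 384,
            `vector.similarity_function`: 'cosine'
        }}
    """)
    # Vector index over community summaries; global retrieval queries it by name
    session.run("""
        CREATE VECTOR INDEX community_embeddings IF NOT EXISTS
        FOR (c:Community) ON c.embedding
        OPTIONS {indexConfig: {
            `vector.dimensions`: 384,
            `vector.similarity_function`: 'cosine'
        }}
    """)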
def upsert_entity(session, entity: "Entity") -> None:
    embedding = embedder.encode(
        f"{entity.name}: {entity.description}"
    ).tolist()
    # apoc.coll.toSet requires the APOC plugin on the Neo4j server
    session.run(
        """
        MERGE (e:Entity {name: $name})
        ON CREATE SET
            e.type = $type,
            e.description = $description,
            e.embedding = $embedding,
            e.source_docs = $source_docs,
            e.created_at = datetime()
        ON MATCH SET
            // Replace the embedding only when the description is replaced,
            // so the stored vector always matches the stored text
            e.embedding = CASE WHEN size($description) > size(e.description)
                               THEN $embedding ELSE e.embedding END,
            e.description = CASE WHEN size($description) > size(e.description)
                                 THEN $description ELSE e.description END,
            e.source_docs = apoc.coll.toSet(e.source_docs + $source_docs),
            e.updated_at = datetime()
        """,
        name=entity.name,
        type=entity.type,
        description=entity.description,
        embedding=embedding,
        source_docs=entity.source_docs,
    )

def upsert_relationship(session, rel: "Relationship") -> None:
    # MERGE prevents duplicate edges with the same predicate between the same pair
    session.run(
        """
        MATCH (src:Entity {name: $source})
        MATCH (tgt:Entity {name: $target})
        MERGE (src)-[r:RELATES_TO {predicate: $predicate}]->(tgt)
        ON CREATE SET
            r.description = $description,
            r.weight = $weight,
            r.created_at = datetime()
        ON MATCH SET
            r.weight = (r.weight + $weight) / 2.0,
            r.updated_at = datetime()
        """,
        source=rel.source,
        target=rel.target,
        predicate=rel.predicate,
        description=rel.description,
        weight=rel.weight,
    )
def ingest_document(entities: list, relationships: list) -> None:
    # One session for the whole document; each session.run auto-commits.
    # The schema statements are idempotent (IF NOT EXISTS), but a long-running
    # service should call setup_schema once at startup instead.
    with driver.session() as session:
        setup_schema(session)
        # Upsert entities first so the relationship MATCH finds both endpoints
        for entity in entities:
            upsert_entity(session, entity)
        for rel in relationships:
            upsert_relationship(session, rel)
Community Detection with the Leiden Algorithm
Community detection partitions the entity graph into clusters of densely interconnected nodes. In the context of GraphRAG, each community represents a coherent thematic cluster — e.g., all entities related to "data ingestion infrastructure" or "authentication and authorisation". The Leiden algorithm is preferred over Louvain because it guarantees well-connected communities and avoids the disconnected-community problem.
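Leiden searches for a partition that scores well on a quality function; with the `RBConfigurationVertexPartition` used in this article, that function is a modularity-style score with a resolution parameter. The sketch below computes that score in plain Python for a toy graph so the objective is visible. It is a teaching aid with an assumed edge-list format, not a replacement for `leidenalg`.

```python
from collections import defaultdict

def modularity(edges, membership, resolution=1.0):
    """Modularity of an undirected weighted graph.

    edges: iterable of (u, v, weight); membership: node -> community id.
    """
    total = 0.0                    # m: total edge weight
    internal = defaultdict(float)  # edge weight inside each community
    degree = defaultdict(float)    # summed weighted degree per community
    for u, v, w in edges:
        total += w
        degree[membership[u]] += w
        degree[membership[v]] += w
        if membership[u] == membership[v]:
            internal[membership[u]] += w
    if total == 0:
        return 0.0
    return sum(
        internal[c] / total - resolution * (degree[c] / (2 * total)) ** 2
        for c in degree
    )

# Two triangles joined by a single bridge edge (c-d)
EDGES = [("a", "b", 1), ("b", "c", 1), ("a", "c", 1),
         ("d", "e", 1), ("e", "f", 1), ("d", "f", 1), ("c", "d", 1)]
split = {"a": 0, "b": 0, "c": 0, "d": 1, "e": 1, "f": 1}
merged = {node: 0 for node in split}
```

Splitting the graph at the bridge scores 5/14 (about 0.357), while lumping all six nodes together scores 0, which is why the algorithm prefers the two-community partition. Raising `resolution` penalises large communities more heavily and yields finer clusters.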
Community Detection and Summary Generation
# pip install leidenalg igraph
import igraph as ig
import leidenalg
from neo4j import GraphDatabase
import anthropic
import json
client = anthropic.Anthropic()
def load_graph_from_neo4j(driver) -> ig.Graph:
    """Export the entity graph from Neo4j into an igraph object."""
    with driver.session() as session:
        # elementId() replaces id(), which is deprecated in Neo4j 5
        nodes_result = session.run(
            "MATCH (e:Entity) RETURN e.name AS name, elementId(e) AS id"
        )
        nodes = [(r["id"], r["name"]) for r in nodes_result]
        # coalesce guards against edges that were stored without a weight
        edges_result = session.run("""
            MATCH (a:Entity)-[r:RELATES_TO]->(b:Entity)
            RETURN elementId(a) AS src, elementId(b) AS tgt,
                   coalesce(r.weight, 1.0) AS weight
        """)
        edges = [(r["src"], r["tgt"], r["weight"]) for r in edges_result]
    # Map Neo4j element ids to contiguous igraph vertex indices
    node_id_map = {neo4j_id: idx for idx, (neo4j_id, _) in enumerate(nodes)}
    node_names = [name for _, name in nodes]
    g = ig.Graph(directed=False)
    g.add_vertices(len(nodes))
    g.vs["name"] = node_names
    edge_list = [(node_id_map[src], node_id_map[tgt]) for src, tgt, _ in edges]
    edge_weights = [w for _, _, w in edges]
    g.add_edges(edge_list)
    g.es["weight"] = edge_weights
    return g

def detect_communities(g: ig.Graph, resolution: float = 1.0) -> dict[str, int]:
    """Run Leiden algorithm and return entity → community_id mapping."""
    partition = leidenalg.find_partition(
        g,
        leidenalg.RBConfigurationVertexPartition,
        weights=g.es["weight"],
        resolution_parameter=resolution,
        n_iterations=10,
        seed=42,
    )
    return {g.vs[i]["name"]: community_id
            for community_id, members in enumerate(partition)
            for i in members}
COMMUNITY_SUMMARY_PROMPT = """You are a technical documentation analyst.
Summarise the following cluster of related entities and their relationships
into a coherent thematic overview.
Entities in this community:
{entities}
Key relationships:
{relationships}
Write a 2-3 paragraph summary covering:
1. The primary theme or technical domain of this community
2. The most important entities and their roles
3. Key dependencies, workflows, or interactions between entities
Be specific and technical. This summary will be used to answer user questions."""
def summarise_community(
    driver,
    community_id: int,
    entity_names: list[str],
) -> str:
    with driver.session() as session:
        entities_result = session.run(
            "MATCH (e:Entity) WHERE e.name IN $names "
            "RETURN e.name AS name, e.type AS type, e.description AS desc",
            names=entity_names,
        )
        entities_text = "\n".join(
            f"- {r['name']} ({r['type']}): {r['desc']}"
            for r in entities_result
        )
        # Only edges with both endpoints inside the community, capped at 50
        rels_result = session.run(
            """
            MATCH (a:Entity)-[r:RELATES_TO]->(b:Entity)
            WHERE a.name IN $names AND b.name IN $names
            RETURN a.name AS src, r.predicate AS pred, b.name AS tgt, r.description AS desc
            LIMIT 50
            """,
            names=entity_names,
        )
        rels_text = "\n".join(
            f"- {r['src']} {r['pred']} {r['tgt']}: {r['desc']}"
            for r in rels_result
        )
    response = client.messages.create(
        model="claude-haiku-4-5",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": COMMUNITY_SUMMARY_PROMPT.format(
                entities=entities_text,
                relationships=rels_text,
            ),
        }],
    )
    return response.content[0].text
from sentence_transformers import SentenceTransformer

# Load the embedding model once at import time, not on every call
embedder = SentenceTransformer("all-MiniLM-L6-v2")

def store_community_summary(driver, community_id: int, summary: str, entity_names: list[str]) -> None:
    embedding = embedder.encode(summary).tolist()
    with driver.session() as session:
        session.run(
            """
            MERGE (c:Community {id: $community_id})
            SET c.summary = $summary,
                c.embedding = $embedding,
                c.members = $entity_names,
                c.updated_at = datetime()
            """,
            community_id=community_id,
            summary=summary,
            embedding=embedding,
            entity_names=entity_names,
        )
Hybrid Retrieval: Vector Search + Graph Traversal
GraphRAG's power comes from combining two retrieval strategies. For local queries — questions about specific entities, relationships, or events — ANN vector search finds the seed entities, and Cypher graph traversal expands the context by following typed relationships. For global queries — thematic summaries, broad comparisons — the community summary index provides a pre-synthesised view of the corpus.
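Because graph expansion can reach a seed entity again through its neighbours, the two result sets usually need merging before they are sent to the LLM. The following is a sketch of one way to do that; `merge_context`, the `limit` default, and the tie-breaking order are assumptions, and the dict keys (`name`, `score`, `distance`) follow the shapes returned by the retrieval code in this section.

```python
def merge_context(seeds: list[dict], neighbours: list[dict], limit: int = 20) -> list[dict]:
    """Deduplicate by entity name, keeping the smallest hop distance,
    then order by distance and, within a distance, by descending ANN score."""
    best: dict[str, dict] = {}
    for node in seeds:
        best[node["name"]] = {**node, "distance": 0}  # seeds sit at hop 0
    for node in neighbours:
        current = best.get(node["name"])
        if current is None or node["distance"] < current["distance"]:
            best[node["name"]] = node
    ranked = sorted(
        best.values(),
        key=lambda n: (n["distance"], -n.get("score", 0.0), n["name"]),
    )
    return ranked[:limit]
```

Ordering by hop distance keeps the most directly relevant entities at the front, so truncating to `limit` drops the most remote context first.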
Local Query: ANN Seed + Cypher Expansion
from neo4j import GraphDatabase
from sentence_transformers import SentenceTransformer
import anthropic
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
embedder = SentenceTransformer("all-MiniLM-L6-v2")
client = anthropic.Anthropic()
def local_retrieval(query: str, top_k: int = 5, hops: int = 2) -> list[dict]:
    """
    Phase 1: ANN search finds the k most relevant seed entities.
    Phase 2: Cypher traversal expands up to 'hops' relationship hops
    from each seed to gather connected context.
    """
    hops = int(hops)  # interpolated into the query text below, so force an integer
    query_embedding = embedder.encode(query).tolist()
    with driver.session() as session:
        # Phase 1: ANN vector search for seed entities
        seed_result = session.run(
            """
            CALL db.index.vector.queryNodes('entity_embeddings', $k, $embedding)
            YIELD node AS e, score
            RETURN e.name AS name, e.description AS description,
                   e.type AS type, score
            ORDER BY score DESC
            """,
            k=top_k,
            embedding=query_embedding,
        )
        seeds = [dict(r) for r in seed_result]
        seed_names = [s["name"] for s in seeds]
        if not seed_names:
            return []
        # Phase 2: Cypher graph expansion, following relationships outward.
        # Variable-length bounds cannot be query parameters, hence the f-string.
        expansion_result = session.run(
            f"""
            MATCH (seed:Entity) WHERE seed.name IN $seed_names
            CALL {{
                WITH seed
                MATCH path = (seed)-[r:RELATES_TO*1..{hops}]-(neighbor:Entity)
                RETURN neighbor, relationships(path) AS rels,
                       length(path) AS distance
            }}
            RETURN DISTINCT
                neighbor.name AS name,
                neighbor.description AS description,
                neighbor.type AS type,
                min(distance) AS distance,
                collect(DISTINCT {{
                    predicate: last(rels).predicate,
                    description: last(rels).description
                }}) AS edges
            ORDER BY distance ASC
            LIMIT 30
            """,
            seed_names=seed_names,
        )
        neighbors = [dict(r) for r in expansion_result]
    return seeds + neighbors
def answer_local_query(query: str) -> str:
    context_nodes = local_retrieval(query)
    # One text block per entity; seeds carry no "edges" key, neighbours do
    context_text = "\n".join(
        f"Entity: {n['name']} ({n['type']})\n"
        f"Description: {n['description']}\n"
        + (f"Connected via: {n.get('edges', [])}" if n.get("edges") else "")
        for n in context_nodes
    )
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": (
                f"Answer the following question using only the knowledge graph context provided.\n\n"
                f"Context (entities and relationships from the knowledge graph):\n{context_text}\n\n"
                f"Question: {query}"
            ),
        }],
    )
    return response.content[0].text
Global Query: Map-Reduce over Community Summaries
def global_retrieval(query: str, top_communities: int = 10) -> list[str]:
    """
    For broad thematic questions, search community summaries instead of
    individual entity nodes. Community summaries are pre-synthesised
    overviews of dense entity clusters, which suit global reasoning better.
    """
    query_embedding = embedder.encode(query).tolist()
    with driver.session() as session:
        # Requires a vector index named 'community_embeddings' on Community.embedding
        result = session.run(
            """
            CALL db.index.vector.queryNodes('community_embeddings', $k, $embedding)
            YIELD node AS c, score
            RETURN c.summary AS summary, c.members AS members, score
            ORDER BY score DESC
            """,
            k=top_communities,
            embedding=query_embedding,
        )
        return [r["summary"] for r in result]
def answer_global_query(query: str) -> str:
    """
    Map phase: retrieve top community summaries.
    Reduce phase: synthesise a final answer from the summaries.
    This two-step approach prevents the LLM context from being
    overwhelmed by raw entity data when answering broad questions.
    """
    community_summaries = global_retrieval(query)
    # Map step: extract the points in each summary that bear on the question
    map_prompt = (
        "Given the following community summary and the user question, "
        "extract the most relevant points from this summary that help "
        "answer the question. If the summary is not relevant, return an empty string.\n\n"
        "Question: {query}\n\nSummary:\n{summary}"
    )
    relevant_excerpts = []
    for summary in community_summaries:
        response = client.messages.create(
            model="claude-haiku-4-5",
            max_tokens=512,
            messages=[{
                "role": "user",
                "content": map_prompt.format(query=query, summary=summary),
            }],
        )
        # Guard against a response with no content blocks
        excerpt = response.content[0].text.strip() if response.content else ""
        if excerpt:
            relevant_excerpts.append(excerpt)
    if not relevant_excerpts:
        return "No relevant information found in the knowledge graph."
    # Reduce step: synthesise final answer
    reduce_context = "\n---\n".join(relevant_excerpts)
    reduce_response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": (
                f"Synthesise a comprehensive answer to the following question "
                f"based on the extracted knowledge graph insights below.\n\n"
                f"Question: {query}\n\nInsights:\n{reduce_context}"
            ),
        }],
    )
    return reduce_response.content[0].text
LangChain Neo4j Integration
If you are already using LangChain, the Neo4jGraph integration provides GraphCypherQAChain — a chain that translates natural language questions into Cypher queries using an LLM, executes them against Neo4j, and returns the results as context for answer generation.
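Because the chain executes whatever Cypher the LLM writes, it is worth checking a generated query before it reaches the database. The sketch below is a conservative, regex-based heuristic; `is_read_only` and the clause list are assumptions for illustration, not part of LangChain or Neo4j. It does not inspect procedure calls, so it complements database-level permissions and does not replace them.

```python
import re

# Clauses that modify data or schema; a deliberately conservative list.
WRITE_CLAUSES = re.compile(
    r"\b(CREATE|MERGE|DELETE|DETACH|SET|REMOVE|DROP|LOAD\s+CSV|FOREACH)\b",
    re.IGNORECASE,
)

# Single- or double-quoted string literals, allowing backslash escapes.
STRING_LITERAL = re.compile(r"'(?:\\.|[^'\\])*'|\"(?:\\.|[^\"\\])*\"")

def is_read_only(cypher: str) -> bool:
    """Heuristic check that a generated query contains no write clauses.

    String literals are blanked first so a value like 'SET' does not trip it.
    """
    without_strings = STRING_LITERAL.sub("''", cypher)
    return WRITE_CLAUSES.search(without_strings) is None
```

The check errs on the side of rejection: a property that happens to be named `set` would be flagged, which is usually an acceptable cost for a guard of this kind.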
# pip install langchain langchain-anthropic langchain-community neo4j
from langchain_community.graphs import Neo4jGraph
from langchain.chains import GraphCypherQAChain
from langchain_anthropic import ChatAnthropic
# Connect LangChain to Neo4j
graph = Neo4jGraph(
url=NEO4J_URI,
username=NEO4J_USER,
password=NEO4J_PASSWORD,
)
# Refresh the schema so the LLM knows what node labels and
# relationship types exist — call this after bulk ingestion
graph.refresh_schema()
# Inspect what the LLM will see
print(graph.schema)
# Example output:
# Node properties are the following:
# Entity {name: STRING, type: STRING, description: STRING, embedding: LIST}
# Community {id: INTEGER, summary: STRING, members: LIST}
# Relationship properties are the following:
# RELATES_TO {predicate: STRING, description: STRING, weight: FLOAT}
# ...
llm = ChatAnthropic(model="claude-sonnet-4-6", temperature=0)
chain = GraphCypherQAChain.from_llm(
llm=llm,
graph=graph,
verbose=True, # log the generated Cypher query
return_intermediate_steps=True,
allow_dangerous_requests=True, # required in langchain >= 0.2.0
)
# Ask a multi-hop question
result = chain.invoke({
"query": "Which teams own services that depend on PostgreSQL, "
"and what incidents have affected those services?"
})
print(result["result"])
# The chain generates Cypher like:
# MATCH (t:Entity {type: 'ORGANIZATION'})-[:RELATES_TO {predicate: 'OWNS'}]
# ->(s:Entity {type: 'TECHNOLOGY'})-[:RELATES_TO {predicate: 'DEPENDS_ON'}]
# ->(db:Entity {name: 'PostgreSQL'})
# OPTIONAL MATCH (s)-[:RELATES_TO {predicate: 'AFFECTED_BY'}]->(i:Entity {type: 'CONCEPT'})
# RETURN t.name AS team, s.name AS service, i.name AS incident
# For RAG-style use, combine Cypher results with vector search context
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate
prompt = ChatPromptTemplate.from_template("""
Answer the question using the graph context and document context below.
Graph context (from Cypher query):
{graph_context}
Document context (from vector search):
{document_context}
Question: {question}
""")
Note
GraphCypherQAChain works best when the graph schema is clean and the LLM has been primed with few-shot Cypher examples for your specific domain. For production use, validate generated Cypher before execution and run it as a read-only Neo4j user that has MATCH permissions only. Never expose a write-enabled connection to a chain that generates arbitrary queries.
Incremental Graph Updates and Freshness
A static knowledge graph goes stale. Production deployments need a strategy for keeping entity descriptions, relationships, and community summaries current as the underlying document corpus changes. Two patterns cover most cases: TTL-based scheduled refresh for slowly-changing corpora, and CDC-triggered incremental updates for near-real-time knowledge bases.
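The TTL-based pattern reduces to one question per document: has it been longer than the TTL since this document was last indexed? A minimal sketch follows, assuming timezone-aware timestamps are available per document id; `due_for_refresh` and its parameters are hypothetical names.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def due_for_refresh(
    last_indexed: dict[str, datetime],
    ttl: timedelta,
    now: Optional[datetime] = None,
) -> list[str]:
    """Return ids of documents whose last index time is at least `ttl` old."""
    now = now or datetime.now(timezone.utc)
    return sorted(doc_id for doc_id, ts in last_indexed.items() if now - ts >= ttl)
```

A scheduler can call this on each run and pass only the returned ids to the reindexing step, which keeps refresh cost proportional to the number of stale documents.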
Change Detection and Incremental Reindexing
import hashlib
import datetime
from neo4j import GraphDatabase
def document_fingerprint(content: str) -> str:
    return hashlib.sha256(content.encode()).hexdigest()

def needs_reindex(driver, doc_id: str, content_hash: str) -> bool:
    """Returns True if this document has changed since last indexing."""
    with driver.session() as session:
        result = session.run(
            "MATCH (d:Document {id: $id}) RETURN d.content_hash AS hash",
            id=doc_id,
        )
        record = result.single()
        if record is None:
            return True  # new document
        return record["hash"] != content_hash

def mark_indexed(driver, doc_id: str, content_hash: str) -> None:
    with driver.session() as session:
        session.run(
            """
            MERGE (d:Document {id: $id})
            SET d.content_hash = $hash,
                d.last_indexed = $ts
            """,
            id=doc_id,
            hash=content_hash,
            # datetime.utcnow() is deprecated since Python 3.12
            ts=datetime.datetime.now(datetime.timezone.utc).isoformat(),
        )
def incremental_ingest(driver, documents: list[dict]) -> dict:
    """
    Process only changed documents, reuse existing graph nodes
    for unchanged ones. Returns counts of new/updated/skipped docs.
    """
    stats = {"new": 0, "updated": 0, "skipped": 0}
    for doc in documents:
        doc_id = doc["id"]
        content = doc["content"]
        content_hash = document_fingerprint(content)
        if not needs_reindex(driver, doc_id, content_hash):
            stats["skipped"] += 1
            continue
        with driver.session() as session:
            # Check for a prior Document node before mark_indexed creates one
            is_new = session.run(
                "MATCH (d:Document {id: $id}) RETURN d.id AS id", id=doc_id
            ).single() is None
            # Drop this document's claim on its old relationships and delete
            # any relationship that no other document still supports
            session.run(
                """
                MATCH ()-[r:RELATES_TO]->()
                WHERE $doc_id IN r.source_docs
                WITH r, [x IN r.source_docs WHERE x <> $doc_id] AS remaining
                SET r.source_docs = remaining
                WITH r WHERE size(r.source_docs) = 0
                DELETE r
                """,
                doc_id=doc_id,
            )
        entities = extract_entities(content, doc_id)
        relationships = extract_relationships(content, entities)
        with driver.session() as session:
            for entity in entities:
                upsert_entity(session, entity)
            for rel in relationships:
                upsert_relationship(session, rel)
                # Tag the edge with its source document so the cleanup above
                # can find it on the next run
                session.run(
                    """
                    MATCH (:Entity {name: $source})-[r:RELATES_TO {predicate: $predicate}]->(:Entity {name: $target})
                    SET r.source_docs = [x IN coalesce(r.source_docs, []) WHERE x <> $doc_id] + [$doc_id]
                    """,
                    source=rel.source,
                    target=rel.target,
                    predicate=rel.predicate,
                    doc_id=doc_id,
                )
        mark_indexed(driver, doc_id, content_hash)
        stats["new" if is_new else "updated"] += 1
    return stats
# Scheduled refresh: run via Airflow or cron.
# After any incremental update, re-run community detection if the change
# ratio exceeds 5%. Note that stats counts documents while the denominator
# counts entities, so treat the threshold as a rough heuristic.
def should_rerun_communities(stats: dict, total_entities: int) -> bool:
    changed = stats["new"] + stats["updated"]
    return changed / max(total_entities, 1) > 0.05
Monitoring GraphRAG Quality
GraphRAG introduces new failure modes beyond standard RAG. Entity extraction errors propagate into the graph, corrupt relationships, and degrade retrieval quality silently. Monitor graph health with metrics that are specific to the graph layer, not just the LLM response layer.
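One graph-layer signal that is cheap to compute is the share of entities with no relationships at all, since a rising share often points to relationship extraction failing quietly. This metric is a suggestion rather than something the article defines; `orphan_ratio` and its inputs are hypothetical.

```python
def orphan_ratio(entity_names: list[str], edges: list[tuple[str, str]]) -> float:
    """Fraction of distinct entities that appear in no relationship."""
    distinct = set(entity_names)
    if not distinct:
        return 0.0
    connected = {name for edge in edges for name in edge}
    orphans = distinct - connected
    return len(orphans) / len(distinct)
```

The same number can be obtained in Cypher by counting `Entity` nodes without a `RELATES_TO` edge; the Python form is convenient for checking an extraction batch before it is loaded.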
# metrics.py — Prometheus metrics for GraphRAG
from prometheus_client import Counter, Gauge, Histogram
# Graph structure metrics
GRAPH_ENTITY_COUNT = Gauge(
"graphrag_entity_total",
"Total number of entities in the knowledge graph",
["entity_type"],
)
GRAPH_RELATIONSHIP_COUNT = Gauge(
"graphrag_relationship_total",
"Total number of relationships in the knowledge graph",
)
GRAPH_COMMUNITY_COUNT = Gauge(
"graphrag_community_total",
"Total number of communities",
)
ENTITY_COVERAGE_RATIO = Gauge(
"graphrag_entity_coverage_ratio",
"Fraction of documents that have at least one entity extracted",
)
# Retrieval quality metrics
GRAPH_RETRIEVAL_LATENCY = Histogram(
"graphrag_retrieval_duration_seconds",
"End-to-end GraphRAG retrieval latency (ANN + Cypher)",
["query_type"], # local | global
buckets=[0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0],
)
RETRIEVED_ENTITY_COUNT = Histogram(
"graphrag_retrieved_entities",
"Number of entities retrieved per query",
["query_type"],
buckets=[1, 2, 5, 10, 20, 50, 100],
)
EMPTY_RETRIEVAL_RATE = Counter(
"graphrag_empty_retrieval_total",
"Queries that returned zero entities from the graph",
["query_type"],
)
def collect_graph_metrics(driver) -> None:
    """Populate graph structure metrics; run every 5 minutes."""
    with driver.session() as session:
        entity_result = session.run(
            "MATCH (e:Entity) RETURN e.type AS type, count(*) AS cnt"
        )
        for row in entity_result:
            GRAPH_ENTITY_COUNT.labels(entity_type=row["type"]).set(row["cnt"])
        rel_count = session.run(
            "MATCH ()-[r:RELATES_TO]->() RETURN count(r) AS cnt"
        ).single()["cnt"]
        GRAPH_RELATIONSHIP_COUNT.set(rel_count)
        community_count = session.run(
            "MATCH (c:Community) RETURN count(c) AS cnt"
        ).single()["cnt"]
        GRAPH_COMMUNITY_COUNT.set(community_count)
        # Entity coverage: documents that yielded >= 1 entity. A document is
        # covered when its id appears in some entity's source_docs list.
        coverage = session.run("""
            MATCH (d:Document)
            WITH d, EXISTS {
                MATCH (e:Entity) WHERE d.id IN e.source_docs
            } AS covered
            WITH count(d) AS total,
                 sum(CASE WHEN covered THEN 1 ELSE 0 END) AS covered_docs
            RETURN CASE WHEN total = 0 THEN 0.0
                        ELSE toFloat(covered_docs) / total END AS ratio
        """).single()
        if coverage:
            ENTITY_COVERAGE_RATIO.set(coverage["ratio"])
Standard RAG vs GraphRAG vs Hybrid: When to Use Each
GraphRAG adds significant complexity to your retrieval stack. The engineering cost is only justified when your query patterns actually require it. Use this framework to make the right call.
- Use standard vector RAG when: Questions are factual lookups against isolated document chunks. The corpus is small (<10k documents) and entities rarely appear across multiple documents. Fast iteration is more important than maximum accuracy. Budget and operational complexity are primary constraints.
- Use GraphRAG when: Questions require multi-hop reasoning ("who works with whom on what"). The corpus has high entity density — technical docs, codebases, knowledge bases, research papers. Global synthesis queries ("summarise all architecture decisions across the corpus") are common. You need explainable retrieval — Cypher traversal paths are inspectable.
- Use hybrid (vector RAG + graph) when: Most queries are local lookups but a subset require multi-hop reasoning. You want to start with standard RAG and incrementally add graph retrieval for query types that underperform. The document corpus contains both structured (graph-extractable) and unstructured (chunk-only) content.
- Operational cost to consider: GraphRAG adds two new systems to operate (a graph database and a community detection pipeline). Entity extraction costs ~0.5–2 LLM calls per document; the extraction code in this article makes two, one for entities and one for relationships. Community re-detection scales roughly as O(n log n) in entity count and should run after every significant corpus update. As a rough planning figure, budget 3–5× the infrastructure cost of standard RAG for a production GraphRAG deployment.
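The LLM call counts behind these figures can be estimated with simple arithmetic. The sketch below assumes the pipeline shape used in this article: a configurable number of extraction calls per document, one summary call per community, and a global query that makes one map call per retrieved summary plus one reduce call. The function names are hypothetical.

```python
def indexing_llm_calls(num_docs: int, num_communities: int, calls_per_doc: float = 2.0) -> int:
    """Extraction calls for every document plus one summary call per community."""
    return round(num_docs * calls_per_doc) + num_communities

def global_query_llm_calls(top_communities: int = 10) -> int:
    """Upper bound per global query: one map call per summary plus one reduce call."""
    return top_communities + 1
```

For example, 10,000 documents and 300 communities come to 20,300 indexing calls at two calls per document, and each global query over ten communities costs up to 11 calls.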