diff --git a/docs/ecosystem/index.md b/docs/ecosystem/index.md
index 789dcee6d..d13f48c5f 100644
--- a/docs/ecosystem/index.md
+++ b/docs/ecosystem/index.md
@@ -148,6 +148,7 @@ Persist and cache your data:
| **Slack** | Notifications and integrations | [Examples](https://github.com/apache/hamilton/tree/main/examples/slack) \| [Lifecycle Hook](../reference/lifecycle-hooks/SlackNotifierHook.rst) |
| **GeoPandas** | Geospatial data analysis | [Type extension](https://github.com/apache/hamilton/blob/main/hamilton/plugins/geopandas_extensions.py) for GeoDataFrame support |
| **YAML** | Configuration management | [IO Adapters](../reference/io/available-data-adapters.rst) |
+| **Neo4j** | Knowledge graph RAG | [Examples](https://github.com/apache/hamilton/tree/main/examples/LLM_Workflows/neo4j_graph_rag) |
---
diff --git a/examples/LLM_Workflows/neo4j_graph_rag/.env.example b/examples/LLM_Workflows/neo4j_graph_rag/.env.example
new file mode 100644
index 000000000..3d70e233e
--- /dev/null
+++ b/examples/LLM_Workflows/neo4j_graph_rag/.env.example
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# OpenAI
+OPENAI_API_KEY=your-openai-api-key-here
+
+# Neo4j
+NEO4J_URI=bolt://localhost:7687
+NEO4J_USERNAME=neo4j
+NEO4J_PASSWORD=password
+NEO4J_DATABASE=neo4j
diff --git a/examples/LLM_Workflows/neo4j_graph_rag/.gitignore b/examples/LLM_Workflows/neo4j_graph_rag/.gitignore
new file mode 100644
index 000000000..021d89f60
--- /dev/null
+++ b/examples/LLM_Workflows/neo4j_graph_rag/.gitignore
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Environment
+.env
+
+# Python
+__pycache__/
+*.pyc
+*.pyo
+*.pyd
+.Python
+venv/
+.venv/
+
+# Data files (download separately per data/README.md)
+data/*.json
+data/*.csv
+
+# DAG visualisations are committed — ignore regenerated copies at root
+/ingest_dag.png
+/embed_dag.png
+/rag_dag.png
+
+# Neo4j
+*.dump
+
+# OS
+.DS_Store
+Thumbs.db
diff --git a/examples/LLM_Workflows/neo4j_graph_rag/README.md b/examples/LLM_Workflows/neo4j_graph_rag/README.md
new file mode 100644
index 000000000..848d8d504
--- /dev/null
+++ b/examples/LLM_Workflows/neo4j_graph_rag/README.md
@@ -0,0 +1,205 @@
+# Neo4j GraphRAG — TMDB Movies
+
+A full GraphRAG pipeline over a movie knowledge graph stored in Neo4j,
+built entirely with Apache Hamilton. Ingestion, embedding, and retrieval
+are each expressed as first-class Hamilton DAGs — dependencies declared
+through function signatures, execution graph built automatically.
+
+## Hamilton DAG visualisations
+
+Run `--visualise` on any mode to regenerate these from source without
+executing the pipeline.
+
+### Ingestion DAG
+
+```bash
+python run.py --mode ingest --visualise
+```
+
+![Ingestion DAG](docs/images/ingest_dag.png)
+
+Raw TMDB JSON flows through parsing nodes into batched Neo4j writes.
+Hamilton automatically parallelises the four independent branches
+(movies, genres, companies, person edges) from the shared `raw_movies`
+and `raw_credits` inputs.
+
+---
+
+### Embedding DAG
+
+```bash
+python run.py --mode embed --visualise
+```
+
+![Embedding DAG](docs/images/embed_dag.png)
+
+Movie texts are fetched from Neo4j, batched through the OpenAI embeddings
+API, written back to Movie nodes, and a cosine vector index is created.
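Under the hood, the index ranks movies by cosine similarity between the query embedding and each stored vector. Neo4j computes this natively; the stdlib function below is only an illustrative sketch of the metric:

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity, the metric configured on the vector index."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)


# Parallel vectors score 1.0 (within float error); orthogonal vectors score 0.0.
print(cosine_similarity([1.0, 2.0], [2.0, 4.0]))
```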
+
+---
+
+### Retrieval + Generation DAG
+
+```bash
+python run.py --mode query --visualise
+```
+
+![Retrieval + Generation DAG](docs/images/rag_dag.png)
+
+The full 13-node RAG pipeline. Hamilton wires all dependencies from
+function signatures — no manual orchestration:
+
+```
+user_query + openai_api_key + neo4j_driver
+ -> query_intent classify into VECTOR / CYPHER / AGGREGATE / HYBRID
+ -> entity_extraction extract persons, movies, genres, companies, filters
+ -> entity_resolution fuzzy-match each entity against the live graph
+ -> query_embedding embed query (VECTOR / HYBRID only)
+ -> vector_results cosine similarity search (VECTOR / HYBRID only)
+ -> cypher_query LLM generates Cypher from resolved entities
+ -> cypher_results execute Cypher against Neo4j
+ -> merged_results combine both retrieval paths
+ -> retrieved_context format as numbered plain-text records
+ -> system_prompt inject context into LLM system prompt
+ -> prompt_messages assemble message list
+ -> answer gpt-4o generates final answer
+```
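The signature-driven wiring can be illustrated with the standard library alone: each function's parameter names are read as the names of its upstream nodes. The toy nodes below are simplified stand-ins for the real module code, not Hamilton's actual API:

```python
import inspect


# Two toy nodes mirroring the tail of the pipeline above.
def retrieved_context(merged_results: list) -> str:
    return "\n".join(f"{i + 1}. {r}" for i, r in enumerate(merged_results))


def system_prompt(retrieved_context: str) -> str:
    return f"Database records:\n{retrieved_context}"


def infer_dependencies(functions) -> dict[str, list[str]]:
    """Map each node name to its upstream names, read from the signature."""
    return {f.__name__: list(inspect.signature(f).parameters) for f in functions}


deps = infer_dependencies([retrieved_context, system_prompt])
# {'retrieved_context': ['merged_results'], 'system_prompt': ['retrieved_context']}
```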
+
+## What it demonstrates
+
+**Ingestion DAG** (`ingest_module.py`)
+Loads TMDB JSON, parses entities and relationships, writes to Neo4j via
+batched Cypher `MERGE`.
+
+**Embedding DAG** (`embed_module.py`)
+Computes OpenAI `text-embedding-3-small` embeddings over title + overview,
+writes vectors to Movie nodes, creates a Neo4j cosine vector index.
+
+**Retrieval DAG** (`retrieval_module.py`)
+Classifies each query into one of four strategies, resolves named entities
+against the graph to get canonical names, then executes retrieval:
+
+| Strategy | When used | How it retrieves |
+|-------------|----------------------------------|-----------------------------------------------|
+| `VECTOR` | Thematic / semantic queries | Cosine vector search over Movie embeddings |
+| `CYPHER` | Relational / factual queries | LLM-generated Cypher with resolved entities |
+| `AGGREGATE` | Counting / ranking queries | Aggregation Cypher with popularity guard |
+| `HYBRID` | Filtered + semantic queries | CYPHER + VECTOR, results merged |
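The actual classifier in `retrieval_module.py` is an LLM call; a keyword heuristic is enough to illustrate the four-way routing (the keywords here are chosen for illustration only):

```python
def classify_intent(query: str) -> str:
    """Toy four-way router; the real pipeline asks an LLM to classify."""
    q = query.lower()
    is_aggregate = any(w in q for w in ("most", "count", "how many", "top "))
    is_semantic = any(w in q for w in ("similar", "recommend", "about", "like "))
    is_filtered = any(w in q for w in ("rated", "above", "below", "between"))
    if is_aggregate:
        return "AGGREGATE"
    if is_semantic and is_filtered:
        return "HYBRID"
    if is_semantic:
        return "VECTOR"
    return "CYPHER"


print(classify_intent("Recommend movies similar to Inception"))  # VECTOR
print(classify_intent("Who directed Inception?"))                # CYPHER
```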
+
+The semantic entity resolution layer looks up every extracted entity in
+Neo4j before generating Cypher, so "Warner Bros movies" always resolves
+to the canonical `"Warner Bros."` name in the graph.
+
+**Generation DAG** (`generation_module.py`)
+Formats retrieved records into a grounded system prompt and calls gpt-4o.
+
+## Knowledge graph schema
+
+```
+(:Movie {id, title, release_date, overview, popularity, vote_average})
+(:Person {id, name})
+(:Genre {name})
+(:ProductionCompany {id, name})
+
+(:Person)-[:ACTED_IN {order, character}]->(:Movie)
+(:Person)-[:DIRECTED]->(:Movie)
+(:Movie)-[:IN_GENRE]->(:Genre)
+(:Movie)-[:PRODUCED_BY]->(:ProductionCompany)
+```
+
+Dataset: 4,803 movies · 56,603 persons · 106,257 ACTED_IN · 5,166 DIRECTED · 20 genres · 5,047 companies
+
+## Prerequisites
+
+- Docker
+- Python 3.10+
+- OpenAI API key (`gpt-4o` access)
+- TMDB dataset (see `data/README.md`)
+
+## Setup
+
+### 1. Start Neo4j
+
+```bash
+docker compose up -d
+```
+
+Neo4j browser: http://localhost:7474 (user: `neo4j`, password: `password`)
+
+### 2. Install dependencies
+
+```bash
+python -m venv venv
+source venv/bin/activate
+pip install -r requirements.txt
+```
+
+### 3. Configure environment
+
+```bash
+cp .env.example .env
+# edit .env — add your OPENAI_API_KEY
+```
+
+### 4. Download the dataset
+
+Follow `data/README.md` to download and convert the TMDB dataset.
+
+## Running
+
+```bash
+# Step 1 — load graph (takes ~5 seconds)
+python run.py --mode ingest
+
+# Step 2 — compute and store embeddings (takes ~2 minutes)
+python run.py --mode embed
+
+# Step 3 — query
+python run.py --mode query --question "Who directed Inception?"
+python run.py --mode query --question "Which movies did Tom Hanks and Robin Wright appear in together?"
+python run.py --mode query --question "Which production company made the most action movies?"
+python run.py --mode query --question "Recommend movies similar to Inception"
+python run.py --mode query --question "Find me war films rated above 7.5"
+python run.py --mode query --question "Which actors appeared in both a Christopher Nolan and a Steven Spielberg film?"
+```
+
+## Project structure
+
+```
+neo4j_graph_rag/
+├── docker-compose.yml Neo4j 5 + APOC
+├── requirements.txt
+├── .env.example
+├── graph_schema.py Node/relationship definitions and Cypher constraints
+├── ingest_module.py Hamilton DAG: JSON -> Neo4j
+├── embed_module.py Hamilton DAG: Movie nodes -> embeddings -> vector index
+├── retrieval_module.py Hamilton DAG: query -> entity resolution -> retrieval
+├── generation_module.py Hamilton DAG: context + query -> gpt-4o -> answer
+├── run.py Entry point wiring all three pipelines
+├── docs/
+│ └── images/
+│ ├── ingest_dag.png
+│ ├── embed_dag.png
+│ └── rag_dag.png
+└── data/
+ └── README.md Dataset download and conversion instructions
+```
\ No newline at end of file
diff --git a/examples/LLM_Workflows/neo4j_graph_rag/data/README.md b/examples/LLM_Workflows/neo4j_graph_rag/data/README.md
new file mode 100644
index 000000000..4b99bb6cd
--- /dev/null
+++ b/examples/LLM_Workflows/neo4j_graph_rag/data/README.md
@@ -0,0 +1,57 @@
+# Data
+
+This example uses the [TMDB 5000 Movie Dataset](https://www.kaggle.com/datasets/tmdb/tmdb-movie-metadata) from Kaggle.
+
+## Download
+
+1. Create a free Kaggle account at https://www.kaggle.com
+2. Go to https://www.kaggle.com/datasets/tmdb/tmdb-movie-metadata
+3. Click **Download** and unzip the archive
+4. Place the two downloaded CSV files in this `data/` folder and convert them to JSON (see below), so the folder ends up containing:
+
+```
+data/
+├── tmdb_5000_movies.json
+└── tmdb_5000_credits.json
+```
+
+## Note on file format
+
+The Kaggle archive ships the files as CSV (`tmdb_5000_movies.csv`, `tmdb_5000_credits.csv`).
+Several columns contain JSON strings (genres, cast, crew, production_companies).
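For instance, a `genres` cell in the CSV is a single JSON string rather than a parsed list (made-up row for illustration):

```python
import json

# One genres cell exactly as pandas reads it from the CSV: a string.
cell = '[{"id": 28, "name": "Action"}, {"id": 12, "name": "Adventure"}]'

genres = json.loads(cell)
names = [g["name"] for g in genres]
print(names)  # ['Action', 'Adventure']
```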
+
+Convert them to JSON before running ingestion:
+
+```python
+import json
+
+import pandas as pd
+
+movies = pd.read_csv("tmdb_5000_movies.csv")
+credits = pd.read_csv("tmdb_5000_credits.csv")
+
+with open("tmdb_5000_movies.json", "w") as f:
+ json.dump(movies.to_dict(orient="records"), f)
+
+with open("tmdb_5000_credits.json", "w") as f:
+ json.dump(credits.to_dict(orient="records"), f)
+```
+
+Run this script once from inside the `data/` folder, then proceed with `python run.py --mode ingest`.
\ No newline at end of file
diff --git a/examples/LLM_Workflows/neo4j_graph_rag/data/data_refine.py b/examples/LLM_Workflows/neo4j_graph_rag/data/data_refine.py
new file mode 100644
index 000000000..d028de3f8
--- /dev/null
+++ b/examples/LLM_Workflows/neo4j_graph_rag/data/data_refine.py
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+"""Convert the Kaggle TMDB CSVs to JSON for ingestion; run from the repository root."""
+
+import json
+
+import pandas as pd
+
+movies = pd.read_csv("examples/LLM_Workflows/neo4j_graph_rag/data/tmdb_5000_movies.csv")
+credits = pd.read_csv("examples/LLM_Workflows/neo4j_graph_rag/data/tmdb_5000_credits.csv")
+
+with open("examples/LLM_Workflows/neo4j_graph_rag/data/tmdb_5000_movies.json", "w") as f:
+ json.dump(movies.to_dict(orient="records"), f)
+
+with open("examples/LLM_Workflows/neo4j_graph_rag/data/tmdb_5000_credits.json", "w") as f:
+ json.dump(credits.to_dict(orient="records"), f)
\ No newline at end of file
diff --git a/examples/LLM_Workflows/neo4j_graph_rag/docker-compose.yml b/examples/LLM_Workflows/neo4j_graph_rag/docker-compose.yml
new file mode 100644
index 000000000..b41df8671
--- /dev/null
+++ b/examples/LLM_Workflows/neo4j_graph_rag/docker-compose.yml
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+services:
+ neo4j:
+ image: neo4j:5.18.0
+ container_name: neo4j_graph_rag
+ ports:
+ - "7474:7474" # Browser UI
+ - "7687:7687" # Bolt protocol
+ environment:
+ - NEO4J_AUTH=neo4j/password
+ - NEO4J_PLUGINS=["apoc"]
+ - NEO4J_apoc_export_file_enabled=true
+ - NEO4J_apoc_import_file_enabled=true
+ - NEO4J_dbms_security_procedures_unrestricted=apoc.*
+ volumes:
+ - neo4j_data:/data
+ - neo4j_logs:/logs
+ healthcheck:
+ test: ["CMD", "cypher-shell", "-u", "neo4j", "-p", "password", "RETURN 1"]
+ interval: 10s
+ timeout: 5s
+ retries: 5
+
+volumes:
+ neo4j_data:
+ neo4j_logs:
\ No newline at end of file
diff --git a/examples/LLM_Workflows/neo4j_graph_rag/docs/images/embed_dag.png b/examples/LLM_Workflows/neo4j_graph_rag/docs/images/embed_dag.png
new file mode 100644
index 000000000..698bb0814
Binary files /dev/null and b/examples/LLM_Workflows/neo4j_graph_rag/docs/images/embed_dag.png differ
diff --git a/examples/LLM_Workflows/neo4j_graph_rag/docs/images/ingest_dag.png b/examples/LLM_Workflows/neo4j_graph_rag/docs/images/ingest_dag.png
new file mode 100644
index 000000000..9792655ad
Binary files /dev/null and b/examples/LLM_Workflows/neo4j_graph_rag/docs/images/ingest_dag.png differ
diff --git a/examples/LLM_Workflows/neo4j_graph_rag/docs/images/rag_dag.png b/examples/LLM_Workflows/neo4j_graph_rag/docs/images/rag_dag.png
new file mode 100644
index 000000000..b0bb7a0cc
Binary files /dev/null and b/examples/LLM_Workflows/neo4j_graph_rag/docs/images/rag_dag.png differ
diff --git a/examples/LLM_Workflows/neo4j_graph_rag/embed_module.py b/examples/LLM_Workflows/neo4j_graph_rag/embed_module.py
new file mode 100644
index 000000000..2dd5ecd3d
--- /dev/null
+++ b/examples/LLM_Workflows/neo4j_graph_rag/embed_module.py
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Hamilton embedding DAG.
+
+Fetches Movie nodes from Neo4j, computes OpenAI embeddings over
+title + overview text, writes embeddings back to each node, and
+creates a Neo4j vector index for use during retrieval.
+
+Run this once after ingestion and before querying.
+
+DAG flow:
+ neo4j_driver + openai_api_key
+ -> movie_texts
+ -> embedding_client
+ -> movie_embeddings
+ -> vector_index
+ -> embedding_summary
+"""
+
+import logging
+
+import openai
+from neo4j import Driver
+
+logger = logging.getLogger(__name__)
+
+# Shared constants — also imported by retrieval_module.py
+VECTOR_INDEX_NAME = "movie_embeddings"
+EMBEDDING_MODEL = "text-embedding-3-small"
+EMBEDDING_DIMENSIONS = 1536
+BATCH_SIZE = 100
+
+
+def movie_texts(neo4j_driver: Driver) -> list[dict]:
+ """
+ Fetch all Movie nodes and build embedding text from title + overview.
+ Returns list of dicts with keys: id, text.
+ """
+ query = """
+ MATCH (m:Movie)
+ WHERE m.title IS NOT NULL
+ RETURN m.id AS id,
+ m.title AS title,
+ coalesce(m.overview, '') AS overview
+ """
+ with neo4j_driver.session() as session:
+ rows = session.run(query).data()
+
+ texts = [
+ {
+ "id": row["id"],
+ "text": f"{row['title']}. {row['overview']}".strip(),
+ }
+ for row in rows
+ if row["id"] is not None
+ ]
+ logger.info("Fetched %d movie texts for embedding", len(texts))
+ return texts
+
+
+def embedding_client(openai_api_key: str) -> openai.OpenAI:
+ """Initialise the OpenAI client for embedding calls."""
+ return openai.OpenAI(api_key=openai_api_key)
+
+
+def movie_embeddings(
+ movie_texts: list[dict],
+ embedding_client: openai.OpenAI,
+) -> list[dict]:
+ """
+ Compute OpenAI embeddings for all movie texts in batches of BATCH_SIZE.
+ Returns list of dicts with keys: id, embedding.
+ """
+ results = []
+ total = len(movie_texts)
+
+ for i in range(0, total, BATCH_SIZE):
+ batch = movie_texts[i : i + BATCH_SIZE]
+ response = embedding_client.embeddings.create(
+ model=EMBEDDING_MODEL,
+ input=[item["text"] for item in batch],
+ )
+ for item, emb_obj in zip(batch, response.data):
+ results.append({"id": item["id"], "embedding": emb_obj.embedding})
+
+ logger.info("Embedded batch %d-%d of %d", i, min(i + BATCH_SIZE, total), total)
+
+ logger.info("Computed %d embeddings", len(results))
+ return results
+
+
+def vector_index(
+ movie_embeddings: list[dict],
+ neo4j_driver: Driver,
+) -> str:
+ """
+ Write embeddings to Movie nodes in Neo4j and create a cosine
+ vector index named VECTOR_INDEX_NAME over the embedding property.
+ Returns the index name.
+ """
+ write_query = """
+ UNWIND $batch AS row
+ MATCH (m:Movie {id: row.id})
+ SET m.embedding = row.embedding
+ """
+ total = len(movie_embeddings)
+ with neo4j_driver.session() as session:
+ for i in range(0, total, BATCH_SIZE):
+ batch = movie_embeddings[i : i + BATCH_SIZE]
+ session.run(write_query, {"batch": batch})
+ logger.info(
+ "Wrote embeddings to nodes %d-%d of %d",
+ i,
+ min(i + BATCH_SIZE, total),
+ total,
+ )
+
+ session.run(f"DROP INDEX {VECTOR_INDEX_NAME} IF EXISTS")
+ session.run(
+ f"""
+ CREATE VECTOR INDEX {VECTOR_INDEX_NAME}
+ FOR (m:Movie)
+ ON m.embedding
+ OPTIONS {{
+ indexConfig: {{
+ `vector.dimensions`: {EMBEDDING_DIMENSIONS},
+ `vector.similarity_function`: 'cosine'
+ }}
+ }}
+ """
+ )
+ logger.info("Created vector index '%s'", VECTOR_INDEX_NAME)
+
+ return VECTOR_INDEX_NAME
+
+
+def embedding_summary(
+ movie_embeddings: list[dict],
+ vector_index: str,
+) -> dict:
+ """
+ Collect embedding statistics and return a summary.
+ Terminal node of the embedding DAG.
+ """
+ summary = {
+ "embeddings_written": len(movie_embeddings),
+ "vector_index": vector_index,
+ "model": EMBEDDING_MODEL,
+ "dimensions": EMBEDDING_DIMENSIONS,
+ }
+ logger.info("Embedding complete: %s", summary)
+ return summary
\ No newline at end of file
diff --git a/examples/LLM_Workflows/neo4j_graph_rag/generation_module.py b/examples/LLM_Workflows/neo4j_graph_rag/generation_module.py
new file mode 100644
index 000000000..eee8b078d
--- /dev/null
+++ b/examples/LLM_Workflows/neo4j_graph_rag/generation_module.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Hamilton generation DAG.
+
+Takes the retrieved context from the retrieval DAG and the original
+user query, constructs a grounded system prompt, and calls gpt-4o
+to produce an answer.
+
+DAG flow:
+ retrieved_context + user_query + query_intent
+ -> system_prompt
+ -> prompt_messages
+ -> answer
+"""
+
+import logging
+
+import openai
+
+logger = logging.getLogger(__name__)
+
+SYSTEM_TEMPLATE = """You are a movie database assistant. \
+The numbered results below are pre-filtered database records that \
+directly answer the user's question. Each record is already the \
+correct answer — do not question whether it matches the query.
+
+Rules:
+- "Movie: X" means X is a film title. List it.
+- "Actor: X" means X appeared in the relevant films. List them.
+- "Director: X" means X is a director. State it.
+- The results are already filtered by the user's criteria \
+ (company, director, genre, etc.) — do not say the company or \
+ director is not specified.
+- When asked for a list, list every numbered record.
+- When asked yes/no, answer yes/no then cite the relevant record.
+- When a single record answers "highest/most/best", state it directly.
+- If there are genuinely no records, say so briefly.
+
+Retrieval strategy: {intent}
+
+Database records:
+{context}"""
+
+
+def system_prompt(
+ retrieved_context: str,
+ query_intent: str,
+) -> str:
+ """Build the system prompt by injecting retrieved context and intent metadata."""
+ return SYSTEM_TEMPLATE.format(
+ intent=query_intent,
+ context=retrieved_context,
+ )
+
+
+def prompt_messages(system_prompt: str, user_query: str) -> list[dict]:
+ """Assemble the message list for the OpenAI chat completion call."""
+ return [
+ {"role": "system", "content": system_prompt},
+ {"role": "user", "content": user_query},
+ ]
+
+
+def answer(prompt_messages: list[dict], openai_api_key: str) -> str:
+ """
+ Call gpt-4o with the assembled prompt and return the answer string.
+ Terminal node of the generation DAG.
+ """
+ client = openai.OpenAI(api_key=openai_api_key)
+ response = client.chat.completions.create(
+ model="gpt-4o",
+ messages=prompt_messages,
+ temperature=0.0,
+ )
+ result = response.choices[0].message.content
+ logger.info("Generated answer (%d chars)", len(result))
+ return result
\ No newline at end of file
diff --git a/examples/LLM_Workflows/neo4j_graph_rag/graph_schema.py b/examples/LLM_Workflows/neo4j_graph_rag/graph_schema.py
new file mode 100644
index 000000000..a50d82692
--- /dev/null
+++ b/examples/LLM_Workflows/neo4j_graph_rag/graph_schema.py
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Static schema definitions for the TMDB movie knowledge graph.
+
+Nodes
+-----
+Movie : id, title, release_date, overview, popularity, vote_average
+Person : id, name
+Genre : name
+ProductionCompany : id, name
+
+Relationships
+-------------
+(:Person)-[:ACTED_IN {order: int, character: str}]->(:Movie)
+(:Person)-[:DIRECTED]->(:Movie)
+(:Movie)-[:IN_GENRE]->(:Genre)
+(:Movie)-[:PRODUCED_BY]->(:ProductionCompany)
+
+Constraints (applied at ingestion time via run.py)
+-----------
+UNIQUE Movie.id
+UNIQUE Person.id
+UNIQUE Genre.name
+UNIQUE ProductionCompany.id
+"""
+
+# Node labels
+NODE_MOVIE = "Movie"
+NODE_PERSON = "Person"
+NODE_GENRE = "Genre"
+NODE_COMPANY = "ProductionCompany"
+
+# Relationship types
+REL_ACTED_IN = "ACTED_IN"
+REL_DIRECTED = "DIRECTED"
+REL_IN_GENRE = "IN_GENRE"
+REL_PRODUCED_BY = "PRODUCED_BY"
+
+# Properties stored per node type
+NODE_PROPERTIES = {
+ NODE_MOVIE: ["id", "title", "release_date", "overview", "popularity", "vote_average"],
+ NODE_PERSON: ["id", "name"],
+ NODE_GENRE: ["name"],
+ NODE_COMPANY: ["id", "name"],
+}
+
+# Properties stored per relationship type
+REL_PROPERTIES = {
+ REL_ACTED_IN: ["order", "character"],
+ REL_DIRECTED: [],
+ REL_IN_GENRE: [],
+ REL_PRODUCED_BY: [],
+}
+
+# Connectivity map src -> rel -> dest
+CONNECTIVITY = [
+ (NODE_PERSON, REL_ACTED_IN, NODE_MOVIE),
+ (NODE_PERSON, REL_DIRECTED, NODE_MOVIE),
+ (NODE_MOVIE, REL_IN_GENRE, NODE_GENRE),
+ (NODE_MOVIE, REL_PRODUCED_BY, NODE_COMPANY),
+]
+
+# Cypher constraints to create before ingestion
+CONSTRAINTS = [
+ f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{NODE_MOVIE}) REQUIRE n.id IS UNIQUE",
+ f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{NODE_PERSON}) REQUIRE n.id IS UNIQUE",
+ f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{NODE_GENRE}) REQUIRE n.name IS UNIQUE",
+ f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{NODE_COMPANY}) REQUIRE n.id IS UNIQUE",
+]
+
+
+def schema_to_prompt() -> str:
+ """
+ Returns a natural-language description of the graph schema
+ for use in LLM system prompts.
+ """
+ lines = ["The knowledge graph contains the following node types:\n"]
+
+ for label, props in NODE_PROPERTIES.items():
+ if props:
+ lines.append(f" {label}: {', '.join(props)}")
+ else:
+ lines.append(f" {label}: (no properties)")
+
+ lines.append("\nThe knowledge graph contains the following relationship types:\n")
+
+ for src, rel, dest in CONNECTIVITY:
+ props = REL_PROPERTIES.get(rel, [])
+ prop_str = f" with properties: {', '.join(props)}" if props else ""
+ lines.append(f" (:{src})-[:{rel}]->(:{dest}){prop_str}")
+
+ return "\n".join(lines)
\ No newline at end of file
diff --git a/examples/LLM_Workflows/neo4j_graph_rag/ingest_module.py b/examples/LLM_Workflows/neo4j_graph_rag/ingest_module.py
new file mode 100644
index 000000000..ac9de7686
--- /dev/null
+++ b/examples/LLM_Workflows/neo4j_graph_rag/ingest_module.py
@@ -0,0 +1,326 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Hamilton ingestion DAG for the TMDB movie dataset.
+
+Input files (place in data/):
+ - tmdb_5000_movies.json : movie metadata, genres, production companies
+ - tmdb_5000_credits.json : cast and crew per movie
+
+The DAG produces a fully populated Neo4j knowledge graph with the schema
+defined in graph_schema.py.
+
+Node types : Movie, Person, Genre, ProductionCompany
+Relationships: ACTED_IN, DIRECTED, IN_GENRE, PRODUCED_BY
+
+DAG flow:
+ movies_path -> raw_movies -> parsed_movies -> write_movie_nodes
+ -> genre_records -> write_genre_nodes_and_edges
+ -> company_records -> write_company_nodes_and_edges
+ credits_path -> raw_credits -> parsed_credits -> write_person_nodes_and_edges
+ all writes -> ingestion_summary
+"""
+
+import json
+import logging
+
+from neo4j import Driver
+
+logger = logging.getLogger(__name__)
+
+
+# ---------------------------------------------------------------------------
+# 1. Load raw data
+# ---------------------------------------------------------------------------
+
+
+def raw_movies(movies_path: str) -> list[dict]:
+ """Load raw movie records from the TMDB movies JSON file."""
+ with open(movies_path) as f:
+ data = json.load(f)
+ logger.info("Loaded %d raw movie records", len(data))
+ return data
+
+
+def raw_credits(credits_path: str) -> list[dict]:
+ """Load raw credit records from the TMDB credits JSON file."""
+ with open(credits_path) as f:
+ data = json.load(f)
+ logger.info("Loaded %d raw credit records", len(data))
+ return data
+
+
+# ---------------------------------------------------------------------------
+# 2. Parse movies
+# ---------------------------------------------------------------------------
+
+
+def parsed_movies(raw_movies: list[dict]) -> list[dict]:
+ """
+ Extract clean Movie node properties from raw records.
+ Drops records missing id or title.
+ """
+ movies = []
+ for m in raw_movies:
+ if not m.get("id") or not m.get("title"):
+ continue
+ movies.append(
+ {
+ "id": int(m["id"]),
+ "title": str(m["title"]),
+                "release_date": str(m.get("release_date") or ""),
+                "overview": str(m.get("overview") or ""),
+                "popularity": float(m.get("popularity") or 0.0),
+                "vote_average": float(m.get("vote_average") or 0.0),
+ }
+ )
+ logger.info("Parsed %d movies", len(movies))
+ return movies
+
+
+def genre_records(raw_movies: list[dict]) -> list[dict]:
+ """
+ Extract (movie_id, genre_name) pairs for IN_GENRE relationships.
+ Handles both pre-parsed lists and JSON-string encoded genres.
+ """
+ records = []
+ for m in raw_movies:
+ movie_id = m.get("id")
+ if not movie_id:
+ continue
+ genres = m.get("genres", [])
+ if isinstance(genres, str):
+ try:
+ genres = json.loads(genres)
+ except json.JSONDecodeError:
+ continue
+ for g in genres:
+ if g.get("name"):
+ records.append({"movie_id": int(movie_id), "genre_name": str(g["name"])})
+ logger.info("Extracted %d genre relationships", len(records))
+ return records
+
+
+def company_records(raw_movies: list[dict]) -> list[dict]:
+ """
+ Extract (movie_id, company_id, company_name) pairs for PRODUCED_BY relationships.
+ Handles both pre-parsed lists and JSON-string encoded production_companies.
+ """
+ records = []
+ for m in raw_movies:
+ movie_id = m.get("id")
+ if not movie_id:
+ continue
+ companies = m.get("production_companies", [])
+ if isinstance(companies, str):
+ try:
+ companies = json.loads(companies)
+ except json.JSONDecodeError:
+ continue
+ for c in companies:
+ if c.get("id") and c.get("name"):
+ records.append(
+ {
+ "movie_id": int(movie_id),
+ "company_id": int(c["id"]),
+ "company_name": str(c["name"]),
+ }
+ )
+ logger.info("Extracted %d production company relationships", len(records))
+ return records
+
+
+# ---------------------------------------------------------------------------
+# 3. Parse credits
+# ---------------------------------------------------------------------------
+
+
+def parsed_credits(raw_credits: list[dict]) -> list[dict]:
+ """
+ Parse credits into a flat list of records containing movie_id,
+ person details, and role: every cast member, plus crew members whose job is Director.
+ Handles both pre-parsed lists and JSON-string encoded cast/crew fields.
+ """
+ records = []
+ for c in raw_credits:
+ movie_id = c.get("id") or c.get("movie_id")
+ if not movie_id:
+ continue
+
+ cast = c.get("cast", [])
+ if isinstance(cast, str):
+ try:
+ cast = json.loads(cast)
+ except json.JSONDecodeError:
+ cast = []
+ for member in cast:
+ if member.get("id") and member.get("name"):
+ records.append(
+ {
+ "movie_id": int(movie_id),
+ "person_id": int(member["id"]),
+ "person_name": str(member["name"]),
+ "role": "cast",
+ "character": str(member.get("character", "")),
+ "order": int(member.get("order", 999)),
+ }
+ )
+
+ crew = c.get("crew", [])
+ if isinstance(crew, str):
+ try:
+ crew = json.loads(crew)
+ except json.JSONDecodeError:
+ crew = []
+ for member in crew:
+ if member.get("job") == "Director" and member.get("id") and member.get("name"):
+ records.append(
+ {
+ "movie_id": int(movie_id),
+ "person_id": int(member["id"]),
+ "person_name": str(member["name"]),
+ "role": "director",
+ "character": "",
+ "order": -1,
+ }
+ )
+
+ logger.info("Parsed %d credit records", len(records))
+ return records
+
+
+# ---------------------------------------------------------------------------
+# 4. Write to Neo4j
+# ---------------------------------------------------------------------------
+
+
+def _run_batch(session, query: str, batch: list[dict]) -> int:
+ """Execute a parameterised Cypher UNWIND query for a batch of records."""
+ session.run(query, {"batch": batch})
+ return len(batch)
+
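`_run_batch` sends the whole record list as a single parameter. For larger datasets you would typically split records into fixed-size batches first, one transaction each; a hypothetical helper (not part of this example) might look like:

```python
# Hypothetical batching helper: yields fixed-size slices so each call to
# _run_batch stays within Neo4j's transaction memory limits.
def chunked(records: list, size: int = 1000):
    for start in range(0, len(records), size):
        yield records[start:start + size]
```

Each yielded slice would then be passed to `_run_batch` in its own session transaction.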
+
+def write_movie_nodes(parsed_movies: list[dict], neo4j_driver: Driver) -> int:
+ """MERGE Movie nodes into Neo4j. Returns number of movies written."""
+ query = """
+ UNWIND $batch AS row
+ MERGE (m:Movie {id: row.id})
+ SET m.title = row.title,
+ m.release_date = row.release_date,
+ m.overview = row.overview,
+ m.popularity = row.popularity,
+ m.vote_average = row.vote_average
+ """
+ with neo4j_driver.session() as session:
+ count = _run_batch(session, query, parsed_movies)
+ logger.info("Wrote %d Movie nodes", count)
+ return count
+
+
+def write_genre_nodes_and_edges(genre_records: list[dict], neo4j_driver: Driver) -> int:
+ """MERGE Genre nodes and IN_GENRE relationships. Returns number of edges written."""
+ query = """
+ UNWIND $batch AS row
+ MERGE (g:Genre {name: row.genre_name})
+ WITH g, row
+ MATCH (m:Movie {id: row.movie_id})
+ MERGE (m)-[:IN_GENRE]->(g)
+ """
+ with neo4j_driver.session() as session:
+ count = _run_batch(session, query, genre_records)
+ logger.info("Wrote %d IN_GENRE edges", count)
+ return count
+
+
+def write_company_nodes_and_edges(company_records: list[dict], neo4j_driver: Driver) -> int:
+ """MERGE ProductionCompany nodes and PRODUCED_BY relationships."""
+ query = """
+ UNWIND $batch AS row
+ MERGE (c:ProductionCompany {id: row.company_id})
+ SET c.name = row.company_name
+ WITH c, row
+ MATCH (m:Movie {id: row.movie_id})
+ MERGE (m)-[:PRODUCED_BY]->(c)
+ """
+ with neo4j_driver.session() as session:
+ count = _run_batch(session, query, company_records)
+ logger.info("Wrote %d PRODUCED_BY edges", count)
+ return count
+
+
+def write_person_nodes_and_edges(parsed_credits: list[dict], neo4j_driver: Driver) -> int:
+ """
+ MERGE Person nodes and ACTED_IN / DIRECTED relationships.
+ Returns total number of relationships written.
+ """
+ acted_in_query = """
+ UNWIND $batch AS row
+ MERGE (p:Person {id: row.person_id})
+ SET p.name = row.person_name
+ WITH p, row
+ MATCH (m:Movie {id: row.movie_id})
+ MERGE (p)-[r:ACTED_IN {order: row.order}]->(m)
+ SET r.character = row.character
+ """
+ directed_query = """
+ UNWIND $batch AS row
+ MERGE (p:Person {id: row.person_id})
+ SET p.name = row.person_name
+ WITH p, row
+ MATCH (m:Movie {id: row.movie_id})
+ MERGE (p)-[:DIRECTED]->(m)
+ """
+ cast_records = [r for r in parsed_credits if r["role"] == "cast"]
+ director_records = [r for r in parsed_credits if r["role"] == "director"]
+
+ with neo4j_driver.session() as session:
+ _run_batch(session, acted_in_query, cast_records)
+ _run_batch(session, directed_query, director_records)
+
+ total = len(cast_records) + len(director_records)
+ logger.info(
+ "Wrote %d ACTED_IN and %d DIRECTED edges",
+ len(cast_records),
+ len(director_records),
+ )
+ return total
+
+
+# ---------------------------------------------------------------------------
+# 5. Terminal node
+# ---------------------------------------------------------------------------
+
+
+def ingestion_summary(
+ write_movie_nodes: int,
+ write_genre_nodes_and_edges: int,
+ write_company_nodes_and_edges: int,
+ write_person_nodes_and_edges: int,
+) -> dict:
+ """
+ Collect write counts from all upstream nodes and return a summary.
+ Terminal node of the ingestion DAG.
+ """
+ summary = {
+ "movies": write_movie_nodes,
+ "genre_edges": write_genre_nodes_and_edges,
+ "company_edges": write_company_nodes_and_edges,
+ "person_edges": write_person_nodes_and_edges,
+ }
+ logger.info("Ingestion complete: %s", summary)
+ return summary
\ No newline at end of file
diff --git a/examples/LLM_Workflows/neo4j_graph_rag/requirements.txt b/examples/LLM_Workflows/neo4j_graph_rag/requirements.txt
new file mode 100644
index 000000000..e1e08da02
--- /dev/null
+++ b/examples/LLM_Workflows/neo4j_graph_rag/requirements.txt
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+sf-hamilton>=1.73.0
+neo4j>=5.18.0
+openai>=1.30.0
+pandas>=2.0.0
+python-dotenv>=1.0.0
+tqdm>=4.66.0
diff --git a/examples/LLM_Workflows/neo4j_graph_rag/retrieval_module.py b/examples/LLM_Workflows/neo4j_graph_rag/retrieval_module.py
new file mode 100644
index 000000000..3c045d343
--- /dev/null
+++ b/examples/LLM_Workflows/neo4j_graph_rag/retrieval_module.py
@@ -0,0 +1,789 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Hamilton retrieval DAG — semantic entity resolution + multi-strategy GraphRAG.
+
+Pipeline:
+ 1. query_intent — classify into VECTOR / CYPHER / AGGREGATE / HYBRID
+ 2. entity_extraction — LLM extracts named entities from the query
+ 3. entity_resolution — each entity is looked up in Neo4j to get its
+ canonical form (exact name, genre spelling, date format)
+ 4. cypher_query — Cypher is generated using resolved entities, not raw text
+ 5. cypher_results — execute and return rows
+ 6. vector_results — semantic similarity search (VECTOR / HYBRID only)
+ 7. merged_results — combine both paths
+ 8. retrieved_context — format for generation DAG
+"""
+
+import json
+import logging
+
+import openai
+from neo4j import Driver
+
+from embed_module import EMBEDDING_MODEL, VECTOR_INDEX_NAME
+from graph_schema import schema_to_prompt
+
+logger = logging.getLogger(__name__)
+
+TOP_K = 8
+MAX_CAST = 8
+MAX_RETRIES = 2
+SCHEMA_PROMPT = schema_to_prompt()
+
+# ---------------------------------------------------------------------------
+# Cypher examples used in generation prompt
+# ---------------------------------------------------------------------------
+
+CYPHER_EXAMPLES = """
+-- Direct lookup (always return person/movie names explicitly)
+MATCH (d:Person)-[:DIRECTED]->(m:Movie)
+WHERE toLower(m.title) = toLower('Inception')
+RETURN m.title AS movie, d.name AS director, m.release_date, m.vote_average
+
+-- Actor filmography (always alias person name)
+MATCH (p:Person)-[:ACTED_IN]->(m:Movie)
+WHERE p.name = 'Tom Hanks'
+RETURN p.name AS actor, m.title AS movie, m.release_date
+ORDER BY m.release_date DESC
+LIMIT 20
+
+-- Director filmography
+MATCH (p:Person)-[:DIRECTED]->(m:Movie)
+WHERE p.name = 'Christopher Nolan'
+RETURN p.name AS director, m.title AS movie, m.release_date, m.vote_average
+ORDER BY m.release_date
+
+-- Co-occurrence (use exact resolved names)
+MATCH (a:Person {name: 'Tom Hanks'})-[:ACTED_IN]->(m:Movie)<-[:ACTED_IN]-(b:Person {name: 'Robin Wright'})
+RETURN m.title AS movie, m.release_date
+
+-- Aggregation with minimum count to avoid single-film outliers
+MATCH (d:Person)-[:DIRECTED]->(m:Movie)
+WITH d, avg(m.vote_average) AS avg_rating, count(m) AS film_count
+WHERE film_count >= 3
+RETURN d.name AS director, avg_rating, film_count
+ORDER BY avg_rating DESC
+LIMIT 10
+
+-- Production company (use exact resolved name)
+MATCH (c:ProductionCompany {name: 'Warner Bros. Pictures'})<-[:PRODUCED_BY]-(m:Movie)
+RETURN m.title AS movie, m.release_date
+ORDER BY m.release_date DESC
+LIMIT 20
+
+-- Genre filter (use exact resolved genre name, string date comparison)
+MATCH (m:Movie)-[:IN_GENRE]->(g:Genre {name: 'Comedy'})
+WHERE m.vote_average > 7
+ AND m.release_date > '2010-12-31'
+RETURN m.title AS movie, m.release_date, m.vote_average
+ORDER BY m.vote_average DESC
+LIMIT 20
+
+-- Multi-hop: always include connecting film titles for context
+MATCH (a:Person)-[:ACTED_IN]->(m1:Movie)<-[:DIRECTED]-(d1:Person {name: 'Christopher Nolan'})
+ ,(a)-[:ACTED_IN]->(m2:Movie)<-[:DIRECTED]-(d2:Person {name: 'Steven Spielberg'})
+RETURN DISTINCT a.name AS actor, m1.title AS nolan_film, m2.title AS spielberg_film
+LIMIT 20
+
+-- Hybrid: director + genre + rating
+MATCH (d:Person {name: 'James Cameron'})-[:DIRECTED]->(m:Movie)-[:IN_GENRE]->(g:Genre {name: 'Science Fiction'})
+RETURN m.title AS movie, m.vote_average, d.name AS director
+ORDER BY m.vote_average DESC
+LIMIT 20
+
+-- Highest rated by genre (ALWAYS add popularity > 5 to exclude obscure films)
+MATCH (m:Movie)-[:IN_GENRE]->(g:Genre {name: 'Drama'})
+WHERE m.popularity > 5
+RETURN m.title AS movie, m.vote_average
+ORDER BY m.vote_average DESC
+LIMIT 1
+
+-- Highest rated film overall (ALWAYS add popularity > 5)
+MATCH (m:Movie)
+WHERE m.popularity > 5
+RETURN m.title AS movie, m.vote_average
+ORDER BY m.vote_average DESC
+LIMIT 1
+"""
+
+
+# ---------------------------------------------------------------------------
+# 1. Classify query intent
+# ---------------------------------------------------------------------------
+
+def query_intent(user_query: str, openai_api_key: str) -> str:
+ """
+ Classify the user query into one of four retrieval strategies:
+ VECTOR — thematic/semantic (recommend, find similar, what movies about X)
+ CYPHER — relational facts (who directed X, filmography, co-stars, direct lookup)
+ AGGREGATE — counting/ranking across the dataset
+ HYBRID — needs both graph facts AND semantic similarity / filtering
+ """
+ system = """You are a query classifier for a movie knowledge graph.
+Classify the user query into exactly one of these retrieval strategies:
+
+VECTOR — thematic or semantic: recommendations, "find movies like X",
+ "movies about space", "psychological thrillers"
+CYPHER — relational or factual: "who directed X", "what movies did actor Y appear in",
+ "did A and B appear together", "movies by director Z",
+ "what year was X released", "who starred in X"
+AGGREGATE — counting, ranking, aggregation: "which company made the most action movies",
+ "highest rated drama", "director with highest average rating",
+ "how many movies did X direct"
+HYBRID — needs both graph traversal AND filtering/rating:
+ "highest rated sci-fi films by James Cameron",
+ "Tom Hanks movies rated above 7.5",
+ "comedy films after 2010 rated above 7"
+
+Reply with ONLY one word: VECTOR, CYPHER, AGGREGATE, or HYBRID."""
+
+ client = openai.OpenAI(api_key=openai_api_key)
+ response = client.chat.completions.create(
+ model="gpt-4o",
+ messages=[
+ {"role": "system", "content": system},
+ {"role": "user", "content": user_query},
+ ],
+ temperature=0.0,
+ max_tokens=10,
+ )
+ intent = response.choices[0].message.content.strip().upper()
+ if intent not in ("VECTOR", "CYPHER", "AGGREGATE", "HYBRID"):
+ logger.warning("Unexpected intent '%s', defaulting to HYBRID", intent)
+ intent = "HYBRID"
+ logger.info("Query intent: %s", intent)
+ return intent
+
+
+# ---------------------------------------------------------------------------
+# 2. Entity extraction
+# ---------------------------------------------------------------------------
+
+def entity_extraction(
+ user_query: str,
+ openai_api_key: str,
+ query_intent: str,
+) -> dict:
+ """
+ Extract named entities from the user query as a structured dict.
+ Returns empty dict for VECTOR queries (no entity resolution needed).
+
+ Entity types extracted:
+ movies : list of movie titles mentioned
+ persons : list of person names (actors, directors)
+ genres : list of genres mentioned
+ companies : list of production company names mentioned
+ year_after : year string if query filters by "after YYYY"
+ year_before : year string if query filters by "before YYYY"
+ rating_above: float if query filters by minimum rating
+ rating_below: float if query filters by maximum rating
+ """
+ if query_intent == "VECTOR":
+ logger.info("Skipping entity extraction for VECTOR intent")
+ return {}
+
+ system = """You are an entity extractor for a movie knowledge graph query system.
+
+Extract all named entities from the user query and return a JSON object with these keys
+(omit keys that are not present in the query):
+
+{
+ "movies": ["title1", "title2"],
+ "persons": ["name1", "name2"],
+ "genres": ["genre1"],
+ "companies": ["company name"],
+ "year_after": "YYYY",
+ "year_before": "YYYY",
+ "rating_above": 7.5,
+ "rating_below": 8.0
+}
+
+Rules:
+- Extract exactly what is in the query, do not normalise or correct spellings yet
+- If a name could be an actor or director, put it in "persons"
+- Return ONLY valid JSON, no explanation, no markdown fences"""
+
+ client = openai.OpenAI(api_key=openai_api_key)
+ response = client.chat.completions.create(
+ model="gpt-4o",
+ messages=[
+ {"role": "system", "content": system},
+ {"role": "user", "content": user_query},
+ ],
+ temperature=0.0,
+ max_tokens=300,
+ )
+ raw = response.choices[0].message.content.strip()
+ if raw.startswith("```"):
+ raw = "\n".join(raw.split("\n")[1:])
+ if raw.endswith("```"):
+ raw = "\n".join(raw.split("\n")[:-1])
+ try:
+ entities = json.loads(raw)
+ except json.JSONDecodeError:
+ logger.warning("Failed to parse entity extraction response: %s", raw)
+ entities = {}
+
+ logger.info("Extracted entities: %s", entities)
+ return entities
+
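The fence-stripping dance above (repeated again in `_generate_cypher`) could be factored into one small helper; a sketch of the equivalent logic, with the literal backticks built programmatically:

```python
FENCE = "`" * 3  # a literal triple-backtick markdown fence

def strip_md_fences(raw: str) -> str:
    # Drop a leading fence line (e.g. one tagged "json") and a trailing
    # fence line, mirroring the inline logic used after each LLM call.
    raw = raw.strip()
    if raw.startswith(FENCE):
        raw = "\n".join(raw.split("\n")[1:])
    if raw.endswith(FENCE):
        raw = "\n".join(raw.split("\n")[:-1])
    return raw.strip()
```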
+
+# ---------------------------------------------------------------------------
+# 3. Entity resolution — look up canonical forms in Neo4j
+# ---------------------------------------------------------------------------
+
+def _resolve_persons(names: list[str], session) -> dict[str, str]:
+ """Fuzzy-match person names against the graph, return {input: canonical}."""
+ resolved = {}
+ for name in names:
+ result = session.run(
+ """
+ MATCH (p:Person)
+ WHERE toLower(p.name) CONTAINS toLower($name)
+ RETURN p.name AS name
+ ORDER BY size(p.name)
+ LIMIT 1
+ """,
+ {"name": name},
+ ).single()
+ if result:
+ resolved[name] = result["name"]
+ logger.info("Resolved person '%s' -> '%s'", name, result["name"])
+ else:
+ resolved[name] = name
+ logger.warning("Could not resolve person '%s', using as-is", name)
+ return resolved
+
+
+def _resolve_movies(titles: list[str], session) -> dict[str, str]:
+ """Fuzzy-match movie titles against the graph, return {input: canonical}."""
+ resolved = {}
+ for title in titles:
+ result = session.run(
+ """
+ MATCH (m:Movie)
+ WHERE toLower(m.title) CONTAINS toLower($title)
+ RETURN m.title AS title
+ ORDER BY size(m.title)
+ LIMIT 1
+ """,
+ {"title": title},
+ ).single()
+ if result:
+ resolved[title] = result["title"]
+ logger.info("Resolved movie '%s' -> '%s'", title, result["title"])
+ else:
+ resolved[title] = title
+ logger.warning("Could not resolve movie '%s', using as-is", title)
+ return resolved
+
+
+GENRE_ALIASES = {
+ "sci-fi": "Science Fiction",
+ "scifi": "Science Fiction",
+ "science fiction": "Science Fiction",
+ "rom-com": "Romance",
+ "romcom": "Romance",
+ "rom com": "Romance",
+ "action-adventure": "Action",
+ "documentary": "Documentary",
+ "doc": "Documentary",
+ "animated": "Animation",
+ "animation": "Animation",
+ "anime": "Animation",
+ "horror": "Horror",
+ "comedy": "Comedy",
+ "drama": "Drama",
+ "thriller": "Thriller",
+ "western": "Western",
+ "fantasy": "Fantasy",
+ "mystery": "Mystery",
+ "adventure": "Adventure",
+ "crime": "Crime",
+ "family": "Family",
+ "history": "History",
+ "music": "Music",
+ "romance": "Romance",
+ "war": "War",
+ "tv movie": "TV Movie",
+}
+
+
+def _resolve_genres(genres: list[str], session) -> dict[str, str]:
+ """
+ Resolve genre names to their canonical form in the graph.
+ First checks a local alias map for common abbreviations (e.g. sci-fi),
+ then falls back to fuzzy Neo4j lookup.
+ Returns {input: canonical}.
+ """
+ resolved = {}
+ for genre in genres:
+ # Check alias map first
+ alias = GENRE_ALIASES.get(genre.lower())
+ if alias:
+ resolved[genre] = alias
+ logger.info("Resolved genre '%s' -> '%s' (alias map)", genre, alias)
+ continue
+
+ # Fall back to fuzzy Neo4j lookup
+ result = session.run(
+ """
+ MATCH (g:Genre)
+ WHERE toLower(g.name) CONTAINS toLower($genre)
+ RETURN g.name AS name
+ LIMIT 1
+ """,
+ {"genre": genre},
+ ).single()
+ if result:
+ resolved[genre] = result["name"]
+ logger.info("Resolved genre '%s' -> '%s'", genre, result["name"])
+ else:
+ resolved[genre] = genre
+ logger.warning("Could not resolve genre '%s', using as-is", genre)
+ return resolved
+
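The alias-map-first, graph-fallback pattern used by `_resolve_genres` can be sketched without a live Neo4j session by injecting the lookup as a callable (`resolve_genre` and `graph_lookup` are illustrative names):

```python
# Small stand-in for GENRE_ALIASES, to keep the sketch self-contained.
ALIASES = {"sci-fi": "Science Fiction", "romcom": "Romance"}

def resolve_genre(genre: str, graph_lookup=None) -> str:
    # 1. Local alias map handles common abbreviations without a DB round trip.
    alias = ALIASES.get(genre.lower())
    if alias:
        return alias
    # 2. Fall back to a (possibly fuzzy) graph lookup, if one is supplied.
    if graph_lookup is not None:
        hit = graph_lookup(genre)
        if hit:
            return hit
    # 3. Last resort: pass the input through unchanged.
    return genre
```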
+
+def _resolve_companies(companies: list[str], session) -> dict[str, str]:
+ """Fuzzy-match company names against the graph, return {input: canonical}."""
+ resolved = {}
+ for company in companies:
+ result = session.run(
+ """
+ MATCH (c:ProductionCompany)
+ WHERE toLower(c.name) CONTAINS toLower($company)
+ RETURN c.name AS name
+ ORDER BY size(c.name)
+ LIMIT 1
+ """,
+ {"company": company},
+ ).single()
+ if result:
+ resolved[company] = result["name"]
+ logger.info("Resolved company '%s' -> '%s'", company, result["name"])
+ else:
+ resolved[company] = company
+ logger.warning("Could not resolve company '%s', using as-is", company)
+ return resolved
+
+
+def entity_resolution(
+ entity_extraction: dict,
+ neo4j_driver: Driver,
+ query_intent: str,
+) -> dict:
+ """
+ Resolve each extracted entity to its canonical form in Neo4j.
+
+ Returns the same structure as entity_extraction but with values
+ replaced by their canonical graph names. Numeric and date filters
+ (year_after, year_before, rating_above, rating_below) are passed
+ through unchanged; _build_entity_context later renders them as
+ Cypher-ready comparison strings for the generation prompt.
+ """
+ if query_intent == "VECTOR" or not entity_extraction:
+ return entity_extraction
+
+ resolved = {}
+
+ with neo4j_driver.session() as session:
+ if entity_extraction.get("persons"):
+ resolved["persons"] = _resolve_persons(
+ entity_extraction["persons"], session
+ )
+
+ if entity_extraction.get("movies"):
+ resolved["movies"] = _resolve_movies(
+ entity_extraction["movies"], session
+ )
+
+ if entity_extraction.get("genres"):
+ resolved["genres"] = _resolve_genres(
+ entity_extraction["genres"], session
+ )
+
+ if entity_extraction.get("companies"):
+ resolved["companies"] = _resolve_companies(
+ entity_extraction["companies"], session
+ )
+
+ # Pass through numeric/date filters unchanged
+ for key in ("year_after", "year_before", "rating_above", "rating_below"):
+ if key in entity_extraction:
+ resolved[key] = entity_extraction[key]
+
+ logger.info("Resolved entities: %s", resolved)
+ return resolved
+
+
+# ---------------------------------------------------------------------------
+# 4. Vector path
+# ---------------------------------------------------------------------------
+
+def query_embedding(
+ user_query: str,
+ openai_api_key: str,
+ query_intent: str,
+) -> list[float]:
+ """Embed the user query. Returns empty list for CYPHER/AGGREGATE."""
+ if query_intent not in ("VECTOR", "HYBRID"):
+ logger.info("Skipping embedding for intent: %s", query_intent)
+ return []
+
+ client = openai.OpenAI(api_key=openai_api_key)
+ response = client.embeddings.create(
+ model=EMBEDDING_MODEL,
+ input=user_query,
+ )
+ embedding = response.data[0].embedding
+ logger.info("Computed query embedding (%d dims)", len(embedding))
+ return embedding
+
+
+def vector_results(
+ query_embedding: list[float],
+ neo4j_driver: Driver,
+ query_intent: str,
+) -> list[dict]:
+ """Run vector similarity search. Returns empty list for CYPHER/AGGREGATE."""
+ if query_intent not in ("VECTOR", "HYBRID") or not query_embedding:
+ return []
+
+ cypher = """
+ CALL db.index.vector.queryNodes($index_name, $top_k, $query_vector)
+ YIELD node, score
+ RETURN
+ node.id AS id,
+ node.title AS title,
+ node.overview AS overview,
+ node.release_date AS release_date,
+ node.vote_average AS vote_average,
+ score
+ ORDER BY score DESC
+ """
+ with neo4j_driver.session() as session:
+ rows = session.run(
+ cypher,
+ {
+ "index_name": VECTOR_INDEX_NAME,
+ "top_k": TOP_K,
+ "query_vector": query_embedding,
+ },
+ ).data()
+
+ logger.info(
+ "Vector search: %d results (top score: %.4f)",
+ len(rows),
+ rows[0]["score"] if rows else 0.0,
+ )
+ return rows
+
+
+# ---------------------------------------------------------------------------
+# 5. Cypher generation using resolved entities
+# ---------------------------------------------------------------------------
+
+def _build_entity_context(resolved: dict) -> str:
+ """
+ Build a plain-English summary of resolved entities for the Cypher
+ generation prompt so the LLM uses exact canonical names.
+ """
+ if not resolved:
+ return ""
+
+ lines = ["Resolved entities to use in the query (use these EXACT values):"]
+
+ for canonical in resolved.get("persons", {}).values():
+ lines.append(f' Person: "{canonical}"')
+
+ for canonical in resolved.get("movies", {}).values():
+ lines.append(f' Movie title: "{canonical}"')
+
+ for canonical in resolved.get("genres", {}).values():
+ lines.append(f' Genre: "{canonical}"')
+
+ for canonical in resolved.get("companies", {}).values():
+ lines.append(f' ProductionCompany: "{canonical}"')
+
+ if "year_after" in resolved:
+ lines.append(f' Date filter: m.release_date > \'{resolved["year_after"]}-01-01\'')
+ if "year_before" in resolved:
+ lines.append(f' Date filter: m.release_date < \'{resolved["year_before"]}-12-31\'')
+ if "rating_above" in resolved:
+ lines.append(f' Rating filter: m.vote_average > {resolved["rating_above"]}')
+ if "rating_below" in resolved:
+ lines.append(f' Rating filter: m.vote_average < {resolved["rating_below"]}')
+
+ return "\n".join(lines)
+
+
+def _generate_cypher(
+ user_query: str,
+ entity_context: str,
+ client: openai.OpenAI,
+ hint: str = "",
+) -> str:
+ """Generate Cypher using the schema, examples, and resolved entity context."""
+ system = f"""You are an expert Neo4j Cypher query generator.
+
+Graph schema:
+{SCHEMA_PROMPT}
+
+Example queries:
+{CYPHER_EXAMPLES}
+
+{entity_context}
+
+Rules:
+- Use ONLY the exact entity names provided above — do not paraphrase or guess
+- Relationship directions: (Person)-[:DIRECTED]->(Movie), (Person)-[:ACTED_IN]->(Movie),
+ (Movie)-[:IN_GENRE]->(Genre), (Movie)-[:PRODUCED_BY]->(ProductionCompany)
+- Use exact match {{name: 'resolved name'}} when entity is resolved, CONTAINS only as fallback
+- Always alias RETURN fields with meaningful names: actor, director, movie, genre, company
+- For date comparisons use STRING comparison: m.release_date > '2010-12-31'
+ (release_date is stored as a string, NOT a Neo4j date type)
+- For aggregation ranking queries add: WITH ..., count(m) AS film_count WHERE film_count >= 3
+- For "highest rated" queries ALWAYS add WHERE m.popularity > 5 to exclude obscure low-vote films
+- LIMIT 20 unless counting
+- Return ONLY the Cypher query, no explanation, no markdown fences
+{hint}"""
+
+ response = client.chat.completions.create(
+ model="gpt-4o",
+ messages=[
+ {"role": "system", "content": system},
+ {"role": "user", "content": f"Generate a Cypher query for: {user_query}"},
+ ],
+ temperature=0.0,
+ max_tokens=500,
+ )
+ raw = response.choices[0].message.content.strip()
+ if raw.startswith("```"):
+ raw = "\n".join(raw.split("\n")[1:])
+ if raw.endswith("```"):
+ raw = "\n".join(raw.split("\n")[:-1])
+ return raw.strip()
+
+
+def _execute_cypher(cypher: str, driver: Driver) -> list[dict]:
+ """Execute Cypher and return results as list of dicts."""
+ try:
+ with driver.session() as session:
+ return session.run(cypher).data()
+ except Exception as e:
+ logger.warning("Cypher execution failed: %s", e)
+ return []
+
+
+def cypher_query(
+ user_query: str,
+ entity_resolution: dict,
+ openai_api_key: str,
+ query_intent: str,
+ neo4j_driver: Driver,
+) -> str:
+ """
+ Generate and validate a Cypher query using resolved entities.
+ Retries once with a relaxed hint if first attempt returns no results.
+ Returns the successful Cypher string or empty string.
+ """
+ if query_intent == "VECTOR":
+ return ""
+
+ client = openai.OpenAI(api_key=openai_api_key)
+ entity_context = _build_entity_context(entity_resolution)
+
+ for attempt in range(MAX_RETRIES):
+ hint = ""
+ if attempt == 1:
+ hint = (
+ "\nHINT: The previous query returned no results. "
+ "Double-check relationship directions. "
+ "Try broadening filters or using CONTAINS as a fallback for name matching."
+ )
+
+ cypher = _generate_cypher(user_query, entity_context, client, hint)
+ logger.info("Generated Cypher (attempt %d):\n%s", attempt + 1, cypher)
+
+ results = _execute_cypher(cypher, neo4j_driver)
+ if results:
+ logger.info("Cypher returned %d results", len(results))
+ return cypher
+
+ logger.warning("Cypher attempt %d returned no results", attempt + 1)
+
+ logger.error("All Cypher attempts failed for: %s", user_query)
+ return ""
+
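The generate-execute-retry loop in `cypher_query` follows a pattern that can be isolated and tested with stubbed callables (`generate_with_retry` is an illustrative name; `generate(hint)` returns a query string, `execute(query)` returns result rows):

```python
def generate_with_retry(generate, execute, max_retries: int = 2):
    # First attempt with no hint; later attempts feed back a corrective
    # hint, mirroring cypher_query's empty-result retry.
    for attempt in range(max_retries):
        hint = "" if attempt == 0 else "HINT: the previous query returned no results."
        query = generate(hint)
        results = execute(query)
        if results:
            return query, results
    return "", []
```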
+
+def cypher_results(
+ cypher_query: str,
+ neo4j_driver: Driver,
+) -> list[dict]:
+ """Execute the validated Cypher query and return results."""
+ if not cypher_query:
+ return []
+ results = _execute_cypher(cypher_query, neo4j_driver)
+ logger.info("Cypher results: %d rows", len(results))
+ return results
+
+
+# ---------------------------------------------------------------------------
+# 6. Enrich vector results with graph traversal
+# ---------------------------------------------------------------------------
+
+def _enrich_movie(movie_id: int, driver: Driver) -> dict | None:
+ """Pull directors, cast, genres, companies for a movie node."""
+ cypher = """
+ MATCH (m:Movie {id: $movie_id})
+ OPTIONAL MATCH (d:Person)-[:DIRECTED]->(m)
+ OPTIONAL MATCH (a:Person)-[r:ACTED_IN]->(m)
+ WITH m, d, a, r ORDER BY r.order ASC
+ OPTIONAL MATCH (m)-[:IN_GENRE]->(g:Genre)
+ OPTIONAL MATCH (m)-[:PRODUCED_BY]->(c:ProductionCompany)
+ RETURN
+ m.title AS title,
+ m.overview AS overview,
+ m.release_date AS release_date,
+ m.vote_average AS vote_average,
+ collect(DISTINCT d.name) AS directors,
+ collect(DISTINCT a.name)[0..$max_cast] AS cast,
+ collect(DISTINCT g.name) AS genres,
+ collect(DISTINCT c.name) AS companies
+ """
+ with driver.session() as session:
+ row = session.run(cypher, {"movie_id": movie_id, "max_cast": MAX_CAST}).single()
+ return dict(row) if row else None
+
+
+# ---------------------------------------------------------------------------
+# 7. Merge results
+# ---------------------------------------------------------------------------
+
+def merged_results(
+ vector_results: list[dict],
+ cypher_results: list[dict],
+ neo4j_driver: Driver,
+ query_intent: str,
+) -> list[dict]:
+ """Merge Cypher and vector results. Cypher results come first."""
+ final = []
+
+ if cypher_results:
+ final.extend({"_source": "cypher", **row} for row in cypher_results)
+
+ if vector_results:
+ seen_ids = set()
+ for hit in vector_results:
+ movie_id = hit.get("id")
+ if movie_id in seen_ids:
+ continue
+ seen_ids.add(movie_id)
+ enriched = _enrich_movie(movie_id, neo4j_driver)
+ if enriched:
+ enriched["_source"] = "vector"
+ enriched["_score"] = hit.get("score", 0.0)
+ final.append(enriched)
+
+ logger.info(
+ "Merged %d results (%d cypher, %d vector)",
+ len(final),
+ len(cypher_results),
+ len([r for r in final if r.get("_source") == "vector"]),
+ )
+ return final
+
+
+# ---------------------------------------------------------------------------
+# 8. Format context
+# ---------------------------------------------------------------------------
+
+def retrieved_context(merged_results: list[dict], query_intent: str) -> str:
+ """
+ Format merged results into plain-text context for the generation DAG.
+
+ Enriched movie records (from vector path) are formatted with full
+ metadata. Raw Cypher rows are formatted as numbered plain-English
+ lines with field labels so the LLM can read them directly.
+ """
+ if not merged_results:
+ return "No relevant information found in the knowledge graph for this query."
+
+ FIELD_LABELS = {
+ "movie": "Movie",
+ "director": "Director",
+ "actor": "Actor",
+ "genre": "Genre",
+ "company": "Production company",
+ "film_count": "Films",
+ "movie_count": "Count",
+ "action_movie_count": "Action movies",
+ "avg_rating": "Avg rating",
+ "average_rating": "Avg rating",
+ "vote_average": "Rating",
+ "release_date": "Released",
+ }
+
+ lines = []
+
+ for i, row in enumerate(merged_results, start=1):
+
+ if "directors" in row:
+ # Enriched movie record from vector path
+ directors = ", ".join(row.get("directors") or []) or "Unknown"
+ cast = ", ".join(row.get("cast") or []) or "Unknown"
+ genres = ", ".join(row.get("genres") or []) or "Unknown"
+ companies = ", ".join(row.get("companies") or []) or "Unknown"
+ release = (row.get("release_date") or "")[:4] or "N/A"
+ rating = row.get("vote_average", "N/A")
+ title = row.get("title", "Unknown")
+ overview = row.get("overview", "")
+ lines.append(f"{i}. {title} ({release}) — Rating: {rating}")
+ lines.append(f" Directed by: {directors}")
+ lines.append(f" Cast: {cast}")
+ lines.append(f" Genres: {genres}")
+ lines.append(f" Produced by: {companies}")
+ if overview:
+ lines.append(f" Overview: {overview}")
+ lines.append("")
+ else:
+ # Raw Cypher result row
+ clean = {k: v for k, v in row.items() if not k.startswith("_")}
+ parts = []
+ for k, v in clean.items():
+ label = FIELD_LABELS.get(k, k.replace("_", " ").capitalize())
+ if isinstance(v, float):
+ v = round(v, 3)
+ parts.append(f"{label}: {v}")
+ lines.append(f"{i}. " + " | ".join(parts))
+
+ context = "\n".join(lines)
+ logger.info("Formatted context: %d chars from %d results", len(context), len(merged_results))
+ return context
\ No newline at end of file
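The raw-Cypher branch above can be exercised standalone. The sketch below mirrors the label lookup, float rounding, and `" | "` join from the formatter; the `FIELD_LABELS` subset and the sample rows are made up for illustration, not taken from the example's data.

```python
# Standalone sketch of the raw Cypher row formatting: map result keys to
# human-readable labels, round floats, and join fields into numbered lines.
FIELD_LABELS = {
    "director": "Director",
    "film_count": "Films",
    "avg_rating": "Avg rating",
}


def format_rows(rows):
    lines = []
    for i, row in enumerate(rows, start=1):
        # Drop internal bookkeeping keys such as "_source".
        clean = {k: v for k, v in row.items() if not k.startswith("_")}
        parts = []
        for k, v in clean.items():
            label = FIELD_LABELS.get(k, k.replace("_", " ").capitalize())
            if isinstance(v, float):
                v = round(v, 3)
            parts.append(f"{label}: {v}")
        lines.append(f"{i}. " + " | ".join(parts))
    return "\n".join(lines)


rows = [
    {"_source": "cypher", "director": "Christopher Nolan", "film_count": 8},
    {"director": "Denis Villeneuve", "avg_rating": 7.6543},
]
print(format_rows(rows))
# → 1. Director: Christopher Nolan | Films: 8
#   2. Director: Denis Villeneuve | Avg rating: 7.654
```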
diff --git a/examples/LLM_Workflows/neo4j_graph_rag/run.py b/examples/LLM_Workflows/neo4j_graph_rag/run.py
new file mode 100644
index 000000000..ca995bd14
--- /dev/null
+++ b/examples/LLM_Workflows/neo4j_graph_rag/run.py
@@ -0,0 +1,242 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Entry point for the Neo4j GraphRAG example.
+
+Usage
+-----
+# Step 1 — ingest TMDB data into Neo4j
+python run.py --mode ingest
+
+# Step 2 — compute and store embeddings on Movie nodes
+python run.py --mode embed
+
+# Step 3 — ask questions against the populated graph
+python run.py --mode query --question "Who directed Inception?"
+
+# Visualise any DAG without executing
+python run.py --mode ingest --visualise
+python run.py --mode embed --visualise
+python run.py --mode query --visualise
+"""
+
+import argparse
+import logging
+import os
+from pathlib import Path
+
+from dotenv import load_dotenv
+from hamilton import driver
+from neo4j import GraphDatabase
+
+import embed_module
+import generation_module
+import ingest_module
+import retrieval_module
+from graph_schema import CONSTRAINTS
+
+load_dotenv()
+logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
+logger = logging.getLogger(__name__)
+
+
+# ---------------------------------------------------------------------------
+# helpers
+# ---------------------------------------------------------------------------
+
+
+def get_env(key: str) -> str:
+ val = os.getenv(key)
+ if not val:
+ raise EnvironmentError(f"Missing required environment variable: {key}")
+ return val
+
+
+def make_neo4j_driver():
+ uri = get_env("NEO4J_URI")
+ user = get_env("NEO4J_USERNAME")
+ password = get_env("NEO4J_PASSWORD")
+ drv = GraphDatabase.driver(uri, auth=(user, password))
+ drv.verify_connectivity()
+ logger.info("Connected to Neo4j at %s", uri)
+ return drv
+
+
+def apply_constraints(drv):
+ with drv.session() as session:
+ for constraint in CONSTRAINTS:
+ session.run(constraint)
+ logger.info("Applied %d constraints", len(CONSTRAINTS))
+
+
+# ---------------------------------------------------------------------------
+# ingest
+# ---------------------------------------------------------------------------
+
+
+def run_ingest(visualise: bool = False):
+ data_dir = Path(__file__).parent / "data"
+ movies_path = str(data_dir / "tmdb_5000_movies.json")
+ credits_path = str(data_dir / "tmdb_5000_credits.json")
+
+ for p in [movies_path, credits_path]:
+ if not Path(p).exists():
+ raise FileNotFoundError(
+ f"Missing data file: {p}\n"
+ "Download from https://www.kaggle.com/datasets/tmdb/tmdb-movie-metadata "
+ "and place both JSON files in the data/ folder.\n"
+ "See data/README.md for conversion instructions."
+ )
+
+ drv = make_neo4j_driver()
+ apply_constraints(drv)
+
+ ingest_driver = driver.Builder().with_modules(ingest_module).build()
+
+ if visualise:
+ ingest_driver.display_all_functions("ingest_dag.png")
+ logger.info("Saved ingest DAG visualisation to ingest_dag.png")
+ drv.close()
+ return
+
+ result = ingest_driver.execute(
+ ["ingestion_summary"],
+ inputs={
+ "movies_path": movies_path,
+ "credits_path": credits_path,
+ "neo4j_driver": drv,
+ },
+ )
+ s = result["ingestion_summary"]
+ logger.info(
+ "Ingestion complete — movies: %d, genre edges: %d, "
+ "company edges: %d, person edges: %d",
+ s["movies"], s["genre_edges"], s["company_edges"], s["person_edges"],
+ )
+ drv.close()
+
+
+# ---------------------------------------------------------------------------
+# embed
+# ---------------------------------------------------------------------------
+
+
+def run_embed(visualise: bool = False):
+ drv = make_neo4j_driver()
+ openai_api_key = get_env("OPENAI_API_KEY")
+
+ embed_driver = driver.Builder().with_modules(embed_module).build()
+
+ if visualise:
+ embed_driver.display_all_functions("embed_dag.png")
+ logger.info("Saved embed DAG visualisation to embed_dag.png")
+ drv.close()
+ return
+
+ result = embed_driver.execute(
+ ["embedding_summary"],
+ inputs={
+ "neo4j_driver": drv,
+ "openai_api_key": openai_api_key,
+ },
+ )
+ s = result["embedding_summary"]
+ logger.info(
+ "Embedding complete — %d embeddings written, index: %s, model: %s",
+ s["embeddings_written"], s["vector_index"], s["model"],
+ )
+ drv.close()
+
+
+# ---------------------------------------------------------------------------
+# query
+# ---------------------------------------------------------------------------
+
+
+def run_query(question: str, visualise: bool = False):
+ drv = make_neo4j_driver()
+ openai_api_key = get_env("OPENAI_API_KEY")
+
+ rag_driver = (
+ driver.Builder()
+ .with_modules(retrieval_module, generation_module)
+ .build()
+ )
+
+ if visualise:
+ rag_driver.display_all_functions("rag_dag.png")
+ logger.info("Saved RAG DAG visualisation to rag_dag.png")
+ drv.close()
+ return
+
+ result = rag_driver.execute(
+ ["answer"],
+ inputs={
+ "user_query": question,
+ "neo4j_driver": drv,
+ "openai_api_key": openai_api_key,
+ },
+ )
+ print("\n" + "=" * 60)
+ print(f"Q: {question}")
+ print("=" * 60)
+ print(f"A: {result['answer']}")
+ print("=" * 60 + "\n")
+ drv.close()
+
+
+# ---------------------------------------------------------------------------
+# main
+# ---------------------------------------------------------------------------
+
+
+def main():
+ parser = argparse.ArgumentParser(description="Neo4j GraphRAG — TMDB Movies")
+ parser.add_argument(
+ "--mode",
+ choices=["ingest", "embed", "query"],
+ required=True,
+ help=(
+ "ingest: load TMDB data into Neo4j | "
+ "embed: compute and store embeddings | "
+ "query: ask a question"
+ ),
+ )
+ parser.add_argument(
+ "--question",
+ type=str,
+ default="Which movies did Christopher Nolan direct?",
+ help="Question to ask (only used in query mode)",
+ )
+ parser.add_argument(
+ "--visualise",
+ action="store_true",
+ help="Save a PNG of the Hamilton DAG and exit without executing",
+ )
+ args = parser.parse_args()
+
+ if args.mode == "ingest":
+ run_ingest(visualise=args.visualise)
+ elif args.mode == "embed":
+ run_embed(visualise=args.visualise)
+ elif args.mode == "query":
+ run_query(question=args.question, visualise=args.visualise)
+
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
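All three modes in run.py fail fast on missing configuration via the `get_env` helper rather than letting `None` credentials propagate to the Neo4j or OpenAI clients. A minimal sketch of that pattern, using made-up `DEMO_*` variable names rather than the example's real keys:

```python
import os


def get_env(key: str) -> str:
    # Raise immediately with a clear message instead of passing None
    # downstream to a client constructor.
    val = os.getenv(key)
    if not val:
        raise EnvironmentError(f"Missing required environment variable: {key}")
    return val


os.environ["DEMO_NEO4J_URI"] = "bolt://localhost:7687"
os.environ.pop("DEMO_MISSING_KEY", None)

print(get_env("DEMO_NEO4J_URI"))  # → bolt://localhost:7687
try:
    get_env("DEMO_MISSING_KEY")
except EnvironmentError as e:
    print(e)  # → Missing required environment variable: DEMO_MISSING_KEY
```

Note that `EnvironmentError` is an alias of `OSError` in Python 3, so callers can catch either name.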