diff --git a/docs/ecosystem/index.md b/docs/ecosystem/index.md index 789dcee6d..d13f48c5f 100644 --- a/docs/ecosystem/index.md +++ b/docs/ecosystem/index.md @@ -148,6 +148,7 @@ Persist and cache your data: | **Slack** | Notifications and integrations | [Examples](https://github.com/apache/hamilton/tree/main/examples/slack) \| [Lifecycle Hook](../reference/lifecycle-hooks/SlackNotifierHook.rst) | | **GeoPandas** | Geospatial data analysis | [Type extension](https://github.com/apache/hamilton/blob/main/hamilton/plugins/geopandas_extensions.py) for GeoDataFrame support | | **YAML** | Configuration management | [IO Adapters](../reference/io/available-data-adapters.rst) | +| **Neo4j** | Knowledge graph RAG | [Examples](https://github.com/apache/hamilton/tree/main/examples/LLM_Workflows/neo4j_graph_rag) | --- diff --git a/examples/LLM_Workflows/neo4j_graph_rag/.env.example b/examples/LLM_Workflows/neo4j_graph_rag/.env.example new file mode 100644 index 000000000..3d70e233e --- /dev/null +++ b/examples/LLM_Workflows/neo4j_graph_rag/.env.example @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# OpenAI +OPENAI_API_KEY=your-openai-api-key-here + +# Neo4j +NEO4J_URI=bolt://localhost:7687 +NEO4J_USERNAME=neo4j +NEO4J_PASSWORD=password +NEO4J_DATABASE=neo4j diff --git a/examples/LLM_Workflows/neo4j_graph_rag/.gitignore b/examples/LLM_Workflows/neo4j_graph_rag/.gitignore new file mode 100644 index 000000000..021d89f60 --- /dev/null +++ b/examples/LLM_Workflows/neo4j_graph_rag/.gitignore @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Environment +.env + +# Python +__pycache__/ +*.pyc +*.pyo +*.pyd +.Python +venv/ +.venv/ + +# Data files (download separately per data/README.md) +data/*.json +data/*.csv + +# DAG visualisations are committed — ignore regenerated copies at root +/ingest_dag.png +/embed_dag.png +/rag_dag.png + +# Neo4j +*.dump + +# OS +.DS_Store +Thumbs.db diff --git a/examples/LLM_Workflows/neo4j_graph_rag/README.md b/examples/LLM_Workflows/neo4j_graph_rag/README.md new file mode 100644 index 000000000..848d8d504 --- /dev/null +++ b/examples/LLM_Workflows/neo4j_graph_rag/README.md @@ -0,0 +1,205 @@ + + +# Neo4j GraphRAG — TMDB Movies + +A full GraphRAG pipeline over a movie knowledge graph stored in Neo4j, +built entirely with Apache Hamilton. 
Ingestion, embedding, and retrieval +are each expressed as first-class Hamilton DAGs — dependencies declared +through function signatures, execution graph built automatically. + +## Hamilton DAG visualisations + +Run `--visualise` on any mode to regenerate these from source without +executing the pipeline. + +### Ingestion DAG + +```bash +python run.py --mode ingest --visualise +``` + +![Ingestion DAG](https://raw.githubusercontent.com/apache/hamilton/examples/neo4j-graph-rag/examples/LLM_Workflows/neo4j_graph_rag/docs/images/ingest_dag.png) + +Raw TMDB JSON flows through parsing nodes into batched Neo4j writes. +Hamilton automatically parallelises the four independent branches +(movies, genres, companies, person edges) from the shared `raw_movies` +and `raw_credits` inputs. + +--- + +### Embedding DAG + +```bash +python run.py --mode embed --visualise +``` + +![Embedding DAG](https://raw.githubusercontent.com/apache/hamilton/examples/neo4j-graph-rag/examples/LLM_Workflows/neo4j_graph_rag/docs/images/embed_dag.png) + +Movie texts are fetched from Neo4j, batched through the OpenAI embeddings +API, written back to Movie nodes, and a cosine vector index is created. + +--- + +### Retrieval + Generation DAG + +```bash +python run.py --mode query --visualise +``` + +![RAG DAG](https://raw.githubusercontent.com/apache/hamilton/examples/neo4j-graph-rag/examples/LLM_Workflows/neo4j_graph_rag/docs/images/rag_dag.png) + +The full 13-node RAG pipeline. 
Hamilton wires all dependencies from +function signatures — no manual orchestration: + +``` +user_query + openai_api_key + neo4j_driver + -> query_intent classify into VECTOR / CYPHER / AGGREGATE / HYBRID + -> entity_extraction extract persons, movies, genres, companies, filters + -> entity_resolution fuzzy-match each entity against the live graph + -> query_embedding embed query (VECTOR / HYBRID only) + -> vector_results cosine similarity search (VECTOR / HYBRID only) + -> cypher_query LLM generates Cypher from resolved entities + -> cypher_results execute Cypher against Neo4j + -> merged_results combine both retrieval paths + -> retrieved_context format as numbered plain-text records + -> system_prompt inject context into LLM system prompt + -> prompt_messages assemble message list + -> answer gpt-4o generates final answer +``` + +## What it demonstrates + +**Ingestion DAG** (`ingest_module.py`) +Loads TMDB JSON, parses entities and relationships, writes to Neo4j via +batched Cypher `MERGE`. + +**Embedding DAG** (`embed_module.py`) +Computes OpenAI `text-embedding-3-small` embeddings over title + overview, +writes vectors to Movie nodes, creates a Neo4j cosine vector index. 
+ +**Retrieval DAG** (`retrieval_module.py`) +Classifies each query into one of four strategies, resolves named entities +against the graph to get canonical names, then executes retrieval: + +| Strategy | When used | How it retrieves | +|-------------|----------------------------------|-----------------------------------------------| +| `VECTOR` | Thematic / semantic queries | Cosine vector search over Movie embeddings | +| `CYPHER` | Relational / factual queries | LLM-generated Cypher with resolved entities | +| `AGGREGATE` | Counting / ranking queries | Aggregation Cypher with popularity guard | +| `HYBRID` | Filtered + semantic queries | CYPHER + VECTOR, results merged | + +The semantic entity resolution layer looks up every extracted entity in +Neo4j before generating Cypher, so "Warner Bros movies" always resolves +to the canonical `"Warner Bros."` name in the graph. + +**Generation DAG** (`generation_module.py`) +Formats retrieved records into a grounded system prompt and calls gpt-4o. + +## Knowledge graph schema + +``` +(:Movie {id, title, release_date, overview, popularity, vote_average}) +(:Person {id, name}) +(:Genre {name}) +(:ProductionCompany {id, name}) + +(:Person)-[:ACTED_IN {order, character}]->(:Movie) +(:Person)-[:DIRECTED]->(:Movie) +(:Movie)-[:IN_GENRE]->(:Genre) +(:Movie)-[:PRODUCED_BY]->(:ProductionCompany) +``` + +Dataset: 4,803 movies · 56,603 persons · 106,257 ACTED_IN · 5,166 DIRECTED · 20 genres · 5,047 companies + +## Prerequisites + +- Docker +- Python 3.10+ +- OpenAI API key (`gpt-4o` access) +- TMDB dataset (see `data/README.md`) + +## Setup + +### 1. Start Neo4j + +```bash +docker compose up -d +``` + +Neo4j browser: http://localhost:7474 (user: `neo4j`, password: `password`) + +### 2. Install dependencies + +```bash +python -m venv venv +source venv/bin/activate +pip install -r requirements.txt +``` + +### 3. Configure environment + +```bash +cp .env.example .env +# edit .env — add your OPENAI_API_KEY +``` + +### 4. 
Download the dataset + +Follow `data/README.md` to download and convert the TMDB dataset. + +## Running + +```bash +# Step 1 — load graph (takes ~5 seconds) +python run.py --mode ingest + +# Step 2 — compute and store embeddings (takes ~2 minutes) +python run.py --mode embed + +# Step 3 — query +python run.py --mode query --question "Who directed Inception?" +python run.py --mode query --question "Which movies did Tom Hanks and Robin Wright appear in together?" +python run.py --mode query --question "Which production company made the most action movies?" +python run.py --mode query --question "Recommend movies similar to Inception" +python run.py --mode query --question "Find me war films rated above 7.5" +python run.py --mode query --question "Which actors appeared in both a Christopher Nolan and a Steven Spielberg film?" +``` + +## Project structure + +``` +neo4j_graph_rag/ +├── docker-compose.yml Neo4j 5 + APOC +├── requirements.txt +├── .env.example +├── graph_schema.py Node/relationship definitions and Cypher constraints +├── ingest_module.py Hamilton DAG: JSON -> Neo4j +├── embed_module.py Hamilton DAG: Movie nodes -> embeddings -> vector index +├── retrieval_module.py Hamilton DAG: query -> entity resolution -> retrieval +├── generation_module.py Hamilton DAG: context + query -> gpt-4o -> answer +├── run.py Entry point wiring all three pipelines +├── docs/ +│ └── images/ +│ ├── ingest_dag.png +│ ├── embed_dag.png +│ └── rag_dag.png +└── data/ + └── README.md Dataset download and conversion instructions +``` \ No newline at end of file diff --git a/examples/LLM_Workflows/neo4j_graph_rag/data/README.md b/examples/LLM_Workflows/neo4j_graph_rag/data/README.md new file mode 100644 index 000000000..4b99bb6cd --- /dev/null +++ b/examples/LLM_Workflows/neo4j_graph_rag/data/README.md @@ -0,0 +1,57 @@ + + +# Data + +This example uses the [TMDB 5000 Movie Dataset](https://www.kaggle.com/datasets/tmdb/tmdb-movie-metadata) from Kaggle. + +## Download + +1. 
Create a free Kaggle account at https://www.kaggle.com +2. Go to https://www.kaggle.com/datasets/tmdb/tmdb-movie-metadata +3. Click **Download** and unzip the archive +4. Place the following two files in this `data/` folder: + +``` +data/ +├── tmdb_5000_movies.json +└── tmdb_5000_credits.json +``` + +## Note on file format + +The Kaggle archive ships the files as CSV (`tmdb_5000_movies.csv`, `tmdb_5000_credits.csv`). +Several columns contain JSON strings (genres, cast, crew, production_companies). + +Convert them to JSON before running ingestion: + +```python +import pandas as pd, json + +movies = pd.read_csv("tmdb_5000_movies.csv") +credits = pd.read_csv("tmdb_5000_credits.csv") + +with open("tmdb_5000_movies.json", "w") as f: + json.dump(movies.to_dict(orient="records"), f) + +with open("tmdb_5000_credits.json", "w") as f: + json.dump(credits.to_dict(orient="records"), f) +``` + +Run this script once from inside the `data/` folder, then proceed with `python run.py --mode ingest`. \ No newline at end of file diff --git a/examples/LLM_Workflows/neo4j_graph_rag/data/data_refine.py b/examples/LLM_Workflows/neo4j_graph_rag/data/data_refine.py new file mode 100644 index 000000000..d028de3f8 --- /dev/null +++ b/examples/LLM_Workflows/neo4j_graph_rag/data/data_refine.py @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + + +import pandas as pd, json + +movies = pd.read_csv("examples/LLM_Workflows/neo4j_graph_rag/data/tmdb_5000_movies.csv") +credits = pd.read_csv("examples/LLM_Workflows/neo4j_graph_rag/data/tmdb_5000_credits.csv") + +with open("examples/LLM_Workflows/neo4j_graph_rag/data/tmdb_5000_movies.json", "w") as f: + json.dump(movies.to_dict(orient="records"), f) + +with open("examples/LLM_Workflows/neo4j_graph_rag/data/tmdb_5000_credits.json", "w") as f: + json.dump(credits.to_dict(orient="records"), f) \ No newline at end of file diff --git a/examples/LLM_Workflows/neo4j_graph_rag/docker-compose.yml b/examples/LLM_Workflows/neo4j_graph_rag/docker-compose.yml new file mode 100644 index 000000000..b41df8671 --- /dev/null +++ b/examples/LLM_Workflows/neo4j_graph_rag/docker-compose.yml @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +services: + neo4j: + image: neo4j:5.18.0 + container_name: neo4j_graph_rag + ports: + - "7474:7474" # Browser UI + - "7687:7687" # Bolt protocol + environment: + - NEO4J_AUTH=neo4j/password + - NEO4J_PLUGINS=["apoc"] + - NEO4J_apoc_export_file_enabled=true + - NEO4J_apoc_import_file_enabled=true + - NEO4J_dbms_security_procedures_unrestricted=apoc.* + volumes: + - neo4j_data:/data + - neo4j_logs:/logs + healthcheck: + test: ["CMD", "cypher-shell", "-u", "neo4j", "-p", "password", "RETURN 1"] + interval: 10s + timeout: 5s + retries: 5 + +volumes: + neo4j_data: + neo4j_logs: \ No newline at end of file diff --git a/examples/LLM_Workflows/neo4j_graph_rag/docs/images/embed_dag.png b/examples/LLM_Workflows/neo4j_graph_rag/docs/images/embed_dag.png new file mode 100644 index 000000000..698bb0814 Binary files /dev/null and b/examples/LLM_Workflows/neo4j_graph_rag/docs/images/embed_dag.png differ diff --git a/examples/LLM_Workflows/neo4j_graph_rag/docs/images/ingest_dag.png b/examples/LLM_Workflows/neo4j_graph_rag/docs/images/ingest_dag.png new file mode 100644 index 000000000..9792655ad Binary files /dev/null and b/examples/LLM_Workflows/neo4j_graph_rag/docs/images/ingest_dag.png differ diff --git a/examples/LLM_Workflows/neo4j_graph_rag/docs/images/rag_dag.png b/examples/LLM_Workflows/neo4j_graph_rag/docs/images/rag_dag.png new file mode 100644 index 000000000..b0bb7a0cc Binary files /dev/null and b/examples/LLM_Workflows/neo4j_graph_rag/docs/images/rag_dag.png differ diff --git a/examples/LLM_Workflows/neo4j_graph_rag/embed_module.py b/examples/LLM_Workflows/neo4j_graph_rag/embed_module.py new file mode 100644 index 000000000..2dd5ecd3d --- /dev/null +++ b/examples/LLM_Workflows/neo4j_graph_rag/embed_module.py @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Hamilton embedding DAG. + +Fetches Movie nodes from Neo4j, computes OpenAI embeddings over +title + overview text, writes embeddings back to each node, and +creates a Neo4j vector index for use during retrieval. + +Run this once after ingestion and before querying. + +DAG flow: + neo4j_driver + openai_api_key + -> movie_texts + -> embedding_client + -> movie_embeddings + -> vector_index + -> embedding_summary +""" + +import logging + +import openai +from neo4j import Driver + +logger = logging.getLogger(__name__) + +# Shared constants — also imported by retrieval_module.py +VECTOR_INDEX_NAME = "movie_embeddings" +EMBEDDING_MODEL = "text-embedding-3-small" +EMBEDDING_DIMENSIONS = 1536 +BATCH_SIZE = 100 + + +def movie_texts(neo4j_driver: Driver) -> list[dict]: + """ + Fetch all Movie nodes and build embedding text from title + overview. + Returns list of dicts with keys: id, text. + """ + query = """ + MATCH (m:Movie) + WHERE m.title IS NOT NULL + RETURN m.id AS id, + m.title AS title, + coalesce(m.overview, '') AS overview + """ + with neo4j_driver.session() as session: + rows = session.run(query).data() + + texts = [ + { + "id": row["id"], + "text": f"{row['title']}. 
{row['overview']}".strip(), + } + for row in rows + if row["id"] is not None + ] + logger.info("Fetched %d movie texts for embedding", len(texts)) + return texts + + +def embedding_client(openai_api_key: str) -> openai.OpenAI: + """Initialise the OpenAI client for embedding calls.""" + return openai.OpenAI(api_key=openai_api_key) + + +def movie_embeddings( + movie_texts: list[dict], + embedding_client: openai.OpenAI, +) -> list[dict]: + """ + Compute OpenAI embeddings for all movie texts in batches of BATCH_SIZE. + Returns list of dicts with keys: id, embedding. + """ + results = [] + total = len(movie_texts) + + for i in range(0, total, BATCH_SIZE): + batch = movie_texts[i : i + BATCH_SIZE] + response = embedding_client.embeddings.create( + model=EMBEDDING_MODEL, + input=[item["text"] for item in batch], + ) + for item, emb_obj in zip(batch, response.data): + results.append({"id": item["id"], "embedding": emb_obj.embedding}) + + logger.info("Embedded batch %d-%d of %d", i, min(i + BATCH_SIZE, total), total) + + logger.info("Computed %d embeddings", len(results)) + return results + + +def vector_index( + movie_embeddings: list[dict], + neo4j_driver: Driver, +) -> str: + """ + Write embeddings to Movie nodes in Neo4j and create a cosine + vector index named VECTOR_INDEX_NAME over the embedding property. + Returns the index name. 
+ """ + write_query = """ + UNWIND $batch AS row + MATCH (m:Movie {id: row.id}) + SET m.embedding = row.embedding + """ + total = len(movie_embeddings) + with neo4j_driver.session() as session: + for i in range(0, total, BATCH_SIZE): + batch = movie_embeddings[i : i + BATCH_SIZE] + session.run(write_query, {"batch": batch}) + logger.info( + "Wrote embeddings to nodes %d-%d of %d", + i, + min(i + BATCH_SIZE, total), + total, + ) + + session.run(f"DROP INDEX {VECTOR_INDEX_NAME} IF EXISTS") + session.run( + f""" + CREATE VECTOR INDEX {VECTOR_INDEX_NAME} + FOR (m:Movie) + ON m.embedding + OPTIONS {{ + indexConfig: {{ + `vector.dimensions`: {EMBEDDING_DIMENSIONS}, + `vector.similarity_function`: 'cosine' + }} + }} + """ + ) + logger.info("Created vector index '%s'", VECTOR_INDEX_NAME) + + return VECTOR_INDEX_NAME + + +def embedding_summary( + movie_embeddings: list[dict], + vector_index: str, +) -> dict: + """ + Collect embedding statistics and return a summary. + Terminal node of the embedding DAG. + """ + summary = { + "embeddings_written": len(movie_embeddings), + "vector_index": vector_index, + "model": EMBEDDING_MODEL, + "dimensions": EMBEDDING_DIMENSIONS, + } + logger.info("Embedding complete: %s", summary) + return summary \ No newline at end of file diff --git a/examples/LLM_Workflows/neo4j_graph_rag/generation_module.py b/examples/LLM_Workflows/neo4j_graph_rag/generation_module.py new file mode 100644 index 000000000..eee8b078d --- /dev/null +++ b/examples/LLM_Workflows/neo4j_graph_rag/generation_module.py @@ -0,0 +1,93 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Hamilton generation DAG. + +Takes the retrieved context from the retrieval DAG and the original +user query, constructs a grounded system prompt, and calls gpt-4o +to produce an answer. + +DAG flow: + retrieved_context + user_query + query_intent + -> system_prompt + -> prompt_messages + -> answer +""" + +import logging + +import openai + +logger = logging.getLogger(__name__) + +SYSTEM_TEMPLATE = """You are a movie database assistant. \ +The numbered results below are pre-filtered database records that \ +directly answer the user's question. Each record is already the \ +correct answer — do not question whether it matches the query. + +Rules: +- "Movie: X" means X is a film title. List it. +- "Actor: X" means X appeared in the relevant films. List them. +- "Director: X" means X is a director. State it. +- The results are already filtered by the user's criteria \ + (company, director, genre, etc.) — do not say the company or \ + director is not specified. +- When asked for a list, list every numbered record. +- When asked yes/no, answer yes/no then cite the relevant record. +- When a single record answers "highest/most/best", state it directly. +- If there are genuinely no records, say so briefly. 
+ +Retrieval strategy: {intent} + +Database records: +{context}""" + + +def system_prompt( + retrieved_context: str, + query_intent: str, +) -> str: + """Build the system prompt by injecting retrieved context and intent metadata.""" + return SYSTEM_TEMPLATE.format( + intent=query_intent, + context=retrieved_context, + ) + + +def prompt_messages(system_prompt: str, user_query: str) -> list[dict]: + """Assemble the message list for the OpenAI chat completion call.""" + return [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_query}, + ] + + +def answer(prompt_messages: list[dict], openai_api_key: str) -> str: + """ + Call gpt-4o with the assembled prompt and return the answer string. + Terminal node of the generation DAG. + """ + client = openai.OpenAI(api_key=openai_api_key) + response = client.chat.completions.create( + model="gpt-4o", + messages=prompt_messages, + temperature=0.0, + ) + result = response.choices[0].message.content + logger.info("Generated answer (%d chars)", len(result)) + return result \ No newline at end of file diff --git a/examples/LLM_Workflows/neo4j_graph_rag/graph_schema.py b/examples/LLM_Workflows/neo4j_graph_rag/graph_schema.py new file mode 100644 index 000000000..a50d82692 --- /dev/null +++ b/examples/LLM_Workflows/neo4j_graph_rag/graph_schema.py @@ -0,0 +1,108 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Static schema definitions for the TMDB movie knowledge graph. + +Nodes +----- +Movie : id, title, release_date, overview, popularity, vote_average +Person : id, name +Genre : name +ProductionCompany : id, name + +Relationships +------------- +(:Person)-[:ACTED_IN {order: int, character: str}]->(:Movie) +(:Person)-[:DIRECTED]->(:Movie) +(:Movie)-[:IN_GENRE]->(:Genre) +(:Movie)-[:PRODUCED_BY]->(:ProductionCompany) + +Constraints (applied at ingestion time via run.py) +----------- +UNIQUE Movie.id +UNIQUE Person.id +UNIQUE Genre.name +UNIQUE ProductionCompany.id +""" + +# Node labels +NODE_MOVIE = "Movie" +NODE_PERSON = "Person" +NODE_GENRE = "Genre" +NODE_COMPANY = "ProductionCompany" + +# Relationship types +REL_ACTED_IN = "ACTED_IN" +REL_DIRECTED = "DIRECTED" +REL_IN_GENRE = "IN_GENRE" +REL_PRODUCED_BY = "PRODUCED_BY" + +# Properties stored per node type +NODE_PROPERTIES = { + NODE_MOVIE: ["id", "title", "release_date", "overview", "popularity", "vote_average"], + NODE_PERSON: ["id", "name"], + NODE_GENRE: ["name"], + NODE_COMPANY: ["id", "name"], +} + +# Properties stored per relationship type +REL_PROPERTIES = { + REL_ACTED_IN: ["order", "character"], + REL_DIRECTED: [], + REL_IN_GENRE: [], + REL_PRODUCED_BY: [], +} + +# Connectivity map src -> rel -> dest +CONNECTIVITY = [ + (NODE_PERSON, REL_ACTED_IN, NODE_MOVIE), + (NODE_PERSON, REL_DIRECTED, NODE_MOVIE), + (NODE_MOVIE, REL_IN_GENRE, NODE_GENRE), + (NODE_MOVIE, REL_PRODUCED_BY, NODE_COMPANY), +] + +# Cypher constraints to create before ingestion +CONSTRAINTS = [ + f"CREATE CONSTRAINT IF NOT 
EXISTS FOR (n:{NODE_MOVIE}) REQUIRE n.id IS UNIQUE", + f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{NODE_PERSON}) REQUIRE n.id IS UNIQUE", + f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{NODE_GENRE}) REQUIRE n.name IS UNIQUE", + f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{NODE_COMPANY}) REQUIRE n.id IS UNIQUE", +] + + +def schema_to_prompt() -> str: + """ + Returns a natural-language description of the graph schema + for use in LLM system prompts. + """ + lines = ["The knowledge graph contains the following node types:\n"] + + for label, props in NODE_PROPERTIES.items(): + if props: + lines.append(f" {label}: {', '.join(props)}") + else: + lines.append(f" {label}: (no properties)") + + lines.append("\nThe knowledge graph contains the following relationship types:\n") + + for src, rel, dest in CONNECTIVITY: + props = REL_PROPERTIES.get(rel, []) + prop_str = f" with properties: {', '.join(props)}" if props else "" + lines.append(f" (:{src})-[:{rel}]->(:{dest}){prop_str}") + + return "\n".join(lines) \ No newline at end of file diff --git a/examples/LLM_Workflows/neo4j_graph_rag/ingest_module.py b/examples/LLM_Workflows/neo4j_graph_rag/ingest_module.py new file mode 100644 index 000000000..ac9de7686 --- /dev/null +++ b/examples/LLM_Workflows/neo4j_graph_rag/ingest_module.py @@ -0,0 +1,326 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Hamilton ingestion DAG for the TMDB movie dataset. + +Input files (place in data/): + - tmdb_5000_movies.json : movie metadata, genres, production companies + - tmdb_5000_credits.json : cast and crew per movie + +The DAG produces a fully populated Neo4j knowledge graph with the schema +defined in graph_schema.py. + +Node types : Movie, Person, Genre, ProductionCompany +Relationships: ACTED_IN, DIRECTED, IN_GENRE, PRODUCED_BY + +DAG flow: + movies_path -> raw_movies -> parsed_movies -> write_movie_nodes + -> genre_records -> write_genre_nodes_and_edges + -> company_records -> write_company_nodes_and_edges + credits_path -> raw_credits -> parsed_credits -> write_person_nodes_and_edges + all writes -> ingestion_summary +""" + +import json +import logging + +from neo4j import Driver + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# 1. Load raw data +# --------------------------------------------------------------------------- + + +def raw_movies(movies_path: str) -> list[dict]: + """Load raw movie records from the TMDB movies JSON file.""" + with open(movies_path) as f: + data = json.load(f) + logger.info("Loaded %d raw movie records", len(data)) + return data + + +def raw_credits(credits_path: str) -> list[dict]: + """Load raw credit records from the TMDB credits JSON file.""" + with open(credits_path) as f: + data = json.load(f) + logger.info("Loaded %d raw credit records", len(data)) + return data + + +# --------------------------------------------------------------------------- +# 2. Parse movies +# --------------------------------------------------------------------------- + + +def parsed_movies(raw_movies: list[dict]) -> list[dict]: + """ + Extract clean Movie node properties from raw records. + Drops records missing id or title. 
+ """ + movies = [] + for m in raw_movies: + if not m.get("id") or not m.get("title"): + continue + movies.append( + { + "id": int(m["id"]), + "title": str(m["title"]), + "release_date": str(m.get("release_date", "")), + "overview": str(m.get("overview", "")), + "popularity": float(m.get("popularity", 0.0)), + "vote_average": float(m.get("vote_average", 0.0)), + } + ) + logger.info("Parsed %d movies", len(movies)) + return movies + + +def genre_records(raw_movies: list[dict]) -> list[dict]: + """ + Extract (movie_id, genre_name) pairs for IN_GENRE relationships. + Handles both pre-parsed lists and JSON-string encoded genres. + """ + records = [] + for m in raw_movies: + movie_id = m.get("id") + if not movie_id: + continue + genres = m.get("genres", []) + if isinstance(genres, str): + try: + genres = json.loads(genres) + except json.JSONDecodeError: + continue + for g in genres: + if g.get("name"): + records.append({"movie_id": int(movie_id), "genre_name": str(g["name"])}) + logger.info("Extracted %d genre relationships", len(records)) + return records + + +def company_records(raw_movies: list[dict]) -> list[dict]: + """ + Extract (movie_id, company_id, company_name) pairs for PRODUCED_BY relationships. + Handles both pre-parsed lists and JSON-string encoded production_companies. + """ + records = [] + for m in raw_movies: + movie_id = m.get("id") + if not movie_id: + continue + companies = m.get("production_companies", []) + if isinstance(companies, str): + try: + companies = json.loads(companies) + except json.JSONDecodeError: + continue + for c in companies: + if c.get("id") and c.get("name"): + records.append( + { + "movie_id": int(movie_id), + "company_id": int(c["id"]), + "company_name": str(c["name"]), + } + ) + logger.info("Extracted %d production company relationships", len(records)) + return records + + +# --------------------------------------------------------------------------- +# 3. 
# ---------------------------------------------------------------------------
# 3. Parse credits
# ---------------------------------------------------------------------------

logger = logging.getLogger(__name__)  # mirrors the file-level module logger


def _member_list(value) -> list:
    """
    Normalise a cast/crew field that may be a pre-parsed list or a
    JSON-string encoding of one; undecodable strings become an empty list
    (the movie's other credit list is still processed).
    """
    if isinstance(value, str):
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            return []
    return value


def parsed_credits(raw_credits: list[dict]) -> list[dict]:
    """
    Parse credits into a flat list of records containing movie_id,
    person details, and role (cast, or director only from crew).
    Handles both pre-parsed lists and JSON-string encoded cast/crew fields.
    """
    records = []
    for credit in raw_credits:
        movie_id = credit.get("id") or credit.get("movie_id")
        if not movie_id:
            continue

        for member in _member_list(credit.get("cast", [])):
            if member.get("id") and member.get("name"):
                records.append(
                    {
                        "movie_id": int(movie_id),
                        "person_id": int(member["id"]),
                        "person_name": str(member["name"]),
                        "role": "cast",
                        "character": str(member.get("character", "")),
                        # 999 pushes unranked cast members to the end.
                        "order": int(member.get("order", 999)),
                    }
                )

        for member in _member_list(credit.get("crew", [])):
            # Only directors are modelled from the crew list.
            if member.get("job") == "Director" and member.get("id") and member.get("name"):
                records.append(
                    {
                        "movie_id": int(movie_id),
                        "person_id": int(member["id"]),
                        "person_name": str(member["name"]),
                        "role": "director",
                        "character": "",
                        "order": -1,
                    }
                )

    logger.info("Parsed %d credit records", len(records))
    return records


# ---------------------------------------------------------------------------
# 4. Write to Neo4j
# ---------------------------------------------------------------------------


def _run_batch(session, query: str, batch: list[dict]) -> int:
    """Execute a parameterised Cypher UNWIND query for a batch of records.

    The whole batch is sent as a single UNWIND parameter; for the ~5k-movie
    TMDB dataset this is fine and no chunking is needed.
    """
    session.run(query, {"batch": batch})
    return len(batch)


def write_movie_nodes(parsed_movies: list[dict], neo4j_driver: "Driver") -> int:
    """MERGE Movie nodes into Neo4j. Returns number of movies written."""
    query = """
    UNWIND $batch AS row
    MERGE (m:Movie {id: row.id})
    SET m.title = row.title,
        m.release_date = row.release_date,
        m.overview = row.overview,
        m.popularity = row.popularity,
        m.vote_average = row.vote_average
    """
    with neo4j_driver.session() as session:
        count = _run_batch(session, query, parsed_movies)
    logger.info("Wrote %d Movie nodes", count)
    return count


def write_genre_nodes_and_edges(genre_records: list[dict], neo4j_driver: "Driver") -> int:
    """MERGE Genre nodes and IN_GENRE relationships. Returns number of edges written."""
    query = """
    UNWIND $batch AS row
    MERGE (g:Genre {name: row.genre_name})
    WITH g, row
    MATCH (m:Movie {id: row.movie_id})
    MERGE (m)-[:IN_GENRE]->(g)
    """
    with neo4j_driver.session() as session:
        count = _run_batch(session, query, genre_records)
    logger.info("Wrote %d IN_GENRE edges", count)
    return count


def write_company_nodes_and_edges(company_records: list[dict], neo4j_driver: "Driver") -> int:
    """MERGE ProductionCompany nodes and PRODUCED_BY relationships."""
    query = """
    UNWIND $batch AS row
    MERGE (c:ProductionCompany {id: row.company_id})
    SET c.name = row.company_name
    WITH c, row
    MATCH (m:Movie {id: row.movie_id})
    MERGE (m)-[:PRODUCED_BY]->(c)
    """
    with neo4j_driver.session() as session:
        count = _run_batch(session, query, company_records)
    logger.info("Wrote %d PRODUCED_BY edges", count)
    return count


def write_person_nodes_and_edges(parsed_credits: list[dict], neo4j_driver: "Driver") -> int:
    """
    MERGE Person nodes and ACTED_IN / DIRECTED relationships.
    Returns total number of relationships written.

    The ACTED_IN MERGE matches on the bare relationship and SETs
    order/character afterwards: merging with {order: row.order} inside the
    pattern would create a duplicate edge whenever the same person/movie
    pair re-appears with a different billing order (e.g. on re-ingestion),
    so this form keeps the write idempotent.
    """
    acted_in_query = """
    UNWIND $batch AS row
    MERGE (p:Person {id: row.person_id})
    SET p.name = row.person_name
    WITH p, row
    MATCH (m:Movie {id: row.movie_id})
    MERGE (p)-[r:ACTED_IN]->(m)
    SET r.order = row.order,
        r.character = row.character
    """
    directed_query = """
    UNWIND $batch AS row
    MERGE (p:Person {id: row.person_id})
    SET p.name = row.person_name
    WITH p, row
    MATCH (m:Movie {id: row.movie_id})
    MERGE (p)-[:DIRECTED]->(m)
    """
    cast_records = [r for r in parsed_credits if r["role"] == "cast"]
    director_records = [r for r in parsed_credits if r["role"] == "director"]

    with neo4j_driver.session() as session:
        _run_batch(session, acted_in_query, cast_records)
        _run_batch(session, directed_query, director_records)

    total = len(cast_records) + len(director_records)
    logger.info(
        "Wrote %d ACTED_IN and %d DIRECTED edges",
        len(cast_records),
        len(director_records),
    )
    return total


# ---------------------------------------------------------------------------
# 5. Terminal node
# ---------------------------------------------------------------------------


def ingestion_summary(
    write_movie_nodes: int,
    write_genre_nodes_and_edges: int,
    write_company_nodes_and_edges: int,
    write_person_nodes_and_edges: int,
) -> dict:
    """
    Collect write counts from all upstream nodes and return a summary.
    Terminal node of the ingestion DAG.
    """
    summary = {
        "movies": write_movie_nodes,
        "genre_edges": write_genre_nodes_and_edges,
        "company_edges": write_company_nodes_and_edges,
        "person_edges": write_person_nodes_and_edges,
    }
    logger.info("Ingestion complete: %s", summary)
    return summary
+ """ + summary = { + "movies": write_movie_nodes, + "genre_edges": write_genre_nodes_and_edges, + "company_edges": write_company_nodes_and_edges, + "person_edges": write_person_nodes_and_edges, + } + logger.info("Ingestion complete: %s", summary) + return summary \ No newline at end of file diff --git a/examples/LLM_Workflows/neo4j_graph_rag/requirements.txt b/examples/LLM_Workflows/neo4j_graph_rag/requirements.txt new file mode 100644 index 000000000..e1e08da02 --- /dev/null +++ b/examples/LLM_Workflows/neo4j_graph_rag/requirements.txt @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +sf-hamilton>=1.73.0 +neo4j>=5.18.0 +openai>=1.30.0 +pandas>=2.0.0 +python-dotenv>=1.0.0 +tqdm>=4.66.0 diff --git a/examples/LLM_Workflows/neo4j_graph_rag/retrieval_module.py b/examples/LLM_Workflows/neo4j_graph_rag/retrieval_module.py new file mode 100644 index 000000000..3c045d343 --- /dev/null +++ b/examples/LLM_Workflows/neo4j_graph_rag/retrieval_module.py @@ -0,0 +1,789 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Hamilton retrieval DAG — semantic entity resolution + multi-strategy GraphRAG. + +Pipeline: + 1. query_intent — classify into VECTOR / CYPHER / AGGREGATE / HYBRID + 2. entity_extraction — LLM extracts named entities from the query + 3. entity_resolution — each entity is looked up in Neo4j to get its + canonical form (exact name, genre spelling, date format) + 4. cypher_query — Cypher is generated using resolved entities, not raw text + 5. cypher_results — execute and return rows + 6. vector_results — semantic similarity search (VECTOR / HYBRID only) + 7. merged_results — combine both paths + 8. 
retrieved_context — format for generation DAG +""" + +import json +import logging + +import openai +from neo4j import Driver + +from embed_module import EMBEDDING_MODEL, VECTOR_INDEX_NAME +from graph_schema import schema_to_prompt + +logger = logging.getLogger(__name__) + +TOP_K = 8 +MAX_CAST = 8 +MAX_RETRIES = 2 +SCHEMA_PROMPT = schema_to_prompt() + +# --------------------------------------------------------------------------- +# Cypher examples used in generation prompt +# --------------------------------------------------------------------------- + +CYPHER_EXAMPLES = """ +-- Direct lookup (always return person/movie names explicitly) +MATCH (d:Person)-[:DIRECTED]->(m:Movie) +WHERE toLower(m.title) = toLower('Inception') +RETURN m.title AS movie, d.name AS director, m.release_date, m.vote_average + +-- Actor filmography (always alias person name) +MATCH (p:Person)-[:ACTED_IN]->(m:Movie) +WHERE p.name = 'Tom Hanks' +RETURN p.name AS actor, m.title AS movie, m.release_date +ORDER BY m.release_date DESC +LIMIT 20 + +-- Director filmography +MATCH (p:Person)-[:DIRECTED]->(m:Movie) +WHERE p.name = 'Christopher Nolan' +RETURN p.name AS director, m.title AS movie, m.release_date, m.vote_average +ORDER BY m.release_date + +-- Co-occurrence (use exact resolved names) +MATCH (a:Person {name: 'Tom Hanks'})-[:ACTED_IN]->(m:Movie)<-[:ACTED_IN]-(b:Person {name: 'Robin Wright'}) +RETURN m.title AS movie, m.release_date + +-- Aggregation with minimum count to avoid single-film outliers +MATCH (d:Person)-[:DIRECTED]->(m:Movie) +WITH d, avg(m.vote_average) AS avg_rating, count(m) AS film_count +WHERE film_count >= 3 +RETURN d.name AS director, avg_rating, film_count +ORDER BY avg_rating DESC +LIMIT 10 + +-- Production company (use exact resolved name) +MATCH (c:ProductionCompany {name: 'Warner Bros. 
Pictures'})<-[:PRODUCED_BY]-(m:Movie) +RETURN m.title AS movie, m.release_date +ORDER BY m.release_date DESC +LIMIT 20 + +-- Genre filter (use exact resolved genre name, string date comparison) +MATCH (m:Movie)-[:IN_GENRE]->(g:Genre {name: 'Comedy'}) +WHERE m.vote_average > 7 + AND m.release_date > '2010-12-31' +RETURN m.title AS movie, m.release_date, m.vote_average +ORDER BY m.vote_average DESC +LIMIT 20 + +-- Multi-hop: always include connecting film titles for context +MATCH (a:Person)-[:ACTED_IN]->(m1:Movie)<-[:DIRECTED]-(d1:Person {name: 'Christopher Nolan'}) + ,(a)-[:ACTED_IN]->(m2:Movie)<-[:DIRECTED]-(d2:Person {name: 'Steven Spielberg'}) +RETURN DISTINCT a.name AS actor, m1.title AS nolan_film, m2.title AS spielberg_film +LIMIT 20 + +-- Hybrid: director + genre + rating +MATCH (d:Person {name: 'James Cameron'})-[:DIRECTED]->(m:Movie)-[:IN_GENRE]->(g:Genre {name: 'Science Fiction'}) +RETURN m.title AS movie, m.vote_average, d.name AS director +ORDER BY m.vote_average DESC +LIMIT 20 + +-- Highest rated overall or by genre (ALWAYS add popularity > 5 to exclude obscure films) +MATCH (m:Movie)-[:IN_GENRE]->(g:Genre {name: 'Drama'}) +WHERE m.popularity > 5 +RETURN m.title AS movie, m.vote_average +ORDER BY m.vote_average DESC +LIMIT 1 + +-- Highest rated film overall (ALWAYS add popularity > 5) +MATCH (m:Movie) +WHERE m.popularity > 5 +RETURN m.title AS movie, m.vote_average +ORDER BY m.vote_average DESC +LIMIT 1 +""" + + +# --------------------------------------------------------------------------- +# 1. 
# ---------------------------------------------------------------------------
# 1. Classify query intent
# ---------------------------------------------------------------------------


def query_intent(user_query: str, openai_api_key: str) -> str:
    """
    Classify the user query into one of four retrieval strategies:
    VECTOR    — thematic/semantic (recommend, find similar, what movies about X)
    CYPHER    — relational facts (who directed X, filmography, co-stars, direct lookup)
    AGGREGATE — counting/ranking across the dataset
    HYBRID    — needs both graph facts AND semantic similarity / filtering
    Unrecognised model replies fall back to HYBRID.
    """
    system = """You are a query classifier for a movie knowledge graph.
Classify the user query into exactly one of these retrieval strategies:

VECTOR — thematic or semantic: recommendations, "find movies like X",
    "movies about space", "psychological thrillers"
CYPHER — relational or factual: "who directed X", "what movies did actor Y appear in",
    "did A and B appear together", "movies by director Z",
    "what year was X released", "who starred in X"
AGGREGATE — counting, ranking, aggregation: "which company made the most action movies",
    "highest rated drama", "director with highest average rating",
    "how many movies did X direct"
HYBRID — needs both graph traversal AND filtering/rating:
    "highest rated sci-fi films by James Cameron",
    "Tom Hanks movies rated above 7.5",
    "comedy films after 2010 rated above 7"

Reply with ONLY one word: VECTOR, CYPHER, AGGREGATE, or HYBRID."""

    llm = openai.OpenAI(api_key=openai_api_key)
    completion = llm.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": user_query},
        ],
        temperature=0.0,
        max_tokens=10,
    )
    label = completion.choices[0].message.content.strip().upper()
    if label not in {"VECTOR", "CYPHER", "AGGREGATE", "HYBRID"}:
        logger.warning("Unexpected intent '%s', defaulting to HYBRID", label)
        label = "HYBRID"
    logger.info("Query intent: %s", label)
    return label
# ---------------------------------------------------------------------------
# 2. Entity extraction
# ---------------------------------------------------------------------------

logger = logging.getLogger(__name__)  # mirrors the file-level module logger


def entity_extraction(
    user_query: str,
    openai_api_key: str,
    query_intent: str,
) -> dict:
    """
    Extract named entities from the user query as a structured dict.
    Returns empty dict for VECTOR queries (no entity resolution needed).

    Entity types extracted:
      movies      : list of movie titles mentioned
      persons     : list of person names (actors, directors)
      genres      : list of genres mentioned
      companies   : list of production company names mentioned
      year_after  : year string if query filters by "after YYYY"
      year_before : year string if query filters by "before YYYY"
      rating_above: float if query filters by minimum rating
      rating_below: float if query filters by maximum rating
    """
    if query_intent == "VECTOR":
        logger.info("Skipping entity extraction for VECTOR intent")
        return {}

    system = """You are an entity extractor for a movie knowledge graph query system.

Extract all named entities from the user query and return a JSON object with these keys
(omit keys that are not present in the query):

{
  "movies": ["title1", "title2"],
  "persons": ["name1", "name2"],
  "genres": ["genre1"],
  "companies": ["company name"],
  "year_after": "YYYY",
  "year_before": "YYYY",
  "rating_above": 7.5,
  "rating_below": 8.0
}

Rules:
- Extract exactly what is in the query, do not normalise or correct spellings yet
- If a name could be an actor or director, put it in "persons"
- Return ONLY valid JSON, no explanation, no markdown fences"""

    llm = openai.OpenAI(api_key=openai_api_key)
    completion = llm.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": user_query},
        ],
        temperature=0.0,
        max_tokens=300,
    )
    text = completion.choices[0].message.content.strip()

    # Strip markdown code fences the model sometimes adds despite the rules.
    pieces = text.split("\n")
    if pieces[0].startswith("```"):
        pieces = pieces[1:]
    if pieces and pieces[-1].endswith("```"):
        pieces = pieces[:-1]
    text = "\n".join(pieces)

    try:
        entities = json.loads(text)
    except json.JSONDecodeError:
        logger.warning("Failed to parse entity extraction response: %s", text)
        entities = {}

    logger.info("Extracted entities: %s", entities)
    return entities
# ---------------------------------------------------------------------------
# 3. Entity resolution — look up canonical forms in Neo4j
# ---------------------------------------------------------------------------

logger = logging.getLogger(__name__)  # mirrors the file-level module logger

# One fuzzy-lookup Cypher query per node label, shared by _lookup_canonical.
# Person/movie/company prefer the SHORTEST matching name (ORDER BY size)
# so "Tom Hanks" beats a longer superstring match; the genre set is small
# enough that the first CONTAINS match suffices.
_PERSON_LOOKUP = """
    MATCH (p:Person)
    WHERE toLower(p.name) CONTAINS toLower($value)
    RETURN p.name AS canonical
    ORDER BY size(p.name)
    LIMIT 1
"""
_MOVIE_LOOKUP = """
    MATCH (m:Movie)
    WHERE toLower(m.title) CONTAINS toLower($value)
    RETURN m.title AS canonical
    ORDER BY size(m.title)
    LIMIT 1
"""
_GENRE_LOOKUP = """
    MATCH (g:Genre)
    WHERE toLower(g.name) CONTAINS toLower($value)
    RETURN g.name AS canonical
    LIMIT 1
"""
_COMPANY_LOOKUP = """
    MATCH (c:ProductionCompany)
    WHERE toLower(c.name) CONTAINS toLower($value)
    RETURN c.name AS canonical
    ORDER BY size(c.name)
    LIMIT 1
"""


def _lookup_canonical(values: list[str], session, cypher: str, kind: str) -> dict[str, str]:
    """
    Shared fuzzy-resolution loop used by all four _resolve_* helpers
    (previously duplicated per entity type).

    Returns {input: canonical}; inputs that match nothing in the graph map
    to themselves so downstream Cypher generation can use them verbatim.
    """
    resolved = {}
    for value in values:
        record = session.run(cypher, {"value": value}).single()
        if record:
            resolved[value] = record["canonical"]
            logger.info("Resolved %s '%s' -> '%s'", kind, value, record["canonical"])
        else:
            resolved[value] = value
            logger.warning("Could not resolve %s '%s', using as-is", kind, value)
    return resolved


def _resolve_persons(names: list[str], session) -> dict[str, str]:
    """Fuzzy-match person names against the graph, return {input: canonical}."""
    return _lookup_canonical(names, session, _PERSON_LOOKUP, "person")


def _resolve_movies(titles: list[str], session) -> dict[str, str]:
    """Fuzzy-match movie titles against the graph, return {input: canonical}."""
    return _lookup_canonical(titles, session, _MOVIE_LOOKUP, "movie")


# Common colloquial genre names mapped to their canonical TMDB spellings.
GENRE_ALIASES = {
    "sci-fi": "Science Fiction",
    "scifi": "Science Fiction",
    "science fiction": "Science Fiction",
    "rom-com": "Romance",
    "romcom": "Romance",
    "rom com": "Romance",
    "action-adventure": "Action",
    "documentary": "Documentary",
    "doc": "Documentary",
    "animated": "Animation",
    "animation": "Animation",
    "anime": "Animation",
    "horror": "Horror",
    "comedy": "Comedy",
    "drama": "Drama",
    "thriller": "Thriller",
    "western": "Western",
    "fantasy": "Fantasy",
    "mystery": "Mystery",
    "adventure": "Adventure",
    "crime": "Crime",
    "family": "Family",
    "history": "History",
    "music": "Music",
    "romance": "Romance",
    "war": "War",
    "tv movie": "TV Movie",
}


def _resolve_genres(genres: list[str], session) -> dict[str, str]:
    """
    Resolve genre names to their canonical form in the graph.
    First checks the local alias map for common abbreviations (e.g. sci-fi),
    then falls back to fuzzy Neo4j lookup.
    Returns {input: canonical}.
    """
    resolved = {}
    for genre in genres:
        alias = GENRE_ALIASES.get(genre.lower())
        if alias:
            resolved[genre] = alias
            logger.info("Resolved genre '%s' -> '%s' (alias map)", genre, alias)
        else:
            resolved.update(_lookup_canonical([genre], session, _GENRE_LOOKUP, "genre"))
    return resolved


def _resolve_companies(companies: list[str], session) -> dict[str, str]:
    """Fuzzy-match company names against the graph, return {input: canonical}."""
    return _lookup_canonical(companies, session, _COMPANY_LOOKUP, "company")


def entity_resolution(
    entity_extraction: dict,
    neo4j_driver: "Driver",
    query_intent: str,
) -> dict:
    """
    Resolve each extracted entity to its canonical form in Neo4j.

    Returns the same structure as entity_extraction but with the entity
    lists replaced by {input: canonical} mappings. Numeric/date filters
    (year_after, year_before, rating_above, rating_below) are passed
    through unchanged — Cypher generation formats them later.

    VECTOR queries (and empty extractions) short-circuit without touching
    the database.
    """
    if query_intent == "VECTOR" or not entity_extraction:
        return entity_extraction

    resolved = {}
    resolvers = {
        "persons": _resolve_persons,
        "movies": _resolve_movies,
        "genres": _resolve_genres,
        "companies": _resolve_companies,
    }
    with neo4j_driver.session() as session:
        for key, resolve in resolvers.items():
            if entity_extraction.get(key):
                resolved[key] = resolve(entity_extraction[key], session)

    # Pass numeric/date filters through unchanged.
    for key in ("year_after", "year_before", "rating_above", "rating_below"):
        if key in entity_extraction:
            resolved[key] = entity_extraction[key]

    logger.info("Resolved entities: %s", resolved)
    return resolved


# ---------------------------------------------------------------------------
# 4. Vector path
# ---------------------------------------------------------------------------


def query_embedding(
    user_query: str,
    openai_api_key: str,
    query_intent: str,
) -> list[float]:
    """Embed the user query. Returns empty list for CYPHER/AGGREGATE."""
    if query_intent not in ("VECTOR", "HYBRID"):
        logger.info("Skipping embedding for intent: %s", query_intent)
        return []

    client = openai.OpenAI(api_key=openai_api_key)
    response = client.embeddings.create(
        model=EMBEDDING_MODEL,
        input=user_query,
    )
    embedding = response.data[0].embedding
    logger.info("Computed query embedding (%d dims)", len(embedding))
    return embedding


def vector_results(
    query_embedding: list[float],
    neo4j_driver: "Driver",
    query_intent: str,
) -> list[dict]:
    """Run vector similarity search. Returns empty list for CYPHER/AGGREGATE."""
    if query_intent not in ("VECTOR", "HYBRID") or not query_embedding:
        return []

    cypher = """
    CALL db.index.vector.queryNodes($index_name, $top_k, $query_vector)
    YIELD node, score
    RETURN
        node.id AS id,
        node.title AS title,
        node.overview AS overview,
        node.release_date AS release_date,
        node.vote_average AS vote_average,
        score
    ORDER BY score DESC
    """
    with neo4j_driver.session() as session:
        rows = session.run(
            cypher,
            {
                "index_name": VECTOR_INDEX_NAME,
                "top_k": TOP_K,
                "query_vector": query_embedding,
            },
        ).data()

    logger.info(
        "Vector search: %d results (top score: %.4f)",
        len(rows),
        rows[0]["score"] if rows else 0.0,
    )
    return rows
+ """ + if not resolved: + return "" + + lines = ["Resolved entities to use in the query (use these EXACT values):"] + + persons = resolved.get("persons", {}) + if persons: + for original, canonical in persons.items(): + lines.append(f' Person: "{canonical}"') + + movies = resolved.get("movies", {}) + if movies: + for original, canonical in movies.items(): + lines.append(f' Movie title: "{canonical}"') + + genres = resolved.get("genres", {}) + if genres: + for original, canonical in genres.items(): + lines.append(f' Genre: "{canonical}"') + + companies = resolved.get("companies", {}) + if companies: + for original, canonical in companies.items(): + lines.append(f' ProductionCompany: "{canonical}"') + + if "year_after" in resolved: + lines.append(f' Date filter: m.release_date > \'{resolved["year_after"]}-01-01\'') + if "year_before" in resolved: + lines.append(f' Date filter: m.release_date < \'{resolved["year_before"]}-12-31\'') + if "rating_above" in resolved: + lines.append(f' Rating filter: m.vote_average > {resolved["rating_above"]}') + if "rating_below" in resolved: + lines.append(f' Rating filter: m.vote_average < {resolved["rating_below"]}') + + return "\n".join(lines) + + +def _generate_cypher( + user_query: str, + entity_context: str, + client: openai.OpenAI, + hint: str = "", +) -> str: + """Generate Cypher using the schema, examples, and resolved entity context.""" + system = f"""You are an expert Neo4j Cypher query generator. 
+ +Graph schema: +{SCHEMA_PROMPT} + +Example queries: +{CYPHER_EXAMPLES} + +{entity_context} + +Rules: +- Use ONLY the exact entity names provided above — do not paraphrase or guess +- Relationship directions: (Person)-[:DIRECTED]->(Movie), (Person)-[:ACTED_IN]->(Movie), + (Movie)-[:IN_GENRE]->(Genre), (Movie)-[:PRODUCED_BY]->(ProductionCompany) +- Use exact match {{name: 'resolved name'}} when entity is resolved, CONTAINS only as fallback +- Always alias RETURN fields with meaningful names: actor, director, movie, genre, company +- For date comparisons use STRING comparison: m.release_date > '2010-12-31' + (release_date is stored as a string, NOT a Neo4j date type) +- For aggregation ranking queries add: WITH ..., count(m) AS film_count WHERE film_count >= 3 +- For "highest rated" queries ALWAYS add WHERE m.popularity > 5 to exclude obscure low-vote films +- LIMIT 20 unless counting +- Return ONLY the Cypher query, no explanation, no markdown fences +{hint}""" + + response = client.chat.completions.create( + model="gpt-4o", + messages=[ + {"role": "system", "content": system}, + {"role": "user", "content": f"Generate a Cypher query for: {user_query}"}, + ], + temperature=0.0, + max_tokens=500, + ) + raw = response.choices[0].message.content.strip() + if raw.startswith("```"): + raw = "\n".join(raw.split("\n")[1:]) + if raw.endswith("```"): + raw = "\n".join(raw.split("\n")[:-1]) + return raw.strip() + + +def _execute_cypher(cypher: str, driver: Driver) -> list[dict]: + """Execute Cypher and return results as list of dicts.""" + try: + with driver.session() as session: + return session.run(cypher).data() + except Exception as e: + logger.warning("Cypher execution failed: %s", e) + return [] + + +def cypher_query( + user_query: str, + entity_resolution: dict, + openai_api_key: str, + query_intent: str, + neo4j_driver: Driver, +) -> str: + """ + Generate and validate a Cypher query using resolved entities. 
+ Retries once with a relaxed hint if first attempt returns no results. + Returns the successful Cypher string or empty string. + """ + if query_intent == "VECTOR": + return "" + + client = openai.OpenAI(api_key=openai_api_key) + entity_context = _build_entity_context(entity_resolution) + + for attempt in range(MAX_RETRIES): + hint = "" + if attempt == 1: + hint = ( + "\nHINT: The previous query returned no results. " + "Double-check relationship directions. " + "Try broadening filters or using CONTAINS as a fallback for name matching." + ) + + cypher = _generate_cypher(user_query, entity_context, client, hint) + logger.info("Generated Cypher (attempt %d):\n%s", attempt + 1, cypher) + + results = _execute_cypher(cypher, neo4j_driver) + if results: + logger.info("Cypher returned %d results", len(results)) + return cypher + + logger.warning("Cypher attempt %d returned no results", attempt + 1) + + logger.error("All Cypher attempts failed for: %s", user_query) + return "" + + +def cypher_results( + cypher_query: str, + neo4j_driver: Driver, +) -> list[dict]: + """Execute the validated Cypher query and return results.""" + if not cypher_query: + return [] + results = _execute_cypher(cypher_query, neo4j_driver) + logger.info("Cypher results: %d rows", len(results)) + return results + + +# --------------------------------------------------------------------------- +# 6. 
Enrich vector results with graph traversal +# --------------------------------------------------------------------------- + +def _enrich_movie(movie_id: int, driver: Driver) -> dict | None: + """Pull directors, cast, genres, companies for a movie node.""" + cypher = """ + MATCH (m:Movie {id: $movie_id}) + OPTIONAL MATCH (d:Person)-[:DIRECTED]->(m) + OPTIONAL MATCH (a:Person)-[r:ACTED_IN]->(m) + WITH m, d, a, r ORDER BY r.order ASC + OPTIONAL MATCH (m)-[:IN_GENRE]->(g:Genre) + OPTIONAL MATCH (m)-[:PRODUCED_BY]->(c:ProductionCompany) + RETURN + m.title AS title, + m.overview AS overview, + m.release_date AS release_date, + m.vote_average AS vote_average, + collect(DISTINCT d.name) AS directors, + collect(DISTINCT a.name)[0..$max_cast] AS cast, + collect(DISTINCT g.name) AS genres, + collect(DISTINCT c.name) AS companies + """ + with driver.session() as session: + row = session.run(cypher, {"movie_id": movie_id, "max_cast": MAX_CAST}).single() + return dict(row) if row else None + + +# --------------------------------------------------------------------------- +# 7. Merge results +# --------------------------------------------------------------------------- + +def merged_results( + vector_results: list[dict], + cypher_results: list[dict], + neo4j_driver: Driver, + query_intent: str, +) -> list[dict]: + """Merge Cypher and vector results. 
logger = logging.getLogger(__name__)

# Plain-English labels for raw Cypher result columns. Hoisted to module
# scope so the mapping is not rebuilt on every call; unknown keys fall back
# to "snake_case" -> "Snake case" via str.capitalize().
_FIELD_LABELS = {
    "movie": "Movie",
    "director": "Director",
    "actor": "Actor",
    "genre": "Genre",
    "company": "Production company",
    "film_count": "Films",
    "movie_count": "Count",
    "action_movie_count": "Action movies",
    "avg_rating": "Avg rating",
    "average_rating": "Avg rating",
    "vote_average": "Rating",
    "release_date": "Released",
}


def retrieved_context(merged_results: list[dict], query_intent: str) -> str:
    """
    Format merged retrieval results into plain-text context for the
    generation DAG.

    Enriched movie records (from the vector path, recognised by the
    presence of a ``directors`` key) are formatted with full metadata.
    Raw Cypher rows are formatted as numbered plain-English lines with
    field labels so the LLM can read them directly.

    :param merged_results: combined cypher/vector result rows; keys that
        start with ``_`` are retrieval bookkeeping and are never shown
        to the LLM.
    :param query_intent: upstream DAG dependency — presumably present so
        intent classification runs before formatting; not used here.
    :return: newline-joined context string, or a fixed "nothing found"
        sentence when there are no results.
    """
    if not merged_results:
        return "No relevant information found in the knowledge graph for this query."

    lines = []
    # enumerate replaces the original hand-rolled `i = 0; i += 1` counter.
    for i, row in enumerate(merged_results, start=1):
        if "directors" in row:
            # Enriched movie record from the vector path.
            directors = ", ".join(row.get("directors") or []) or "Unknown"
            cast = ", ".join(row.get("cast") or []) or "Unknown"
            genres = ", ".join(row.get("genres") or []) or "Unknown"
            companies = ", ".join(row.get("companies") or []) or "Unknown"
            release = (row.get("release_date") or "")[:4] or "N/A"  # year only
            rating = row.get("vote_average", "N/A")
            title = row.get("title", "Unknown")
            overview = row.get("overview", "")
            lines.append(f"{i}. {title} ({release}) — Rating: {rating}")
            lines.append(f"   Directed by: {directors}")
            lines.append(f"   Cast: {cast}")
            lines.append(f"   Genres: {genres}")
            lines.append(f"   Produced by: {companies}")
            if overview:
                lines.append(f"   Overview: {overview}")
            lines.append("")  # blank separator between movie records
        else:
            # Raw Cypher result row: drop bookkeeping keys, label each field.
            clean = {k: v for k, v in row.items() if not k.startswith("_")}
            parts = []
            for key, value in clean.items():
                label = _FIELD_LABELS.get(key, key.replace("_", " ").capitalize())
                if isinstance(value, float):
                    value = round(value, 3)  # keep float noise out of the prompt
                parts.append(f"{label}: {value}")
            lines.append(f"{i}. " + " | ".join(parts))

    context = "\n".join(lines)
    logger.info("Formatted context: %d chars from %d results", len(context), len(merged_results))
    return context
import argparse
import logging
import os
from pathlib import Path

from dotenv import load_dotenv
from hamilton import driver
from neo4j import GraphDatabase

import embed_module
import generation_module
import ingest_module
import retrieval_module
from graph_schema import CONSTRAINTS

load_dotenv()
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------


def get_env(key: str) -> str:
    """Return the value of a required environment variable.

    :param key: environment variable name.
    :raises EnvironmentError: if the variable is unset or empty.
    """
    val = os.getenv(key)
    if not val:
        raise EnvironmentError(f"Missing required environment variable: {key}")
    return val


def make_neo4j_driver():
    """Build a Neo4j driver from NEO4J_* env vars and verify connectivity.

    :raises EnvironmentError: if any of NEO4J_URI / NEO4J_USERNAME /
        NEO4J_PASSWORD is missing.
    """
    uri = get_env("NEO4J_URI")
    user = get_env("NEO4J_USERNAME")
    password = get_env("NEO4J_PASSWORD")
    drv = GraphDatabase.driver(uri, auth=(user, password))
    drv.verify_connectivity()
    logger.info("Connected to Neo4j at %s", uri)
    return drv


def apply_constraints(drv) -> None:
    """Apply every uniqueness constraint declared in graph_schema.CONSTRAINTS."""
    with drv.session() as session:
        for constraint in CONSTRAINTS:
            session.run(constraint)
    logger.info("Applied %d constraints", len(CONSTRAINTS))


# ---------------------------------------------------------------------------
# ingest
# ---------------------------------------------------------------------------


def run_ingest(visualise: bool = False) -> None:
    """Load the TMDB movies/credits JSON files into Neo4j.

    With ``visualise=True`` only a PNG of the ingest DAG is written and
    nothing is executed against the database.

    :raises FileNotFoundError: if either TMDB JSON file is missing.
    """
    data_dir = Path(__file__).parent / "data"
    movies_path = str(data_dir / "tmdb_5000_movies.json")
    credits_path = str(data_dir / "tmdb_5000_credits.json")

    for p in [movies_path, credits_path]:
        if not Path(p).exists():
            raise FileNotFoundError(
                f"Missing data file: {p}\n"
                "Download from https://www.kaggle.com/datasets/tmdb/tmdb-movie-metadata "
                "and place both JSON files in the data/ folder.\n"
                "See data/README.md for conversion instructions."
            )

    drv = make_neo4j_driver()
    try:
        apply_constraints(drv)

        ingest_driver = driver.Builder().with_modules(ingest_module).build()

        if visualise:
            ingest_driver.display_all_functions("ingest_dag.png")
            logger.info("Saved ingest DAG visualisation to ingest_dag.png")
            return

        result = ingest_driver.execute(
            ["ingestion_summary"],
            inputs={
                "movies_path": movies_path,
                "credits_path": credits_path,
                "neo4j_driver": drv,
            },
        )
        s = result["ingestion_summary"]
        logger.info(
            "Ingestion complete — movies: %d, genre edges: %d, "
            "company edges: %d, person edges: %d",
            s["movies"], s["genre_edges"], s["company_edges"], s["person_edges"],
        )
    finally:
        # Always release the bolt connection pool, even if execution fails.
        drv.close()


# ---------------------------------------------------------------------------
# embed
# ---------------------------------------------------------------------------


def run_embed(visualise: bool = False) -> None:
    """Compute OpenAI embeddings for Movie nodes and store them in Neo4j.

    With ``visualise=True`` only a PNG of the embed DAG is written.
    """
    drv = make_neo4j_driver()
    try:
        openai_api_key = get_env("OPENAI_API_KEY")

        embed_driver = driver.Builder().with_modules(embed_module).build()

        if visualise:
            embed_driver.display_all_functions("embed_dag.png")
            logger.info("Saved embed DAG visualisation to embed_dag.png")
            return

        result = embed_driver.execute(
            ["embedding_summary"],
            inputs={
                "neo4j_driver": drv,
                "openai_api_key": openai_api_key,
            },
        )
        s = result["embedding_summary"]
        logger.info(
            "Embedding complete — %d embeddings written, index: %s, model: %s",
            s["embeddings_written"], s["vector_index"], s["model"],
        )
    finally:
        # Always release the bolt connection pool, even if execution fails.
        drv.close()


# ---------------------------------------------------------------------------
# query
# ---------------------------------------------------------------------------


def run_query(question: str, visualise: bool = False) -> None:
    """Answer a question against the populated graph and print the result.

    With ``visualise=True`` only a PNG of the RAG DAG is written.

    :param question: natural-language question for the GraphRAG pipeline.
    """
    drv = make_neo4j_driver()
    try:
        openai_api_key = get_env("OPENAI_API_KEY")

        rag_driver = (
            driver.Builder()
            .with_modules(retrieval_module, generation_module)
            .build()
        )

        if visualise:
            rag_driver.display_all_functions("rag_dag.png")
            logger.info("Saved RAG DAG visualisation to rag_dag.png")
            return

        result = rag_driver.execute(
            ["answer"],
            inputs={
                "user_query": question,
                "neo4j_driver": drv,
                "openai_api_key": openai_api_key,
            },
        )
        print("\n" + "=" * 60)
        print(f"Q: {question}")
        print("=" * 60)
        print(f"A: {result['answer']}")
        print("=" * 60 + "\n")
    finally:
        # Always release the bolt connection pool, even if execution fails.
        drv.close()


# ---------------------------------------------------------------------------
# main
# ---------------------------------------------------------------------------


def main() -> None:
    """Parse CLI arguments and dispatch to the chosen pipeline mode."""
    parser = argparse.ArgumentParser(description="Neo4j GraphRAG — TMDB Movies")
    parser.add_argument(
        "--mode",
        choices=["ingest", "embed", "query"],
        required=True,
        help=(
            "ingest: load TMDB data into Neo4j | "
            "embed: compute and store embeddings | "
            "query: ask a question"
        ),
    )
    parser.add_argument(
        "--question",
        type=str,
        default="Which movies did Christopher Nolan direct?",
        help="Question to ask (only used in query mode)",
    )
    parser.add_argument(
        "--visualise",
        action="store_true",
        help="Save a PNG of the Hamilton DAG and exit without executing",
    )
    args = parser.parse_args()

    if args.mode == "ingest":
        run_ingest(visualise=args.visualise)
    elif args.mode == "embed":
        run_embed(visualise=args.visualise)
    elif args.mode == "query":
        run_query(question=args.question, visualise=args.visualise)


if __name__ == "__main__":
    main()