Skip to content

Commit ba9169a

Browse files
authored
streaming finalize_graph (#2240)
* add streaming * add patch * fix spelling * add improvements and test
1 parent cd0c405 commit ba9169a

5 files changed

Lines changed: 639 additions & 95 deletions

File tree

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"type": "patch",
3+
"description": "finalize_graph streaming"
4+
}
Lines changed: 49 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,56 @@
1-
# Copyright (c) 2024 Microsoft Corporation.
1+
# Copyright (C) 2026 Microsoft
22
# Licensed under the MIT License
33

4-
"""All the steps to transform final entities."""
4+
"""Stream-finalize entity rows into an output Table."""
55

6+
from typing import Any
67
from uuid import uuid4
78

8-
import pandas as pd
9+
from graphrag_storage.tables.table import Table
910

1011
from graphrag.data_model.schemas import ENTITIES_FINAL_COLUMNS
11-
from graphrag.graphs.compute_degree import compute_degree
12-
13-
14-
def finalize_entities(
15-
entities: pd.DataFrame,
16-
relationships: pd.DataFrame,
17-
) -> pd.DataFrame:
18-
"""All the steps to transform final entities."""
19-
degrees = compute_degree(relationships)
20-
final_entities = entities.merge(degrees, on="title", how="left").drop_duplicates(
21-
subset="title"
22-
)
23-
final_entities = final_entities.loc[entities["title"].notna()].reset_index()
24-
# disconnected nodes and those with no community even at level 0 can be missing degree
25-
final_entities["degree"] = final_entities["degree"].fillna(0).astype(int)
26-
final_entities.reset_index(inplace=True)
27-
final_entities["human_readable_id"] = final_entities.index
28-
final_entities["id"] = final_entities["human_readable_id"].apply(
29-
lambda _x: str(uuid4())
30-
)
31-
return final_entities.loc[
32-
:,
33-
ENTITIES_FINAL_COLUMNS,
34-
]
12+
13+
14+
async def finalize_entities(
    entities_table: Table,
    degree_map: dict[str, int],
) -> list[dict[str, Any]]:
    """Finalize entity rows one at a time and write them to the table.

    Iterates ``entities_table`` and keeps only the first row seen for
    each non-empty title. Every kept row receives ``degree`` (looked up
    in ``degree_map``, 0 when the title is absent), a sequential
    ``human_readable_id`` starting at 0, and a fresh UUID4 string
    ``id``. The row is then projected onto ``ENTITIES_FINAL_COLUMNS``
    and passed to ``entities_table.write``.

    NOTE(review): rows are written to the same table that is being
    iterated. The original author states this is safe when the table is
    opened with truncate=True (reads from the original, writes to a
    temp file) -- confirm against the Table implementation.

    Args
    ----
        entities_table: Table
            Opened table that is both iterated for input rows and
            written with finalized rows.
        degree_map: dict[str, int]
            Pre-computed mapping of entity title to node degree.

    Returns
    -------
        list[dict[str, Any]]
            The first finalized rows, at most 5, intended for logging.
    """
    accepted_titles: set[str] = set()
    preview: list[dict[str, Any]] = []

    async for row in entities_table:
        title = row.get("title")
        if title and title not in accepted_titles:
            row["degree"] = degree_map.get(title, 0)
            # Titles accepted so far == next 0-based sequential id.
            row["human_readable_id"] = len(accepted_titles)
            row["id"] = str(uuid4())
            accepted_titles.add(title)

            # Keep only the final schema columns; absent ones become None.
            finalized = {
                column: row.get(column) for column in ENTITIES_FINAL_COLUMNS
            }
            await entities_table.write(finalized)
            if len(preview) < 5:
                preview.append(finalized)

    return preview
Lines changed: 48 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,55 @@
1-
# Copyright (c) 2024 Microsoft Corporation.
1+
# Copyright (C) 2026 Microsoft
22
# Licensed under the MIT License
33

4-
"""All the steps to transform final relationships."""
4+
"""Stream-finalize relationship rows into an output Table."""
55

6+
from typing import Any
67
from uuid import uuid4
78

8-
import pandas as pd
9+
from graphrag_storage.tables.table import Table
910

1011
from graphrag.data_model.schemas import RELATIONSHIPS_FINAL_COLUMNS
11-
from graphrag.graphs.compute_degree import compute_degree
12-
from graphrag.index.operations.compute_edge_combined_degree import (
13-
compute_edge_combined_degree,
14-
)
15-
16-
17-
def finalize_relationships(
18-
relationships: pd.DataFrame,
19-
) -> pd.DataFrame:
20-
"""All the steps to transform final relationships."""
21-
degrees = compute_degree(relationships)
22-
23-
final_relationships = relationships.drop_duplicates(subset=["source", "target"])
24-
final_relationships["combined_degree"] = compute_edge_combined_degree(
25-
final_relationships,
26-
degrees,
27-
node_name_column="title",
28-
node_degree_column="degree",
29-
edge_source_column="source",
30-
edge_target_column="target",
31-
)
32-
33-
final_relationships.reset_index(inplace=True)
34-
final_relationships["human_readable_id"] = final_relationships.index
35-
final_relationships["id"] = final_relationships["human_readable_id"].apply(
36-
lambda _x: str(uuid4())
37-
)
38-
39-
return final_relationships.loc[
40-
:,
41-
RELATIONSHIPS_FINAL_COLUMNS,
42-
]
12+
13+
14+
async def finalize_relationships(
    relationships_table: Table,
    degree_map: dict[str, int],
) -> list[dict[str, Any]]:
    """Finalize relationship rows one at a time and write them to the table.

    Iterates ``relationships_table`` and keeps only the first row seen
    for each directed ``(source, target)`` pair (a missing key is read
    as ``""``). Every kept row receives ``combined_degree`` (the sum of
    the two endpoint degrees from ``degree_map``, 0 for unknown
    endpoints), a sequential ``human_readable_id`` starting at 0, and a
    fresh UUID4 string ``id``. The row is then projected onto
    ``RELATIONSHIPS_FINAL_COLUMNS`` and passed to
    ``relationships_table.write``.

    Args
    ----
        relationships_table: Table
            Opened table that is both iterated for input rows and
            written with finalized rows.
        degree_map: dict[str, int]
            Pre-computed mapping of entity title to node degree.

    Returns
    -------
        list[dict[str, Any]]
            The first finalized rows, at most 5, intended for logging.
    """
    accepted_edges: set[tuple[str, str]] = set()
    preview: list[dict[str, Any]] = []

    async for row in relationships_table:
        source = row.get("source", "")
        target = row.get("target", "")
        edge = (source, target)
        if edge in accepted_edges:
            continue

        source_degree = degree_map.get(source, 0)
        target_degree = degree_map.get(target, 0)
        row["combined_degree"] = source_degree + target_degree
        # Edges accepted so far == next 0-based sequential id.
        row["human_readable_id"] = len(accepted_edges)
        row["id"] = str(uuid4())
        accepted_edges.add(edge)

        # Keep only the final schema columns; absent ones become None.
        finalized = {
            column: row.get(column) for column in RELATIONSHIPS_FINAL_COLUMNS
        }
        await relationships_table.write(finalized)
        if len(preview) < 5:
            preview.append(finalized)

    return preview
Lines changed: 94 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
1-
# Copyright (c) 2024 Microsoft Corporation.
1+
# Copyright (C) 2026 Microsoft
22
# Licensed under the MIT License
33

44
"""A module containing run_workflow method definition."""
55

66
import logging
7+
from collections import Counter
8+
from typing import Any
79

8-
import pandas as pd
10+
from graphrag_storage.tables.table import Table
911

1012
from graphrag.config.models.graph_rag_config import GraphRagConfig
11-
from graphrag.data_model.data_reader import DataReader
13+
from graphrag.data_model.row_transformers import (
14+
transform_entity_row,
15+
transform_relationship_row,
16+
)
1217
from graphrag.index.operations.finalize_entities import finalize_entities
13-
from graphrag.index.operations.finalize_relationships import finalize_relationships
18+
from graphrag.index.operations.finalize_relationships import (
19+
finalize_relationships,
20+
)
1421
from graphrag.index.operations.snapshot_graphml import snapshot_graphml
1522
from graphrag.index.typing.context import PipelineRunContext
1623
from graphrag.index.typing.workflow import WorkflowFunctionOutput
@@ -24,41 +31,95 @@ async def run_workflow(
2431
) -> WorkflowFunctionOutput:
2532
"""All the steps to create the base entity graph."""
2633
logger.info("Workflow started: finalize_graph")
27-
reader = DataReader(context.output_table_provider)
28-
entities = await reader.entities()
29-
relationships = await reader.relationships()
3034

31-
final_entities, final_relationships = finalize_graph(
32-
entities,
33-
relationships,
34-
)
35-
36-
await context.output_table_provider.write_dataframe("entities", final_entities)
37-
await context.output_table_provider.write_dataframe(
38-
"relationships", final_relationships
39-
)
35+
async with (
36+
context.output_table_provider.open(
37+
"entities",
38+
transformer=transform_entity_row,
39+
) as entities_table,
40+
context.output_table_provider.open(
41+
"relationships",
42+
transformer=transform_relationship_row,
43+
) as relationships_table,
44+
):
45+
result = await finalize_graph(
46+
entities_table,
47+
relationships_table,
48+
)
4049

4150
if config.snapshots.graphml:
51+
rels = await context.output_table_provider.read_dataframe("relationships")
4252
await snapshot_graphml(
43-
final_relationships,
53+
rels,
4454
name="graph",
4555
storage=context.output_storage,
4656
)
4757

4858
logger.info("Workflow completed: finalize_graph")
49-
return WorkflowFunctionOutput(
50-
result={
51-
"entities": entities,
52-
"relationships": relationships,
53-
}
54-
)
55-
56-
57-
def finalize_graph(
58-
entities: pd.DataFrame,
59-
relationships: pd.DataFrame,
60-
) -> tuple[pd.DataFrame, pd.DataFrame]:
61-
"""All the steps to finalize the entity and relationship formats."""
62-
final_entities = finalize_entities(entities, relationships)
63-
final_relationships = finalize_relationships(relationships)
64-
return (final_entities, final_relationships)
59+
return WorkflowFunctionOutput(result=result)
60+
61+
62+
async def finalize_graph(
    entities_table: Table,
    relationships_table: Table,
) -> dict[str, list[dict[str, Any]]]:
    """Compute node degrees, then finalize entities and relationships.

    First builds a title -> degree map from ``relationships_table``
    via ``_build_degree_map``. That map is then handed to
    ``finalize_entities`` and, afterwards, ``finalize_relationships``,
    each of which enriches rows and writes them to its own table.

    Args
    ----
        entities_table: Table
            Opened table for reading and writing entity rows.
        relationships_table: Table
            Opened table that is read once to build the degree map and
            then read and written again while finalizing relationships.

    Returns
    -------
        dict[str, list[dict[str, Any]]]
            Sample rows keyed by ``"entities"`` and
            ``"relationships"``, up to 5 each.
    """
    node_degrees = await _build_degree_map(relationships_table)

    # Dict entries are evaluated in order: entities are finalized
    # before relationships.
    return {
        "entities": await finalize_entities(entities_table, node_degrees),
        "relationships": await finalize_relationships(
            relationships_table, node_degrees
        ),
    }
96+
97+
98+
async def _build_degree_map(
    relationships_table: Table,
) -> dict[str, int]:
    """Stream relationship rows to compute node degrees.

    Each row is reduced to an undirected edge by sorting its
    ``source`` / ``target`` values, so ``(A, B)`` and ``(B, A)`` count
    as one edge. Every distinct edge adds 1 to the degree of each
    endpoint; a self-loop therefore adds 2 to its node. No DataFrame is
    materialized. (The original author describes this as matching
    ``compute_degree``; that helper is not visible here.)

    A row missing ``source`` or ``target`` is read with ``""`` as the
    endpoint, the same default ``finalize_relationships`` applies when
    it later looks endpoints up in this map. Both passes over the table
    therefore agree on how a missing endpoint is represented, instead
    of this pass raising ``KeyError``.

    Args
    ----
        relationships_table: Table
            Opened table to stream relationship rows from.

    Returns
    -------
        dict[str, int]
            Mapping of entity title to its node degree.
    """
    seen: set[tuple[str, str]] = set()
    degree: Counter[str] = Counter()
    async for row in relationships_table:
        # Sort the endpoints so edge direction does not matter.
        lo, hi = sorted((row.get("source", ""), row.get("target", "")))
        edge = (lo, hi)
        if edge in seen:
            continue
        seen.add(edge)
        # Counts each endpoint once; a self-loop counts its node twice.
        degree.update(edge)
    return dict(degree)

0 commit comments

Comments
 (0)