Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public static Observation graphToObservations(McfOptimizedGraph graph, String im
obs.variableMeasured(graph.getSvObsSeries().getKey().getVariableMeasured());
obs.unit(graph.getSvObsSeries().getKey().getUnit());
obs.scalingFactor(graph.getSvObsSeries().getKey().getScalingFactor());
obs.provenanceUrl(graph.getSvObsSeries().getKey().getProvenanceUrl());
Observations.Builder ob = Observations.newBuilder();
for (StatVarObs svo : graph.getSvObsSeries().getSvObsListList()) {
if (svo.hasNumber()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ public void testGraphToObservations() {
.setObservationPeriod("P1Y")
.setMeasurementMethod("dcAggregate/testMethod")
.setUnit("testUnit")
.setScalingFactor("100"))
.setScalingFactor("100")
.setProvenanceUrl("http://example.com"))
.addSvObsList(
StatVarObs.newBuilder().setDcid("obs1").setDate("2020").setNumber(10.0))
.addSvObsList(
Expand All @@ -335,6 +336,7 @@ public void testGraphToObservations() {
.observationPeriod("P1Y")
.unit("testUnit")
.scalingFactor("100")
.provenanceUrl("http://example.com")
.observations(expectedObsValues)
.build();

Expand Down
19 changes: 14 additions & 5 deletions simple/stats/jsonld_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from util.filesystem import create_store

DCID_URL = "https://datacommons.org/browser/"
PREDICATE_URL = "url"


def expand_id(item):
Expand Down Expand Up @@ -90,7 +91,7 @@ def process_triples(db, output_dir, ns_map: dict, chunk_size: int):
break


def _add_observation_to_graph(g, row, DCID):
def _add_observation_to_graph(g, row, DCID, prov_urls):
"""Helper to add an observation row to the graph."""
entity, variable, date, value, provenance, unit, scaling_factor, mmethod, period, props = row

Expand All @@ -111,6 +112,8 @@ def _add_observation_to_graph(g, row, DCID):

if provenance:
g.add((subject, DCID["provenance"], expand_id(provenance)))
if provenance in prov_urls and prov_urls[provenance]:
g.add((subject, DCID["provenanceUrl"], Literal(prov_urls[provenance])))
if unit:
g.add((subject, DCID["unit"], expand_id(unit)))
if scaling_factor:
Expand All @@ -137,7 +140,7 @@ def _process_observation_chunk(args):
This runs in a separate process, so it must establish its own DB connection
and import necessary modules locally.
"""
shard_index, offset, chunk_size, db_path, output_dir_path, ns_map = args
shard_index, offset, chunk_size, db_path, output_dir_path, ns_map, prov_urls = args

# Open a new connection for this worker process (SQLite connections cannot be shared across processes)
conn = sqlite3.connect(db_path)
Expand All @@ -160,7 +163,7 @@ def _process_observation_chunk(args):
g.bind("dcid", DCID)

for row in obs_tuples:
_add_observation_to_graph(g, row, DCID)
_add_observation_to_graph(g, row, DCID, prov_urls)

# Write the graph to a JSON-LD shard file
with create_store(output_dir_path) as store:
Expand All @@ -179,9 +182,15 @@ def process_observations(db, output_dir, ns_map: dict, chunk_size: int):
total_obs = db.engine.fetch_all("SELECT COUNT(*) FROM observations")[0][0]
num_chunks = (total_obs + chunk_size - 1) // chunk_size

# Fetch all provenance URLs once to pass to workers
rows = db.engine.fetch_all(
f"SELECT subject_id, object_value FROM triples WHERE predicate = '{PREDICATE_URL}'"
)
prov_urls = {row[0]: row[1] for row in rows}

# Prepare arguments for the worker pool (each chunk gets its own offset and index)
args_list = [(i, i * chunk_size, chunk_size, db_path, output_dir_path, ns_map)
for i in range(num_chunks)]
args_list = [(i, i * chunk_size, chunk_size, db_path, output_dir_path, ns_map,
prov_urls) for i in range(num_chunks)]

# Cap the number of processes to avoid overloading the machine
num_processes = min(multiprocessing.cpu_count(), 8)
Expand Down
22 changes: 14 additions & 8 deletions simple/tests/stats/jsonld_exporter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ def test_export(self):
Triple(subject_id="sub1",
predicate="typeOf",
object_id="StatisticalVariable"),
Triple(subject_id="sub1", predicate="name", object_value="Name1")
Triple(subject_id="sub1", predicate="name", object_value="Name1"),
Triple(subject_id="p1",
predicate="url",
object_value="http://example.com/p1")
])

# Insert some observations
Expand All @@ -64,11 +67,12 @@ def test_export(self):
export_to_jsonld(db, output_dir, chunk_size=1)

# Verify files exist
# 2 triples with chunk_size=1 -> 2 shards (0 and 1)
# 1 observation with chunk_size=1 -> 1 shard (2)
# 3 triples with chunk_size=1 -> 3 shards (0, 1 and 2)
# 1 observation with chunk_size=1 -> 1 shard (3)
shard_paths = [
os.path.join(temp_dir, "node-00000.jsonld"),
os.path.join(temp_dir, "node-00001.jsonld"),
os.path.join(temp_dir, "node-00002.jsonld"),
os.path.join(temp_dir, "observation-00000.jsonld")
]
for path in shard_paths:
Expand All @@ -83,16 +87,18 @@ def test_export(self):
self.assertEqual(nodes['dcid:sub1']['@type'],
'dcid:StatisticalVariable')

# Verify content of shard 2 (should have observations)
with open(shard_paths[2], 'r') as f:
shard2 = json.load(f)
self.assertIn('@graph', shard2)
nodes = {node['@id']: node for node in shard2['@graph']}
# Verify content of observation shard (should have observations)
with open(shard_paths[3], 'r') as f:
shard3 = json.load(f)
self.assertIn('@graph', shard3)
nodes = {node['@id']: node for node in shard3['@graph']}
obs_nodes = [node for node in nodes if node.startswith('dcid:obs_')]
self.assertTrue(
len(obs_nodes) > 0, "No observation node found in shard")
obs_node_id = obs_nodes[0]
self.assertEqual(nodes[obs_node_id]['dcid:value'], 100.0)
self.assertEqual(nodes[obs_node_id]['dcid:provenanceUrl'],
"http://example.com/p1")

def test_empty_db(self):
with tempfile.TemporaryDirectory() as temp_dir:
Expand Down
15 changes: 13 additions & 2 deletions util/src/main/java/org/datacommons/util/GraphUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public enum Property {
value,
name,
scalingFactor,
dcid;
dcid,
provenanceUrl;
}

public static final String STAT_VAR_OB = "StatVarObservation";
Expand All @@ -55,6 +56,9 @@ public enum Property {
Property.value.name()
};

private static final List<String> PROVENANCE_URL_PROPS =
List.of("provenanceUrl", "dcid:provenanceUrl");

private static final Set<String> SVOBS_PROTO_PROPS =
Set.of(
/** Standard properties expected in StatVarObservation nodes. */
Expand All @@ -67,7 +71,8 @@ public enum Property {
Property.unit.name(),
Property.value.name(),
Property.scalingFactor.name(),
Property.dcid.name());
Property.dcid.name(),
Property.provenanceUrl.name());

/**
* Checks if a given property name is one of the standard SVObs properties.
Expand Down Expand Up @@ -290,6 +295,12 @@ public static McfStatVarObsSeries convertMcfGraphToMcfStatVarObsSeries(
if (!(val = getPropVal(node, "unit")).isEmpty()) {
key.setUnit(val);
}
for (String prop : PROVENANCE_URL_PROPS) {
if (!(val = getPropVal(node, prop)).isEmpty()) {
key.setProvenanceUrl(val);
break;
}
}
res.setKey(key.build());
// Assemble StatVarObs.
McfStatVarObsSeries.StatVarObs.Builder svo = res.addSvObsListBuilder();
Expand Down
1 change: 1 addition & 0 deletions util/src/main/proto/Mcf.proto
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ message McfStatVarObsSeries {
optional string observation_period = 4;
optional string scaling_factor = 5;
optional string unit = 6;
optional string provenance_url = 7;
}

required Key key = 1;
Expand Down