11package org .datacommons .ingestion .data ;
22
3- import com .google .cloud .ByteArray ;
43import com .google .cloud .spanner .Mutation ;
54import java .io .IOException ;
65import java .io .Serializable ;
98import java .util .Collections ;
109import java .util .List ;
1110import java .util .Map ;
11+ import java .util .Set ;
1212import org .apache .beam .sdk .Pipeline ;
1313import org .apache .beam .sdk .metrics .Counter ;
14+ import org .apache .beam .sdk .transforms .Combine ;
1415import org .apache .beam .sdk .transforms .Create ;
1516import org .apache .beam .sdk .transforms .DoFn ;
1617import org .apache .beam .sdk .transforms .Flatten ;
@@ -40,6 +41,118 @@ public class GraphReader implements Serializable {
4041 private static final String DATCOM_AGGREGATE = "DataCommonsAggregate" ;
4142 private static final String IMPORT_METADATA_FILE = "import_metadata_mcf.mcf" ;
4243
44+ public static PCollection <Node > combineNodes (PCollection <Node > nodes ) {
45+ return nodes
46+ .apply (
47+ "MapNodesToKV" ,
48+ ParDo .of (
49+ new DoFn <Node , KV <String , Node >>() {
50+ @ ProcessElement
51+ public void processElement (
52+ @ Element Node node , OutputReceiver <KV <String , Node >> receiver ) {
53+ receiver .output (KV .of (node .getSubjectId (), node ));
54+ }
55+ }))
56+ .apply (
57+ "CombineNodes" ,
58+ Combine .perKey (
59+ new Combine .CombineFn <Node , List <Node >, Node >() {
60+ @ Override
61+ public List <Node > createAccumulator () {
62+ return new ArrayList <>();
63+ }
64+
65+ @ Override
66+ public List <Node > addInput (List <Node > accumulator , Node input ) {
67+ accumulator .add (input );
68+ return accumulator ;
69+ }
70+
71+ @ Override
72+ public List <Node > mergeAccumulators (Iterable <List <Node >> accumulators ) {
73+ List <Node > merged = new ArrayList <>();
74+ for (List <Node > acc : accumulators ) {
75+ merged .addAll (acc );
76+ }
77+ return merged ;
78+ }
79+
80+ @ Override
81+ public Node extractOutput (List <Node > accumulator ) {
82+ if (accumulator .isEmpty ()) return null ;
83+ Node first = accumulator .get (0 );
84+ Node .Builder builder =
85+ Node .builder ()
86+ .subjectId (first .getSubjectId ())
87+ .value (first .getValue ())
88+ .name (first .getName ())
89+ .types (first .getTypes ())
90+ .bytes (first .getBytes ());
91+
92+ Set <String > types = new java .util .TreeSet <>();
93+ for (Node n : accumulator ) {
94+ types .addAll (n .getTypes ());
95+ if (!n .getValue ().isEmpty ()) {
96+ builder .value (n .getValue ());
97+ }
98+ if (!n .getName ().isEmpty ()) {
99+ builder .name (n .getName ());
100+ }
101+ if (n .getBytes ().length > 0 ) {
102+ builder .bytes (n .getBytes ());
103+ }
104+ }
105+ if (types .size () > 1 && types .contains ("ProvisionalNode" )) {
106+ types .remove ("ProvisionalNode" );
107+ }
108+ builder .types (new ArrayList <>(types ));
109+ return builder .build ();
110+ }
111+ }))
112+ .apply (
113+ "ExtractNodes" ,
114+ ParDo .of (
115+ new DoFn <KV <String , Node >, Node >() {
116+ @ ProcessElement
117+ public void processElement (
118+ @ Element KV <String , Node > element , OutputReceiver <Node > receiver ) {
119+ receiver .output (element .getValue ());
120+ }
121+ }));
122+ }
123+
124+ public static PCollection <Mutation > nodeToMutations (
125+ PCollection <Node > nodes , SpannerClient spannerClient ) {
126+ return nodes .apply (
127+ "NodesToMutations" ,
128+ ParDo .of (
129+ new DoFn <Node , Mutation >() {
130+ @ ProcessElement
131+ public void processElement (@ Element Node node , OutputReceiver <Mutation > receiver ) {
132+ Mutation mutation = spannerClient .toNodeMutation (node );
133+ if (mutation != null ) {
134+ receiver .output (mutation );
135+ }
136+ }
137+ }));
138+ }
139+
140+ public static PCollection <Mutation > edgeToMutations (
141+ PCollection <Edge > edges , SpannerClient spannerClient ) {
142+ return edges .apply (
143+ "EdgesToMutations" ,
144+ ParDo .of (
145+ new DoFn <Edge , Mutation >() {
146+ @ ProcessElement
147+ public void processElement (@ Element Edge edge , OutputReceiver <Mutation > receiver ) {
148+ Mutation mutation = spannerClient .toEdgeMutation (edge );
149+ if (mutation != null ) {
150+ receiver .output (mutation );
151+ }
152+ }
153+ }));
154+ }
155+
43156 public static List <Node > graphToNodes (McfGraph graph , Counter mcfNodesWithoutTypeCounter ) {
44157 List <Node > nodes = new ArrayList <>();
45158 for (Map .Entry <String , PropertyValues > nodeEntry : graph .getNodesMap ().entrySet ()) {
@@ -79,10 +192,11 @@ public static List<Node> graphToNodes(McfGraph graph, Counter mcfNodesWithoutTyp
79192 node = Node .builder ();
80193 node .subjectId (PipelineUtils .generateObjectValueKey (val .getValue ()));
81194 if (PipelineUtils .storeValueAsBytes (entry .getKey ())) {
82- node .bytes (ByteArray . copyFrom ( PipelineUtils .compressString (val .getValue () )));
195+ node .bytes (PipelineUtils .compressString (val .getValue ()));
83196 } else {
84197 node .value (val .getValue ());
85198 }
199+ node .types (List .of (ValueType .TEXT .toString ()));
86200 nodes .add (node .build ());
87201 }
88202 }
@@ -101,13 +215,13 @@ public static PCollection<McfGraph> getProvenanceMcf(
101215 String defaultProvenance =
102216 "Node: dcid:dc/base/" + importName + "\n " + "typeOf: dcid:Provenance\n " ;
103217 mcfList .add (GraphUtils .convertToGraph (defaultProvenance ));
218+ // try {
219+ // mcfList.add(GraphUtils.convertToGraph(PipelineUtils.getGCSFileContent(metadataFile)));
220+ // } catch (IOException e) {
221+ // LOGGER.warn("Failed to read provenance metadata file: " + e.getMessage());
222+ // }
104223 try {
105- mcfList .addAll (GraphUtils .readMcfString (PipelineUtils .getGcsFileContent (metadataFile )));
106- } catch (IOException e ) {
107- LOGGER .warn ("Failed to read provenance metadata file: " + e .getMessage ());
108- }
109- try {
110- mcfList .addAll (GraphUtils .readMcfString (PipelineUtils .getGcsFileContent (provenanceFile )));
224+ mcfList .add (GraphUtils .convertToGraph (PipelineUtils .getGCSFileContent (provenanceFile )));
111225 } catch (IOException e ) {
112226 LOGGER .warn ("Failed to read provenance metadata file: " + e .getMessage ());
113227 }
@@ -214,6 +328,42 @@ public void processElement(
214328 }));
215329 }
216330
331+ public static PCollection <Node > mcfToNodes (
332+ PCollection <McfGraph > graph , Counter nodeCounter , Counter mcfNodesWithoutTypeCounter ) {
333+ return graph .apply (
334+ "McfToNodes" ,
335+ ParDo .of (
336+ new DoFn <McfGraph , Node >() {
337+ @ ProcessElement
338+ public void processElement (@ Element McfGraph element , OutputReceiver <Node > receiver ) {
339+ List <Node > nodes = graphToNodes (element , mcfNodesWithoutTypeCounter );
340+ for (Node node : nodes ) {
341+ // LOGGER.info("Node: {}", node.toString());
342+ receiver .output (node );
343+ }
344+ nodeCounter .inc (nodes .size ());
345+ }
346+ }));
347+ }
348+
349+ public static PCollection <Edge > mcfToEdges (
350+ PCollection <McfGraph > graph , String provenance , Counter edgeCounter ) {
351+ return graph .apply (
352+ "McfToEdges" ,
353+ ParDo .of (
354+ new DoFn <McfGraph , Edge >() {
355+ @ ProcessElement
356+ public void processElement (@ Element McfGraph element , OutputReceiver <Edge > receiver ) {
357+ List <Edge > edges = graphToEdges (element , provenance );
358+ for (Edge edge : edges ) {
359+ receiver .output (edge );
360+ // LOGGER.info("Edge : {}", edge.toString());
361+ }
362+ edgeCounter .inc (edges .size ());
363+ }
364+ }));
365+ }
366+
217367 public static PCollection <KV <String , Mutation >> graphToNodes (
218368 PCollection <McfGraph > graph ,
219369 SpannerClient spannerClient ,
0 commit comments