1818package it .eng .spagobi .tools .dataset .persist ;
1919
2020import java .io .BufferedWriter ;
21+ import java .io .IOException ;
2122import java .lang .reflect .Constructor ;
2223import java .lang .reflect .InvocationTargetException ;
2324import java .math .BigDecimal ;
@@ -179,65 +180,69 @@ private void persistDoris(IDataSet dataSet, IDataSource datasource, String table
179180 LOGGER .debug ("Compressing CSV to GZIP: " + gzPath );
180181 gzipFile (tempCsv , gzPath );
181182
182- String label = "knowage_" + tableName + "_" + Instant . now (). getEpochSecond ( );
183+ callDoris ( datasource , tableName , gzPath );
183184
184- String user = Optional .ofNullable (datasource .getUser ()).filter (u -> !u .isEmpty ())
185- .orElse (SingletonConfig .getInstance ().getConfigValue ("KNOWAGE.DORIS.USER" ));
186- if (user == null || user .isEmpty ()) {
187- LOGGER .error ("Error : User doris is undefined" );
188- throw new RuntimeException ("Error : User doris is undefined" );
189- }
190- String password = Optional .ofNullable (datasource .getPwd ()).filter (u -> !u .isEmpty ())
191- .or (() -> Optional .ofNullable (SingletonConfig .getInstance ().getConfigValue ("KNOWAGE.DORIS.PASSWORD" )).filter (u -> !u .isEmpty ())).orElse ("" );
185+ } finally {
186+ Files .deleteIfExists (tempCsv );
187+ Files .deleteIfExists (gzPath );
188+ }
189+ }
192190
193- String endpoint = SingletonConfig .getInstance ().getConfigValue ("KNOWAGE.DORIS.ENDPOINT" );
194- if (endpoint == null || endpoint .isEmpty ()) {
195- LOGGER .error ("Error : Endpoint doris is undefined" );
196- throw new RuntimeException ("Error : Endpoint doris is undefined" );
197- }
198- endpoint = endpoint .replace ("{table}" , tableName );
191+ private void callDoris (IDataSource datasource , String tableName , Path gzPath ) throws IOException , InterruptedException {
192+ String label = "knowage_" + tableName + "_" + Instant .now ().getEpochSecond ();
199193
200- String basicAuth = Base64 .getEncoder ().encodeToString ((user + ":" + password ).getBytes (StandardCharsets .UTF_8 ));
194+ String user = Optional .ofNullable (datasource .getUser ()).filter (u -> !u .isEmpty ())
195+ .orElse (SingletonConfig .getInstance ().getConfigValue ("KNOWAGE.DORIS.USER" ));
196+ if (user == null || user .isEmpty ()) {
197+ LOGGER .error ("Error : User doris is undefined" );
198+ throw new RuntimeException ("Error : User doris is undefined" );
199+ }
200+ String password = Optional .ofNullable (datasource .getPwd ()).filter (u -> !u .isEmpty ())
201+ .or (() -> Optional .ofNullable (SingletonConfig .getInstance ().getConfigValue ("KNOWAGE.DORIS.PASSWORD" )).filter (u -> !u .isEmpty ())).orElse ("" );
201202
202- try {
203- HttpClient client = HttpClient .newBuilder ().version (HttpClient .Version .HTTP_1_1 ).connectTimeout (java .time .Duration .ofSeconds (10 )).build ();
203+ String endpoint = SingletonConfig .getInstance ().getConfigValue ("KNOWAGE.DORIS.ENDPOINT" );
204+ if (endpoint == null || endpoint .isEmpty ()) {
205+ LOGGER .error ("Error : Endpoint doris is undefined" );
206+ throw new RuntimeException ("Error : Endpoint doris is undefined" );
207+ }
208+ endpoint = endpoint .replace ("{table}" , tableName );
204209
205- HttpRequest request = HttpRequest .newBuilder ().uri (URI .create (endpoint )).timeout (java .time .Duration .ofMinutes (10 ))
206- .header ("Authorization" , "Basic " + basicAuth ).header ("label" , label ).header ("format" , "csv" ).header ("compress_type" , "gz" )
207- .header ("column_separator" , COLUMN_SEPARATOR ).header ("Content-Type" , "text/csv; charset=UTF-8" ).expectContinue (true )
208- .PUT (HttpRequest .BodyPublishers .ofFile (gzPath ))
209- .build ();
210+ String basicAuth = Base64 .getEncoder ().encodeToString ((user + ":" + password ).getBytes (StandardCharsets .UTF_8 ));
210211
211- LOGGER . info ( "Executing Stream Load: " + endpoint );
212- LOGGER . debug ( "Label: " + label );
212+ try {
213+ HttpClient client = HttpClient . newBuilder (). version ( HttpClient . Version . HTTP_1_1 ). connectTimeout ( java . time . Duration . ofSeconds ( 10 )). build ( );
213214
214- HttpResponse <String > resp = client .send (request , HttpResponse .BodyHandlers .ofString ());
215+ HttpRequest request = HttpRequest .newBuilder ().uri (URI .create (endpoint )).timeout (java .time .Duration .ofMinutes (10 ))
216+ .header ("Authorization" , "Basic " + basicAuth ).header ("label" , label ).header ("format" , "csv" ).header ("compress_type" , "gz" )
217+ .header ("column_separator" , COLUMN_SEPARATOR ).header ("Content-Type" , "text/csv; charset=UTF-8" ).expectContinue (true )
218+ .PUT (HttpRequest .BodyPublishers .ofFile (gzPath ))
219+ .build ();
215220
216- LOGGER .info ("Stream Load HTTP status : " + resp . statusCode () );
217- LOGGER .debug ("Stream Load response body : " + resp . body () );
221+ LOGGER .info ("Executing Stream Load: " + endpoint );
222+ LOGGER .debug ("Label : " + label );
218223
219- ObjectMapper mapper = new ObjectMapper ();
220- JsonNode node = mapper .readTree (resp .body ());
221- String status = node .get ("Status" ).asText ();
224+ HttpResponse <String > resp = client .send (request , HttpResponse .BodyHandlers .ofString ());
222225
223- if ( resp . statusCode () == 200 && status != null && status . equalsIgnoreCase ( "Success" )) {
224- LOGGER .info ("Stream Load completed successfully for table " + tableName );
226+ LOGGER . info ( "Stream Load HTTP status: " + resp . statusCode ());
227+ LOGGER .debug ("Stream Load response body: " + resp . body () );
225228
226- } else {
227- dropTableIfExists (datasource , tableName );
228- LOGGER .error ("Error Doris body " + resp .body ());
229- throw new RuntimeException ("Stream Load failed: HTTP " + resp .statusCode () + " - " + resp .body ());
230- }
229+ ObjectMapper mapper = new ObjectMapper ();
230+ JsonNode node = mapper .readTree (resp .body ());
231+ String status = node .get ("Status" ).asText ();
231232
232- } catch (Exception e ) {
233+ if (resp .statusCode () == 200 && status != null && status .equalsIgnoreCase ("Success" )) {
234+ LOGGER .info ("Stream Load completed successfully for table " + tableName );
235+
236+ } else {
233237 dropTableIfExists (datasource , tableName );
234- LOGGER .error ("Error Communication Knowage/ Doris " + endpoint , e );
235- throw e ;
238+ LOGGER .error ("Error Doris body " + resp . body () );
239+ throw new RuntimeException ( "Stream Load failed: HTTP " + resp . statusCode () + " - " + resp . body ()) ;
236240 }
237241
238- } finally {
239- Files .deleteIfExists (tempCsv );
240- Files .deleteIfExists (gzPath );
242+ } catch (Exception e ) {
243+ dropTableIfExists (datasource , tableName );
244+ LOGGER .error ("Error Communication Knowage/Doris " + endpoint , e );
245+ throw e ;
241246 }
242247 }
243248
@@ -257,6 +262,21 @@ private void serializeIteratorToCsvStreaming(DataIterator iterator, Path tempCsv
257262 }
258263 }
259264
265+ private void serializeIteratorToCsvStreaming (List <IRecord > records , Path tempCsv ) throws Exception {
266+ try (BufferedWriter bw = Files .newBufferedWriter (tempCsv , StandardCharsets .UTF_8 )) {
267+ long count = 0 ;
268+ for (IRecord r : records ) {
269+ bw .write (toCsvLine (r ));
270+ bw .write ("\n " );
271+ count ++;
272+ if (count % 10000 == 0 ) {
273+ LOGGER .debug ("Serialized records " + count );
274+ }
275+ }
276+ bw .flush ();
277+ }
278+ }
279+
260280 private String toCsvLine (IRecord record ) {
261281 StringBuilder sb = new StringBuilder ();
262282 for (int j = 0 ; j < record .getFields ().size (); j ++) {
@@ -691,58 +711,82 @@ private IDataStore normalizeFileDataSet(IDataSet dataSet, IDataStore datastore)
691711 }
692712
693713 public void persistDataset (IDataStore datastore , IDataSource datasource ) throws Exception {
694- LOGGER .debug ("IN" );
695- Connection connection = null ;
696- String dialect = datasource .getHibDialectClass ();
697- try {
698- LOGGER .debug ("The datastore metadata object contains # [" + datastore .getMetaData ().getFieldCount ()
699- + "] fields" );
700- if (datastore .getMetaData ().getFieldCount () == 0 ) {
701- LOGGER .debug ("The datastore metadata object hasn't fields. Dataset doesn't persisted!!" );
702- return ;
703- }
704- connection = getConnection (datasource );
714+ if (datasource .getDialectName ().contains (DatabaseDialect .DORIS .getValue ())) {
715+ LOGGER .debug ("IN" );
716+ Path tempCsv = null ;
717+ Path gzPath = null ;
718+ try {
719+ createTable (datastore .getMetaData (), datasource );
705720
706- // VoltDB does not allow explicit commit/rollback actions.
707- // It uses an internal transaction committing mechanism.
708- // More tests to see the consistency has to be done on this.
709- if (!dialect .contains ("VoltDB" )) {
710- connection .setAutoCommit (false );
721+ tempCsv = Files .createTempFile ("doris_streamload_" , ".csv" );
722+ LOGGER .debug ("Serializing dataset to CSV: " + tempCsv );
723+ serializeIteratorToCsvStreaming (datastore .getRecords (), tempCsv );
724+
725+ gzPath = Files .createTempFile ("doris_streamload_" , ".csv.gz" );
726+ LOGGER .debug ("Compressing CSV to GZIP: " + gzPath );
727+ gzipFile (tempCsv , gzPath );
728+
729+ callDoris (datasource , tableName , gzPath );
730+
731+ } finally {
732+ Files .deleteIfExists (tempCsv );
733+ Files .deleteIfExists (gzPath );
711734 }
712- // Steps #1: define prepared statement (and max column size for
713- // strings type)
714- PreparedStatement [] statements = defineStatements (datastore , datasource , connection );
715- // Steps #2: set query timeout (if necessary)
716- if (queryTimeout > 0 ) {
735+ } else {
736+
737+ LOGGER .debug ("IN" );
738+ Connection connection = null ;
739+ String dialect = datasource .getHibDialectClass ();
740+ try {
741+ LOGGER .debug ("The datastore metadata object contains # [" + datastore .getMetaData ().getFieldCount ()
742+ + "] fields" );
743+ if (datastore .getMetaData ().getFieldCount () == 0 ) {
744+ LOGGER .debug ("The datastore metadata object hasn't fields. Dataset doesn't persisted!!" );
745+ return ;
746+ }
747+ connection = getConnection (datasource );
748+
749+ // VoltDB does not allow explicit commit/rollback actions.
750+ // It uses an internal transaction committing mechanism.
751+ // More tests to see the consistency has to be done on this.
752+ if (!dialect .contains ("VoltDB" )) {
753+ connection .setAutoCommit (false );
754+ }
755+ // Steps #1: define prepared statement (and max column size for
756+ // strings type)
757+ PreparedStatement [] statements = defineStatements (datastore , datasource , connection );
758+ // Steps #2: set query timeout (if necessary)
759+ if (queryTimeout > 0 ) {
760+ for (int i = 0 ; i < statements .length ; i ++) {
761+ statements [i ].setQueryTimeout (queryTimeout );
762+ }
763+ }
764+ // Steps #3,4: define create table statement
765+ createTable (datastore .getMetaData (), datasource );
766+ // Step #5: execute batch with insert statements
717767 for (int i = 0 ; i < statements .length ; i ++) {
718- statements [i ].setQueryTimeout (queryTimeout );
768+ PreparedStatement statement = statements [i ];
769+ statement .executeBatch ();
770+ statement .close ();
719771 }
720- }
721- // Steps #3,4: define create table statement
722- createTable (datastore .getMetaData (), datasource );
723- // Step #5: execute batch with insert statements
724- for (int i = 0 ; i < statements .length ; i ++) {
725- PreparedStatement statement = statements [i ];
726- statement .executeBatch ();
727- statement .close ();
728- }
729- if (!dialect .contains ("VoltDB" )) {
730- connection .commit ();
731- }
732- LOGGER .debug ("Insertion of records on persistable table executed successfully!" );
733- } catch (Exception e ) {
734- LOGGER .error ("Error persisting the dataset into table" , e );
735- if (connection != null && !dialect .contains ("VoltDB" )) {
736- connection .rollback ();
737- }
738- throw new SpagoBIEngineRuntimeException ("Error persisting the dataset into table" , e );
739- } finally {
740- if (connection != null && !connection .isClosed ()) {
741- connection .close ();
772+ if (!dialect .contains ("VoltDB" )) {
773+ connection .commit ();
774+ }
775+ LOGGER .debug ("Insertion of records on persistable table executed successfully!" );
776+ } catch (Exception e ) {
777+ LOGGER .error ("Error persisting the dataset into table" , e );
778+ if (connection != null && !dialect .contains ("VoltDB" )) {
779+ connection .rollback ();
780+ }
781+ throw new SpagoBIEngineRuntimeException ("Error persisting the dataset into table" , e );
782+ } finally {
783+ if (connection != null && !connection .isClosed ()) {
784+ connection .close ();
785+ }
786+ LOGGER .debug ("OUT" );
742787 }
743788 LOGGER .debug ("OUT" );
744789 }
745- LOGGER .debug ("OUT" );
746790 }
747791
748792 private PreparedStatement [] defineStatements (IDataStore datastore , IDataSource datasource , Connection connection )
0 commit comments