@@ -199,12 +199,10 @@ private void persistDoris(IDataSet dataSet, IDataSource datasource, String table
199199
200200 String basicAuth = Base64 .getEncoder ().encodeToString ((user + ":" + password ).getBytes (StandardCharsets .UTF_8 ));
201201
202- HttpResponse <String > resp = null ;
203- HttpRequest request = null ;
204202 try {
205203 HttpClient client = HttpClient .newBuilder ().version (HttpClient .Version .HTTP_1_1 ).connectTimeout (java .time .Duration .ofSeconds (10 )).build ();
206204
207- request = HttpRequest .newBuilder ().uri (URI .create (endpoint )).timeout (java .time .Duration .ofMinutes (10 ))
205+ HttpRequest request = HttpRequest .newBuilder ().uri (URI .create (endpoint )).timeout (java .time .Duration .ofMinutes (10 ))
208206 .header ("Authorization" , "Basic " + basicAuth ).header ("label" , label ).header ("format" , "csv" ).header ("compress_type" , "gz" )
209207 .header ("column_separator" , COLUMN_SEPARATOR ).header ("Content-Type" , "text/csv; charset=UTF-8" ).expectContinue (true )
210208 .PUT (HttpRequest .BodyPublishers .ofFile (gzPath ))
@@ -213,24 +211,28 @@ private void persistDoris(IDataSet dataSet, IDataSource datasource, String table
213211 LOGGER .info ("Executing Stream Load: " + endpoint );
214212 LOGGER .debug ("Label: " + label );
215213
216- resp = client .send (request , HttpResponse .BodyHandlers .ofString ());
217- } catch (Exception e ) {
218- LOGGER .error ("Error Communication Knowage/Doris " + endpoint , e );
219- throw e ;
220- }
221- LOGGER .info ("Stream Load HTTP status: " + resp .statusCode ());
222- LOGGER .debug ("Stream Load response body: " + resp .body ());
214+ HttpResponse <String > resp = client .send (request , HttpResponse .BodyHandlers .ofString ());
215+
216+ LOGGER .info ("Stream Load HTTP status: " + resp .statusCode ());
217+ LOGGER .debug ("Stream Load response body: " + resp .body ());
223218
224- ObjectMapper mapper = new ObjectMapper ();
225- JsonNode node = mapper .readTree (resp .body ());
226- String status = node .get ("Status" ).asText ();
219+ ObjectMapper mapper = new ObjectMapper ();
220+ JsonNode node = mapper .readTree (resp .body ());
221+ String status = node .get ("Status" ).asText ();
227222
228- if (resp .statusCode () == 200 && status != null && status .equalsIgnoreCase ("Success" )) {
229- LOGGER .info ("Stream Load completed successfully for table " + tableName );
223+ if (resp .statusCode () == 200 && status != null && status .equalsIgnoreCase ("Success" )) {
224+ LOGGER .info ("Stream Load completed successfully for table " + tableName );
230225
231- } else {
232- LOGGER .error ("Error Doris body " + resp .body ());
233- throw new RuntimeException ("Stream Load failed: HTTP " + resp .statusCode () + " - " + resp .body ());
226+ } else {
227+ dropTableIfExists (datasource , tableName );
228+ LOGGER .error ("Error Doris body " + resp .body ());
229+ throw new RuntimeException ("Stream Load failed: HTTP " + resp .statusCode () + " - " + resp .body ());
230+ }
231+
232+ } catch (Exception e ) {
233+ dropTableIfExists (datasource , tableName );
234+ LOGGER .error ("Error Communication Knowage/Doris " + endpoint , e );
235+ throw e ;
234236 }
235237
236238 } finally {
0 commit comments