Skip to content

Commit d2c2cc6

Browse files
committed
Added call API Doris for insert data
1 parent aa39c92 commit d2c2cc6

1 file changed

Lines changed: 136 additions & 0 deletions

File tree

knowageutils/src/main/java/it/eng/spagobi/tools/dataset/persist/PersistedTableManager.java

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,47 @@
1717
*/
1818
package it.eng.spagobi.tools.dataset.persist;
1919

20+
import java.io.BufferedWriter;
2021
import java.lang.reflect.Constructor;
2122
import java.lang.reflect.InvocationTargetException;
2223
import java.math.BigDecimal;
24+
import java.net.URI;
25+
import java.net.http.HttpClient;
26+
import java.net.http.HttpRequest;
27+
import java.net.http.HttpResponse;
28+
import java.nio.charset.StandardCharsets;
29+
import java.nio.file.Files;
30+
import java.nio.file.Path;
2331
import java.security.SecureRandom;
2432
import java.sql.Connection;
2533
import java.sql.PreparedStatement;
2634
import java.sql.ResultSet;
2735
import java.sql.ResultSetMetaData;
2836
import java.sql.SQLException;
2937
import java.sql.Statement;
38+
import java.time.Instant;
3039
import java.util.ArrayList;
40+
import java.util.Base64;
3141
import java.util.HashMap;
3242
import java.util.HashSet;
3343
import java.util.Iterator;
3444
import java.util.List;
3545
import java.util.Map;
46+
import java.util.Optional;
3647
import java.util.Random;
3748
import java.util.Set;
3849
import java.util.UUID;
50+
import java.util.zip.GZIPOutputStream;
3951

4052
import org.apache.log4j.Logger;
4153

54+
import com.fasterxml.jackson.databind.JsonNode;
55+
import com.fasterxml.jackson.databind.ObjectMapper;
4256
import com.jamonapi.Monitor;
4357
import com.jamonapi.MonitorFactory;
4458

4559
import it.eng.spago.security.IEngUserProfile;
60+
import it.eng.spagobi.commons.SingletonConfig;
4661
import it.eng.spagobi.tools.dataset.bo.AbstractJDBCDataset;
4762
import it.eng.spagobi.tools.dataset.bo.FileDataSet;
4863
import it.eng.spagobi.tools.dataset.bo.IDataSet;
@@ -79,6 +94,7 @@ public class PersistedTableManager implements IPersistedManager {
7994
private static final int BATCH_SIZE = 1000;
8095
private static final Random RANDOM = new SecureRandom();
8196
private static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
97+
private static final String COLUMN_SEPARATOR = "|";
8298

8399
private DatabaseDialect dialect = null;
84100
private String tableName = "";
@@ -148,7 +164,127 @@ public void persistDataSet(IDataSet dataset, IDataSource dsPersist, String table
148164
}
149165
}
150166

167+
private void persistDoris(IDataSet dataSet, IDataSource datasource, String tableName) throws Exception {
168+
LOGGER.debug("IN");
169+
Path tempCsv = null;
170+
Path gzPath = null;
171+
try (DataIterator iterator = dataSet.iterator()) {
172+
createTable(iterator.getMetaData(), datasource);
173+
174+
tempCsv = Files.createTempFile("doris_streamload_", ".csv");
175+
LOGGER.debug("Serializing dataset to CSV: " + tempCsv);
176+
serializeIteratorToCsvStreaming(iterator, tempCsv);
177+
178+
gzPath = Files.createTempFile("doris_streamload_", ".csv.gz");
179+
LOGGER.debug("Compressing CSV to GZIP: " + gzPath);
180+
gzipFile(tempCsv, gzPath);
181+
182+
String label = "knowage_" + tableName + "_" + Instant.now().getEpochSecond();
183+
184+
String user = Optional.ofNullable(datasource.getUser()).filter(u -> !u.isEmpty())
185+
.orElse(SingletonConfig.getInstance().getConfigValue("KNOWAGE.DORIS.USER"));
186+
if (user == null || user.isEmpty()) {
187+
throw new RuntimeException("Error : User doris is undefined");
188+
}
189+
String password = Optional.ofNullable(datasource.getPwd()).filter(u -> !u.isEmpty())
190+
.or(() -> Optional.ofNullable(SingletonConfig.getInstance().getConfigValue("KNOWAGE.DORIS.PASSWORD")).filter(u -> !u.isEmpty())).orElse("");
191+
192+
String endpoint = SingletonConfig.getInstance().getConfigValue("KNOWAGE.DORIS.ENDPOINT");
193+
if (endpoint == null || endpoint.isEmpty()) {
194+
throw new RuntimeException("Error : Endpoint doris is undefined");
195+
}
196+
endpoint = endpoint.replace("{table}", tableName);
197+
198+
String basicAuth = Base64.getEncoder().encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
199+
200+
HttpClient client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).connectTimeout(java.time.Duration.ofSeconds(10)).build();
201+
202+
HttpRequest request = HttpRequest.newBuilder().uri(URI.create(endpoint)).timeout(java.time.Duration.ofMinutes(10))
203+
.header("Authorization", "Basic " + basicAuth).header("label", label).header("format", "csv").header("compress_type", "gz")
204+
.header("column_separator", COLUMN_SEPARATOR).header("Content-Type", "text/csv; charset=UTF-8").expectContinue(true)
205+
.PUT(HttpRequest.BodyPublishers.ofFile(gzPath))
206+
.build();
207+
208+
LOGGER.info("Executing Stream Load: " + endpoint);
209+
LOGGER.debug("Label: " + label);
210+
211+
HttpResponse<String> resp = client.send(request, HttpResponse.BodyHandlers.ofString());
212+
LOGGER.info("Stream Load HTTP status: " + resp.statusCode());
213+
LOGGER.debug("Stream Load response body: " + resp.body());
214+
215+
ObjectMapper mapper = new ObjectMapper();
216+
JsonNode node = mapper.readTree(resp.body());
217+
String status = node.get("Status").asText();
218+
219+
if (resp.statusCode() == 200 && status != null && status.equalsIgnoreCase("Success")) {
220+
LOGGER.info("Stream Load completed successfully for table " + tableName);
221+
222+
} else {
223+
throw new RuntimeException("Stream Load failed: HTTP " + resp.statusCode() + " - " + resp.body());
224+
}
225+
226+
} finally {
227+
Files.deleteIfExists(tempCsv);
228+
Files.deleteIfExists(gzPath);
229+
}
230+
}
231+
232+
private void serializeIteratorToCsvStreaming(DataIterator iterator, Path tempCsv) throws Exception {
233+
try (BufferedWriter bw = Files.newBufferedWriter(tempCsv, StandardCharsets.UTF_8)) {
234+
long count = 0;
235+
while (iterator.hasNext()) {
236+
IRecord r = iterator.next();
237+
bw.write(toCsvLine(r));
238+
bw.write("\n");
239+
count++;
240+
if (count % 10000 == 0) {
241+
LOGGER.debug("Serialized records " + count);
242+
}
243+
}
244+
bw.flush();
245+
}
246+
}
247+
248+
private String toCsvLine(IRecord record) {
249+
StringBuilder sb = new StringBuilder();
250+
for (int j = 0; j < record.getFields().size(); j++) {
251+
IField field = record.getFieldAt(j);
252+
Object fieldValue = field.getValue();
253+
if (j > 0) {
254+
sb.append(COLUMN_SEPARATOR);
255+
}
256+
sb.append(escapeCsv(fieldValue));
257+
}
258+
259+
return sb.toString();
260+
}
261+
262+
private String escapeCsv(Object v) {
263+
if (v == null) {
264+
return "\\N";// convenzione Doris per NULL
265+
}
266+
String s = String.valueOf(v);
267+
s = s.replace("\r", " ").replace("\n", " ");
268+
return s;
269+
}
270+
271+
private void gzipFile(Path src, Path dst) throws Exception {
272+
try (var in = Files.newInputStream(src); var out = new GZIPOutputStream(Files.newOutputStream(dst))) {
273+
in.transferTo(out);
274+
// GZIPOutputStream chiude l'header/CRC su close()
275+
}
276+
}
277+
151278
public void persist(IDataSet dataSet, IDataSource datasource, String tableName) throws Exception {
279+
if (datasource.getDialectName().contains(DatabaseDialect.DORIS.getValue())) {
280+
persistDoris(dataSet, datasource, tableName);
281+
} else {
282+
persistJdbc(dataSet, datasource, tableName);
283+
}
284+
285+
}
286+
287+
private void persistJdbc(IDataSet dataSet, IDataSource datasource, String tableName) throws Exception {
152288
LOGGER.debug("IN");
153289

154290
Monitor monitor = MonitorFactory.start("spagobi.cache.sqldb.persist.paginated");

0 commit comments

Comments
 (0)