4949public class InfluxDBUtil {
5050 public static final String TAG = "MilvusUtil" ;
5151
52+ public static String getSchema (String schema , String defaultSchema ) {
53+ return getSchema (schema , defaultSchema , true );
54+ }
55+ public static String getSchema (String schema , String defaultSchema , boolean isInfluxDB ) {
56+ if (StringUtil .isEmpty (schema ) && isInfluxDB ) {
57+ schema = defaultSchema ;
58+ }
59+ return schema ;
60+ }
61+
62+ public static String getSQLSchema (String schema ) {
63+ return getSQLSchema (schema , true );
64+ }
65+ public static String getSQLSchema (String schema , boolean isInfluxDB ) {
66+ return isInfluxDB ? null : schema ;
67+ }
68+
69+ public static <T > String getClientKey (@ NotNull SQLConfig <T > config ) {
70+ String uri = config .getDBUri ();
71+ return uri + (uri .contains ("?" ) ? "&" : "?" ) + "username=" + config .getDBAccount ();
72+ }
73+
74+ public static final Map <String , InfluxDB > CLIENT_MAP = new LinkedHashMap <>();
75+ public static <T > InfluxDB getClient (@ NotNull SQLConfig <T > config ) {
76+ return getClient (config , true );
77+ }
78+ public static <T > InfluxDB getClient (@ NotNull SQLConfig <T > config , boolean autoNew ) {
79+ String key = getClientKey (config );
80+
81+ InfluxDB client = CLIENT_MAP .get (key );
82+ if (autoNew && client == null ) {
83+ client = InfluxDBFactory .connect (config .getDBUri (), config .getDBAccount (), config .getDBPassword ());
84+ client .setDatabase (config .getSchema ());
85+
86+ client .enableBatch (
87+ BatchOptions .DEFAULTS
88+ .threadFactory (runnable -> {
89+ Thread thread = new Thread (runnable );
90+ thread .setDaemon (true );
91+ return thread ;
92+ })
93+ );
94+
95+ Runtime .getRuntime ().addShutdownHook (new Thread (client ::close ));
96+
97+ CLIENT_MAP .put (key , client );
98+ }
99+
100+ return client ;
101+ }
102+
103+ public static <T > void closeClient (@ NotNull SQLConfig <T > config ) {
104+ InfluxDB client = getClient (config , false );
105+ if (client != null ) {
106+ String key = getClientKey (config );
107+ CLIENT_MAP .remove (key );
108+
109+ // try {
110+ client .close ();
111+ // }
112+ // catch (Throwable e) {
113+ // e.printStackTrace();
114+ // }
115+ }
116+ }
117+
118+ public static <T > void closeAllClient () {
119+ Collection <InfluxDB > cs = CLIENT_MAP .values ();
120+ for (InfluxDB c : cs ) {
121+ try {
122+ c .close ();
123+ }
124+ catch (Throwable e ) {
125+ e .printStackTrace ();
126+ }
127+ }
128+
129+ CLIENT_MAP .clear ();
130+ }
131+
132+
52133 public static <T > JSONObject execute (@ NotNull SQLConfig <T > config , String sql , boolean unknownType ) throws Exception {
53134 if (RequestMethod .isQueryMethod (config .getMethod ())) {
54135 List <JSONObject > list = executeQuery (config , sql , unknownType );
@@ -73,21 +154,14 @@ public static <T> int execUpdate(SQLConfig<T> config, String sql) throws Excepti
73154 }
74155
75156 public static <T > JSONObject executeUpdate (SQLConfig <T > config , String sql ) throws Exception {
76- InfluxDB influxDB = InfluxDBFactory .connect (config .getDBUri (), config .getDBAccount (), config .getDBPassword ());
77- influxDB .setDatabase (config .getSchema ());
78-
79- influxDB .enableBatch (
80- BatchOptions .DEFAULTS
81- .threadFactory (runnable -> {
82- Thread thread = new Thread (runnable );
83- thread .setDaemon (true );
84- return thread ;
85- })
86- );
87-
88- Runtime .getRuntime ().addShutdownHook (new Thread (influxDB ::close ));
157+ return executeUpdate (null , config , sql );
158+ }
159+ public static <T > JSONObject executeUpdate (InfluxDB client , SQLConfig <T > config , String sql ) throws Exception {
160+ if (client == null ) {
161+ client = getClient (config );
162+ }
89163
90- influxDB .write (StringUtil .isEmpty (sql ) ? config .getSQL (false ) : sql );
164+ client .write (StringUtil .isEmpty (sql ) ? config .getSQL (false ) : sql );
91165
92166 JSONObject result = AbstractParser .newSuccessResult ();
93167
@@ -117,10 +191,31 @@ public static <T> JSONObject executeUpdate(SQLConfig<T> config, String sql) thro
117191 return result ;
118192 }
119193
194+
195+ public static <T > JSONObject execQuery (@ NotNull SQLConfig <T > config , String sql , boolean unknownType ) throws Exception {
196+ List <JSONObject > list = executeQuery (config , sql , unknownType );
197+ JSONObject result = list == null || list .isEmpty () ? null : list .get (0 );
198+ if (result == null ) {
199+ result = new JSONObject (true );
200+ }
201+
202+ if (list != null && list .size () > 1 ) {
203+ result .put (KEY_RAW_LIST , list );
204+ }
205+
206+ return result ;
207+ }
208+
120209 public static <T > List <JSONObject > executeQuery (@ NotNull SQLConfig <T > config , String sql , boolean unknownType ) throws Exception {
121- InfluxDB influxDB = InfluxDBFactory .connect (config .getDBUri (), config .getDBAccount (), config .getDBPassword ());
122- influxDB .setDatabase (config .getSchema ());
123- QueryResult qr = influxDB .query (new Query (StringUtil .isEmpty (sql ) ? config .getSQL (false ) : sql ));
210+ return executeQuery (null , config , sql , unknownType );
211+ }
212+ public static <T > List <JSONObject > executeQuery (InfluxDB client , @ NotNull SQLConfig <T > config , String sql , boolean unknownType ) throws Exception {
213+ if (client == null ) {
214+ client = getClient (config );
215+ }
216+
217+ client .setDatabase (config .getSchema ());
218+ QueryResult qr = client .query (new Query (StringUtil .isEmpty (sql ) ? config .getSQL (false ) : sql ));
124219
125220 String err = qr == null ? null : qr .getError ();
126221 if (StringUtil .isNotEmpty (err , true )) {
@@ -167,4 +262,5 @@ public static <T> List<JSONObject> executeQuery(@NotNull SQLConfig<T> config, St
167262 return resultList ;
168263 }
169264
265+
170266}
0 commit comments