2020
2121package org .entur .gbfs .validator .loader ;
2222
23+ import org .apache .hc .client5 .http .classic .methods .HttpGet ;
24+ import org .apache .hc .client5 .http .config .RequestConfig ;
25+ import org .apache .hc .client5 .http .impl .classic .CloseableHttpClient ;
26+ import org .apache .hc .client5 .http .impl .classic .CloseableHttpResponse ;
27+ import org .apache .hc .client5 .http .impl .classic .HttpClients ;
28+ import org .apache .hc .client5 .http .impl .io .PoolingHttpClientConnectionManager ;
29+ import org .apache .hc .core5 .http .ParseException ;
30+ import org .apache .hc .core5 .http .io .entity .EntityUtils ;
31+ import org .apache .hc .core5 .util .Timeout ;
2332import org .json .JSONObject ;
2433import org .json .JSONTokener ;
2534
26- import java .io .BufferedReader ;
2735import java .io .ByteArrayInputStream ;
2836import java .io .ByteArrayOutputStream ;
2937import java .io .File ;
3038import java .io .FileInputStream ;
3139import java .io .FileNotFoundException ;
3240import java .io .IOException ;
3341import java .io .InputStream ;
34- import java .io .InputStreamReader ;
35- import java .net .HttpURLConnection ;
3642import java .net .URI ;
37- import java .net .URL ;
3843import java .util .ArrayList ;
3944import java .util .HashMap ;
4045import java .util .List ;
46+ import java .util .concurrent .CompletableFuture ;
47+ import java .util .concurrent .ExecutorService ;
48+ import java .util .concurrent .Executors ;
49+ import java .util .concurrent .TimeUnit ;
4150
4251public class Loader {
52+ private final CloseableHttpClient httpClient ;
53+ private final ExecutorService executorService ;
54+
55+ public Loader () {
56+ // Create connection pool manager
57+ PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager ();
58+ // Set the maximum number of total connections
59+ // TODO configurable max total
60+ connectionManager .setMaxTotal (50 );
61+ // Set the maximum number of connections per route
62+ // TODO configurable max per route
63+ connectionManager .setDefaultMaxPerRoute (20 );
64+
65+ // Configure request timeouts
66+ RequestConfig requestConfig = RequestConfig .custom ()
67+ // TODO configurable timeouts
68+ .setConnectTimeout (Timeout .of (5 , TimeUnit .SECONDS ))
69+ .setResponseTimeout (Timeout .of (5 , TimeUnit .SECONDS ))
70+ .build ();
71+
72+ // Build the HttpClient with connection pooling
73+ httpClient = HttpClients .custom ()
74+ .setConnectionManager (connectionManager )
75+ .setDefaultRequestConfig (requestConfig )
76+ .build ();
77+
78+ // Create a thread pool for parallel execution
79+ // TODO configurable pool size
80+ executorService = Executors .newFixedThreadPool (20 );
81+ }
4382
4483 public List <LoadedFile > load (String discoveryURI ) throws IOException {
4584 InputStream discoveryFileStream = loadFile (URI .create (discoveryURI ));
4685
86+
4787 ByteArrayOutputStream discoveryFileCopy = new ByteArrayOutputStream ();
4888 org .apache .commons .io .IOUtils .copy (discoveryFileStream , discoveryFileCopy );
4989 byte [] discoveryFileBytes = discoveryFileCopy .toByteArray ();
@@ -74,23 +114,33 @@ private List<LoadedFile> getV3Files(JSONObject discoveryFile, String discoveryFi
74114 new ByteArrayInputStream (discoveryFileBytes )
75115 ));
76116
77- loadedFiles .addAll (
78- discoveryFile .getJSONObject ("data" ).getJSONArray ("feeds" ).toList ().stream ().map (feed -> {
79- var feedObj = (HashMap ) feed ;
80- var url = (String ) feedObj .get ("url" );
81- var file = loadFile (URI .create (url ));
82- return new LoadedFile (
83- (String ) feedObj .get ("name" ),
84- url ,
85- file
86- );
87- }).toList ());
117+ // Load files in parallel using CompletableFuture
118+ List <CompletableFuture <LoadedFile >> futures = discoveryFile .getJSONObject ("data" ).getJSONArray ("feeds" ).toList ().stream ()
119+ .map (feed -> {
120+ var feedObj = (HashMap ) feed ;
121+ var url = (String ) feedObj .get ("url" );
122+ var name = (String ) feedObj .get ("name" );
123+
124+ // Create a CompletableFuture for each file to load
125+ return CompletableFuture .supplyAsync (() -> {
126+ var file = loadFile (URI .create (url ));
127+ return new LoadedFile (name , url , file );
128+ }, executorService );
129+ })
130+ .toList ();
131+
132+ // Wait for all futures to complete and collect results
133+ loadedFiles .addAll (futures .stream ()
134+ .map (CompletableFuture ::join )
135+ .toList ());
88136
89137 return loadedFiles ;
90138 }
91139
92140 private List <LoadedFile > getPreV3Files (JSONObject discoveryFile , String discoveryFileUrl , byte [] discoveryFileBytes ) {
93141 List <LoadedFile > result = new ArrayList <>();
142+ List <CompletableFuture <LoadedFile >> futures = new ArrayList <>();
143+
94144 discoveryFile .getJSONObject ("data" )
95145 .keys ()
96146 .forEachRemaining (key -> {
@@ -102,21 +152,26 @@ private List<LoadedFile> getPreV3Files(JSONObject discoveryFile, String discover
102152 key
103153 )
104154 );
155+
156+ // Create CompletableFutures for each feed file
105157 discoveryFile .getJSONObject ("data" ).getJSONObject (key ).getJSONArray ("feeds" ).toList ().forEach (feed -> {
106158 var feedObj = (HashMap ) feed ;
107159 var url = (String ) feedObj .get ("url" );
108- var file = loadFile (URI .create (url ));
109- result .add (
110- new LoadedFile (
111- (String ) feedObj .get ("name" ),
112- url ,
113- file ,
114- key
115- )
116- );
160+ var name = (String ) feedObj .get ("name" );
161+
162+ // Create a CompletableFuture for each file to load
163+ futures .add (CompletableFuture .supplyAsync (() -> {
164+ var file = loadFile (URI .create (url ));
165+ return new LoadedFile (name , url , file , key );
166+ }, executorService ));
117167 });
118168 });
119169
170+ // Wait for all futures to complete and collect results
171+ result .addAll (futures .stream ()
172+ .map (CompletableFuture ::join )
173+ .toList ());
174+
120175 return result ;
121176 }
122177
@@ -143,23 +198,39 @@ private static FileInputStream getFileInputStream(URI fileURI) throws FileNotFou
143198 }
144199
145200 private InputStream getHTTPInputStream (URI fileURI ) throws IOException {
146- URL url = fileURI .toURL ();
147- HttpURLConnection con = (HttpURLConnection ) url .openConnection ();
148- con .setRequestMethod ("GET" );
149- con .setConnectTimeout (5000 );
150- con .setReadTimeout (5000 );
151- con .setRequestProperty ("Et-Client-Name" , "entur-gbfs-validator" );
152- con .connect ();
153- BufferedReader in = new BufferedReader (
154- new InputStreamReader (con .getInputStream ()));
155-
156- String inputLine ;
157- StringBuffer content = new StringBuffer ();
158- while ((inputLine = in .readLine ()) != null ) {
159- content .append (inputLine );
201+ HttpGet httpGet = new HttpGet (fileURI );
202+
203+ // TODO configurable headers
204+ httpGet .setHeader ("Et-Client-Name" , "entur-gbfs-validator" );
205+
206+ try (CloseableHttpResponse response = httpClient .execute (httpGet )) {
207+ String content = EntityUtils .toString (response .getEntity ());
208+ return new ByteArrayInputStream (content .getBytes ());
209+ } catch (ParseException e ) {
210+
211+ // Todo handle parse exception
212+ throw new RuntimeException (e );
213+ }
214+ }
215+
216+ // Close the connection pool when the application shuts down
217+ public void close () throws IOException {
218+ if (httpClient != null ) {
219+ httpClient .close ();
220+ }
221+
222+ // Shutdown the executor service
223+ if (executorService != null && !executorService .isShutdown ()) {
224+ executorService .shutdown ();
225+ try {
226+ // Wait for tasks to complete
227+ if (!executorService .awaitTermination (5 , TimeUnit .SECONDS )) {
228+ executorService .shutdownNow ();
229+ }
230+ } catch (InterruptedException e ) {
231+ executorService .shutdownNow ();
232+ Thread .currentThread ().interrupt ();
233+ }
160234 }
161- in .close ();
162- con .disconnect ();
163- return new ByteArrayInputStream (content .toString ().getBytes ());
164235 }
165236}
0 commit comments