Skip to content

Commit 71c7f1b

Browse files
committed
Run code formatter.
1 parent 7cb3761 commit 71c7f1b

19 files changed

Lines changed: 335 additions & 298 deletions

contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public IPFSCompat(String host, int port, String version, boolean ssl, int readTi
6868
this.host = host;
6969
this.port = port;
7070

71-
if(ssl) {
71+
if (ssl) {
7272
this.protocol = "https";
7373
} else {
7474
this.protocol = "http";
@@ -81,15 +81,16 @@ public IPFSCompat(String host, int port, String version, boolean ssl, int readTi
8181
/**
8282
* Resolve names to IPFS CIDs.
8383
* See <a href="https://docs.ipfs.io/reference/http/api/#api-v0-resolve">resolve in IPFS doc</a>.
84-
* @param scheme the scheme of the name to resolve, usually IPFS or IPNS
85-
* @param path the path to the object
84+
*
85+
* @param scheme the scheme of the name to resolve, usually IPFS or IPNS
86+
* @param path the path to the object
8687
* @param recursive whether recursively resolve names until it is a IPFS CID
8788
* @return a Map of JSON object, with the result as the value of key "Path"
8889
*/
8990
public Map resolve(String scheme, String path, boolean recursive) {
9091
AtomicReference<Map> ret = new AtomicReference<>();
9192
getObjectStream(
92-
"resolve?arg=/" + scheme+"/"+path +"&r="+recursive,
93+
"resolve?arg=/" + scheme + "/" + path + "&r=" + recursive,
9394
res -> {
9495
ret.set((Map) res);
9596
return true;
@@ -120,15 +121,16 @@ public enum DHTQueryEventType {
120121
// Adding a peer to the query.
121122
AddingPeer,
122123
// Dialing a peer.
123-
DialingPeer;
124+
DialingPeer
124125
}
125126

126127
public class DHT {
127128
/**
128129
* Find internet addresses of a given peer.
129130
* See <a href="https://docs.ipfs.io/reference/http/api/#api-v0-dht-findpeer">dht/findpeer in IPFS doc</a>.
130-
* @param id the id of the peer to query
131-
* @param timeout timeout value in seconds
131+
*
132+
* @param id the id of the peer to query
133+
* @param timeout timeout value in seconds
132134
* @param executor executor
133135
* @return List of Multiaddresses of the peer
134136
*/
@@ -142,7 +144,7 @@ public List<String> findpeerListTimeout(Multihash id, int timeout, ExecutorServi
142144
if (peer == null) {
143145
return false;
144146
}
145-
if ( (int) peer.get("Type") != DHTQueryEventType.FinalPeer.ordinal() ) {
147+
if ((int) peer.get("Type") != DHTQueryEventType.FinalPeer.ordinal()) {
146148
return false;
147149
}
148150
List<Map> responses = (List<Map>) peer.get("Responses");
@@ -176,8 +178,9 @@ public List<String> findpeerListTimeout(Multihash id, int timeout, ExecutorServi
176178
/**
177179
* Find providers of a given CID.
178180
* See <a href="https://docs.ipfs.io/reference/http/api/#api-v0-dht-findprovs">dht/findprovs in IPFS doc</a>.
179-
* @param id the CID of the IPFS object
180-
* @param timeout timeout value in seconds
181+
*
182+
* @param id the CID of the IPFS object
183+
* @param timeout timeout value in seconds
181184
* @param executor executor
182185
* @return List of Multihash of providers of the object
183186
*/
@@ -188,10 +191,10 @@ public List<String> findprovsListTimeout(Multihash id, int maxPeers, int timeout
188191
timeout,
189192
res -> {
190193
Map peer = (Map) res;
191-
if ( peer == null ) {
194+
if (peer == null) {
192195
return false;
193196
}
194-
if ( (int) peer.get("Type") != DHTQueryEventType.Provider.ordinal() ) {
197+
if ((int) peer.get("Type") != DHTQueryEventType.Provider.ordinal()) {
195198
return false;
196199
}
197200
List<Map> responses = (List<Map>) peer.get("Responses");
@@ -227,30 +230,31 @@ public class Name {
227230
/**
228231
* Resolve a IPNS name.
229232
* See <a href="https://docs.ipfs.io/reference/http/api/#api-v0-name-resolve">name/resolve in IPFS doc</a>.
230-
* @param hash the IPNS name to resolve
231-
* @param timeout timeout value in seconds
233+
*
234+
* @param hash the IPNS name to resolve
235+
* @param timeout timeout value in seconds
232236
* @param executor executor
233237
* @return a Multihash of resolved name
234238
*/
235239
public Optional<String> resolve(Multihash hash, int timeout, ExecutorService executor) {
236240
AtomicReference<String> ret = new AtomicReference<>();
237241
timeLimitedExec(
238-
"name/resolve?arg=" + hash,
239-
timeout,
240-
res -> {
241-
Map peer = (Map) res;
242-
if (peer != null) {
243-
ret.set((String) peer.get(("Path")));
244-
return true;
245-
}
246-
return false;
247-
},
248-
err -> {
249-
if (!(err instanceof TimeoutException)) {
250-
throw new RuntimeException(err);
251-
}
252-
},
253-
executor
242+
"name/resolve?arg=" + hash,
243+
timeout,
244+
res -> {
245+
Map peer = (Map) res;
246+
if (peer != null) {
247+
ret.set((String) peer.get(("Path")));
248+
return true;
249+
}
250+
return false;
251+
},
252+
err -> {
253+
if (!(err instanceof TimeoutException)) {
254+
throw new RuntimeException(err);
255+
}
256+
},
257+
executor
254258
);
255259
return Optional.ofNullable(ret.get());
256260
}
@@ -259,8 +263,8 @@ public Optional<String> resolve(Multihash hash, int timeout, ExecutorService exe
259263
private void timeLimitedExec(String path, int timeout, Predicate<Object> processor, Consumer<Exception> error,
260264
ExecutorService executor) {
261265
CompletableFuture<Void> f = CompletableFuture.runAsync(
262-
()-> getObjectStream(path, processor, error),
263-
executor
266+
() -> getObjectStream(path, processor, error),
267+
executor
264268
);
265269
try {
266270
f.get(timeout, TimeUnit.SECONDS);
@@ -271,7 +275,7 @@ private void timeLimitedExec(String path, int timeout, Predicate<Object> process
271275
}
272276

273277
private void getObjectStream(String path, Predicate<Object> processor, Consumer<Exception> error) {
274-
byte LINE_FEED = (byte)10;
278+
byte LINE_FEED = (byte) 10;
275279

276280
try {
277281
InputStream in = getStream(path);

contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public IPFSPeer load(Multihash key) {
6464
.build(new CacheLoader<Multihash, List<Multihash>>() {
6565
@Override
6666
public List<Multihash> load(Multihash key) {
67-
return ipfsHelper.findprovsTimeout(key);
67+
return ipfsHelper.findprovsTimeout(key);
6868
}
6969
});
7070
}

contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public IPFSGroupScan(IPFSContext ipfsContext,
106106
this.ipfsScanSpec = ipfsScanSpec;
107107
this.config = ipfsContext.getStoragePluginConfig();
108108
logger.debug("GroupScan constructor called with columns {}", columns);
109-
this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns;
109+
this.columns = columns == null || columns.size() == 0 ? ALL_COLUMNS : columns;
110110
init();
111111
}
112112

@@ -251,7 +251,7 @@ public ScanStats getScanStats() {
251251
}
252252

253253
@Override
254-
public IPFSGroupScan clone(List<SchemaPath> columns){
254+
public IPFSGroupScan clone(List<SchemaPath> columns) {
255255
logger.debug("IPFSGroupScan clone {}", columns);
256256
IPFSGroupScan cloned = new IPFSGroupScan(this);
257257
cloned.columns = columns;
@@ -299,7 +299,9 @@ public IPFSWork(Multihash root) {
299299
this.partialRoot = root;
300300
}
301301

302-
public Multihash getPartialRootHash() {return partialRoot;}
302+
public Multihash getPartialRootHash() {
303+
return partialRoot;
304+
}
303305

304306
public void setOnEndpoint(DrillbitEndpoint endpointAddress) {
305307
this.onEndpoint = endpointAddress;
@@ -340,12 +342,12 @@ static class IPFSTreeFlattener extends RecursiveTask<Map<Multihash, String>> {
340342

341343
public IPFSTreeFlattener(Multihash hash, boolean isProvider, IPFSContext context) {
342344
this(
343-
hash,
344-
isProvider,
345-
context.getMyself(),
346-
context.getIPFSHelper(),
347-
context.getIPFSPeerCache(),
348-
context.getProviderCache()
345+
hash,
346+
isProvider,
347+
context.getMyself(),
348+
context.getIPFSHelper(),
349+
context.getIPFSPeerCache(),
350+
context.getProviderCache()
349351
);
350352
}
351353

contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public void setMyself(IPFSPeer myself) {
8989
/**
9090
* Set maximum number of providers per leaf node. The more providers, the more time it takes to do DHT queries, while
9191
* it is more likely we can find an optimal peer.
92+
*
9293
* @param maxPeersPerLeaf max number of providers to search per leaf node
9394
*/
9495
public void setMaxPeersPerLeaf(int maxPeersPerLeaf) {
@@ -113,7 +114,7 @@ public List<Multihash> findprovsTimeout(Multihash id) {
113114
public List<MultiAddress> findpeerTimeout(Multihash peerId) {
114115
// trying to resolve addresses of a node itself will always hang
115116
// so we treat it specially
116-
if(peerId.equals(myself.getId())) {
117+
if (peerId.equals(myself.getId())) {
117118
return myself.getMultiAddresses();
118119
}
119120

@@ -180,7 +181,7 @@ public Multihash resolve(String prefix, String path, boolean recursive) {
180181
}
181182

182183
@FunctionalInterface
183-
public interface ThrowingFunction<T, R, E extends Exception>{
184+
public interface ThrowingFunction<T, R, E extends Exception> {
184185
R apply(final T in) throws E;
185186
}
186187

@@ -192,12 +193,13 @@ public interface ThrowingSupplier<R, E extends Exception> {
192193
/**
193194
* Execute a time-critical operation op within time timeout. Causes the query to fail completely
194195
* if the operation times out.
195-
* @param op a Function that represents the operation to perform
196-
* @param in the parameter for op
196+
*
197+
* @param op a Function that represents the operation to perform
198+
* @param in the parameter for op
197199
* @param timeout consider the execution has timed out after this amount of time in seconds
198-
* @param <T> Input type
199-
* @param <R> Return type
200-
* @param <E> Type of checked exception op throws
200+
* @param <T> Input type
201+
* @param <R> Return type
202+
* @param <E> Type of checked exception op throws
201203
* @return R the result of the operation
202204
* @throws E when the function throws an E
203205
*/
@@ -228,9 +230,11 @@ private <R, E extends Exception> R timedFailure(Callable<R> task, int timeout, T
228230
* DRILL-7753: implement a more advanced algorithm that picks optimal addresses. Maybe check reachability, latency
229231
* and bandwidth?
230232
*/
233+
231234
/**
232235
* Choose a peer's network address from its advertised Multiaddresses.
233236
* Prefer globally routable address over local addresses.
237+
*
234238
* @param peerAddrs Multiaddresses obtained from IPFS.DHT.findprovs
235239
* @return network address
236240
*/
@@ -262,6 +266,7 @@ public Optional<String> getPeerDrillHostname(Multihash peerId) {
262266

263267
/**
264268
* Check if an IPFS peer is also running a Drillbit so that it can be used to execute a part of a query.
269+
*
265270
* @param peerId the id of the peer
266271
* @return if the peer is Drill-ready
267272
*/
@@ -287,8 +292,9 @@ public Optional<Multihash> getIPNSDataHash(Multihash peerId) {
287292

288293
/**
289294
* Get from IPFS data under a peer's ID, i.e. the data identified by /ipfs/{ID}/key.
295+
*
290296
* @param peerId the peer's ID
291-
* @param key key
297+
* @param key key
292298
* @return data in bytes
293299
*/
294300
private Optional<byte[]> getPeerData(Multihash peerId, String key) {
@@ -300,7 +306,7 @@ private Optional<byte[]> getPeerData(Multihash peerId, String key) {
300306
for (MerkleNode link : links.get()) {
301307
if (link.name.equals(Optional.of(key))) {
302308
try {
303-
byte[] result = timedFailure(client.object::data, link.hash,timeouts.get(FETCH_DATA));
309+
byte[] result = timedFailure(client.object::data, link.hash, timeouts.get(FETCH_DATA));
304310
return Optional.of(result);
305311
} catch (IOException e) {
306312
return Optional.empty();
@@ -312,6 +318,7 @@ private Optional<byte[]> getPeerData(Multihash peerId, String key) {
312318

313319
/**
314320
* Get all the links under a peer's ID.
321+
*
315322
* @param peerId peer's ID
316323
* @return List of links
317324
*/
@@ -327,11 +334,12 @@ private Optional<List<MerkleNode>> getPeerLinks(Multihash peerId) {
327334
client.object::get,
328335
Cid.decode(path),
329336
timeouts.get(FETCH_DATA)
330-
).links;
337+
).links;
331338
if (links.size() > 0) {
332339
return Optional.of(links);
333340
}
334-
} catch (IOException ignored) { }
341+
} catch (IOException ignored) {
342+
}
335343
return Optional.empty();
336344
}
337345
}

contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSJSONReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ public void addContext(UserException.Builder builder) {
8080
String rootJson = new String(rawDataBytes);
8181
int start = rootJson.indexOf("{");
8282
int end = rootJson.lastIndexOf("}");
83-
rootJson = rootJson.substring(start,end+1);
84-
InputStream inStream = new ByteArrayInputStream(rootJson.getBytes());
83+
rootJson = rootJson.substring(start, end + 1);
84+
InputStream inStream = new ByteArrayInputStream(rootJson.getBytes());
8585

8686
try {
8787
jsonLoader = new JsonLoaderImpl.JsonLoaderBuilder()

contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanBatchCreator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ public IPFSJSONReaderFactory(IPFSSubScan subScan) {
9191
}
9292

9393
@Override
94-
public void bind(ManagedScanFramework framework) { }
94+
public void bind(ManagedScanFramework framework) {
95+
}
9596

9697
@Override
9798
public ManagedReader<SchemaNegotiator> next() {

0 commit comments

Comments
 (0)