|
9 | 9 | import com.google.common.net.HostAndPort; |
10 | 10 | import com.google.common.util.concurrent.ThreadFactoryBuilder; |
11 | 11 | import diskCacheV111.util.CacheException; |
| 12 | +import dmg.util.command.Command; |
12 | 13 | import io.grpc.ChannelCredentials; |
13 | 14 | import io.grpc.ConnectivityState; |
14 | 15 | import io.grpc.Deadline; |
|
27 | 28 | import java.util.Objects; |
28 | 29 | import java.util.Set; |
29 | 30 | import java.util.UUID; |
30 | | - |
| 31 | +import java.util.concurrent.Callable; |
31 | 32 | import java.util.concurrent.ConcurrentHashMap; |
32 | 33 | import java.util.concurrent.ConcurrentMap; |
33 | 34 | import java.util.concurrent.ExecutionException; |
34 | | -import ch.cern.cta.rpc.CtaRpcGrpc.CtaRpcBlockingStub; |
35 | 35 | import java.util.concurrent.Executors; |
36 | 36 | import java.util.concurrent.TimeUnit; |
| 37 | +import java.util.stream.Collectors; |
| 38 | + |
| 39 | +import ch.cern.cta.rpc.CtaRpcGrpc.CtaRpcBlockingStub; |
37 | 40 | import org.dcache.namespace.FileAttribute; |
38 | 41 | import org.dcache.nearline.cta.xrootd.DataMover; |
39 | 42 | import org.dcache.pool.nearline.spi.FlushRequest; |
@@ -249,7 +252,7 @@ public void completed(Set<URI> uris) { |
249 | 252 |
|
250 | 253 | var cancelRequest = ctaRequestFactory.getAbortStoreRequest(ar, response); |
251 | 254 |
|
252 | | - pendingRequests.put(id, new PendingRequest(r) { |
| 255 | + pendingRequests.put(id, new PendingRequest(r, response.getRequestObjectstoreId(), PendingRequest.Type.FLUSH) { |
253 | 256 | @Override |
254 | 257 | public void cancel() { |
255 | 258 | try { |
@@ -318,7 +321,7 @@ public void completed(Set<Checksum> checksums) { |
318 | 321 | ); |
319 | 322 |
|
320 | 323 | var cancelRequest = ctaRequestFactory.getAbortStageRequest(rr, response); |
321 | | - pendingRequests.put(id, new PendingRequest(r) { |
| 324 | + pendingRequests.put(id, new PendingRequest(r, response.getRequestObjectstoreId(), PendingRequest.Type.STAGE) { |
322 | 325 | @Override |
323 | 326 | public void cancel() { |
324 | 327 | // on cancel send the request to CTA; on success cancel the requests |
@@ -543,4 +546,33 @@ private CacheException asCacheException(Throwable e) { |
543 | 546 | private URI createZeroFileUri(FileAttributes attrs) { |
544 | 547 | return URI.create(type + "://" + name + "/" + attrs.getPnfsId() + "?archiveid=*"); |
545 | 548 | } |
| 549 | + |
| 550 | + @Command(name="show requests") |
| 551 | + public class ShowRequestsCommand implements Callable<String> { |
| 552 | + @Override |
| 553 | + public String call() { |
| 554 | + if (pendingRequests.isEmpty()) { |
| 555 | + return "No pending requests"; |
| 556 | + } |
| 557 | + |
| 558 | + StringBuilder sb = new StringBuilder(); |
| 559 | + sb.append("Pending requests:\n"); |
| 560 | + |
| 561 | + pendingRequests.entrySet().stream() |
| 562 | + .collect(Collectors.groupingBy(e -> e.getValue().getAction())) |
| 563 | + .forEach((type, entries) -> { |
| 564 | + sb.append(type).append(":\n"); |
| 565 | + sb.append(" count: ").append(entries.size()).append("\n"); |
| 566 | + for (var entry : entries) { |
| 567 | + sb.append(" ") |
| 568 | + .append(entry.getKey()) |
| 569 | + .append(" -> ") |
| 570 | + .append(entry.getValue().getCtaRequestId()).append("\n"); |
| 571 | + } |
| 572 | + sb.append("\n"); |
| 573 | + }); |
| 574 | + |
| 575 | + return sb.toString(); |
| 576 | + } |
| 577 | + } |
546 | 578 | } |
0 commit comments