Skip to content

Commit cb72133

Browse files
committed
drv: keep track of active requests
Motivation: to handle dCache pool restarts, the driver should keep track of submitted requests. Modification: added a BerkeleyDB-based database to keep track of submitted requests. Result: on start, the driver can find out which requests were already submitted to CTA.
1 parent 474d5b8 commit cb72133

8 files changed

Lines changed: 295 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ queue define class -expire=0 -pending=0 -total=0 -open <hsmType> *
109109
| io-port | The TCP port offered by dCache for IO by CTA | no | - |
110110
| use-dio | Use Direct-I/O | no | `false` |
111111
| restore-success-on-close | **obsolete** | - | - |
112+
| cleanup-journal | The directory where the journal of submitted requests is stored; used for cleanup on start | no | - |
112113

113114
### Load balancing and failover
114115

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,13 @@
135135
<version>2.2</version>
136136
<scope>test</scope>
137137
</dependency>
138+
<dependency>
139+
<groupId>com.sleepycat</groupId>
140+
<artifactId>je</artifactId>
141+
<version>7.5.11</version>
142+
<!-- direct dependency in dCache -->
143+
<scope>provided</scope>
144+
</dependency>
138145
</dependencies>
139146

140147
<build>
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.dcache.nearline.cta;
2+
3+
import ch.cern.cta.rpc.CtaRpcGrpc;
4+
import ch.cern.cta.rpc.Request;
5+
6+
import java.util.function.BiFunction;
7+
8+
/**
9+
* Interface to persistent storage for pending requests.
10+
* <p>
11+
* The new requests are added to the list and removed on completion.
12+
* On submit, pool try to add the request to the list. And if already exist,
13+
* the pending request will cancel (delete) in CTA before submission.
14+
*/
15+
public interface CleanupJournal extends AutoCloseable {
16+
void cleanup(CtaRpcGrpc.CtaRpcBlockingStub cta);
17+
18+
void cleanup(CtaRpcGrpc.CtaRpcBlockingStub cta, BiFunction<String, Request, Boolean> function);
19+
20+
void put(String pnfsid, Request archiveResponse);
21+
22+
void remove(String pnfsid);
23+
24+
@Override
25+
void close();
26+
}

src/main/java/org/dcache/nearline/cta/CtaNearlineStorage.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ public class CtaNearlineStorage implements NearlineStorage {
5858

5959
private static final Logger LOGGER = LoggerFactory.getLogger(CtaNearlineStorage.class);
6060

61+
/**
62+
* Unicode character for screaming face.
63+
*/
64+
public static final String MWAI_SCREAMING = Character.toString(0x1F631);
65+
6166
public static final String CTA_INSTANCE = "cta-instance-name";
6267
public static final String CTA_ENDPOINT = "cta-frontend-addr";
6368
public static final String CTA_USER = "cta-user";
@@ -68,6 +73,7 @@ public class CtaNearlineStorage implements NearlineStorage {
6873
public static final String IO_PORT = "io-port";
6974
public static final String CTA_REQUEST_TIMEOUT = "cta-frontend-timeout";
7075
public static final String DIO = "use-dio";
76+
public static final String CTA_REQUEST_JOURNAL = "cleanup-journal";
7177

7278
protected final String type;
7379
protected final String name;
@@ -166,6 +172,11 @@ enum Action {
166172
*/
167173
private final RequestCounters<Action> requestCounters = new RequestCounters<>("CTA gRPC Requests counters");
168174

175+
/**
176+
* Persistent journal of the requests that are pending in CTA.
177+
*/
178+
private CleanupJournal pendingRequestsTracker;
179+
169180
public CtaNearlineStorage(String type, String name) {
170181

171182
Objects.requireNonNull(type, "HSM type is not provided");
@@ -251,18 +262,21 @@ protected FlushRequest delegate() {
251262
@Override
252263
public void failed(Exception e) {
253264
pendingRequests.remove(id);
265+
pendingRequestsTracker.remove(id);
254266
super.failed(e);
255267
}
256268

257269
@Override
258270
public void failed(int i, String s) {
259271
pendingRequests.remove(id);
272+
pendingRequestsTracker.remove(id);
260273
super.failed(i, s);
261274
}
262275

263276
@Override
264277
public void completed(Set<URI> uris) {
265278
pendingRequests.remove(id);
279+
pendingRequestsTracker.remove(id);
266280
super.completed(uris);
267281
}
268282
};
@@ -280,7 +294,9 @@ public void completed(Set<URI> uris) {
280294

281295
var cancelRequest = ctaRequestFactory.getAbortStoreRequest(ar, response);
282296

297+
pendingRequestsTracker.put(id, cancelRequest);
283298
pendingRequests.put(id, new PendingRequest(r, response.getRequestObjectstoreId(), PendingRequest.Type.FLUSH) {
299+
284300
@Override
285301
public void cancel() {
286302
try {
@@ -313,18 +329,21 @@ protected StageRequest delegate() {
313329
@Override
314330
public void failed(Exception e) {
315331
pendingRequests.remove(id);
332+
pendingRequestsTracker.remove(id);
316333
super.failed(e);
317334
}
318335

319336
@Override
320337
public void failed(int i, String s) {
321338
pendingRequests.remove(id);
339+
pendingRequestsTracker.remove(id);
322340
super.failed(i, s);
323341
}
324342

325343
@Override
326344
public void completed(Set<Checksum> checksums) {
327345
pendingRequests.remove(id);
346+
pendingRequestsTracker.remove(id);
328347
super.completed(checksums);
329348
}
330349
};
@@ -349,7 +368,9 @@ public void completed(Set<Checksum> checksums) {
349368
);
350369

351370
var cancelRequest = ctaRequestFactory.getAbortStageRequest(rr, response);
371+
pendingRequestsTracker.put(id, cancelRequest);
352372
pendingRequests.put(id, new PendingRequest(r, response.getRequestObjectstoreId(), PendingRequest.Type.STAGE) {
373+
353374
@Override
354375
public void cancel() {
355376
// on cancel send the request to CTA; on success cancel the requests
@@ -442,6 +463,7 @@ public void configure(Map<String, String> properties) throws IllegalArgumentExce
442463
String user = properties.get(CTA_USER);
443464
String group = properties.get(CTA_GROUP);
444465
String timeoutString = properties.get(CTA_REQUEST_TIMEOUT);
466+
String journal = properties.get(CTA_REQUEST_JOURNAL);
445467

446468
checkArgument(instance != null, "dCache instance name is not set.");
447469
checkArgument(endpoint != null, "CTA frontend is not set.");
@@ -485,6 +507,12 @@ public void configure(Map<String, String> properties) throws IllegalArgumentExce
485507
}
486508

487509
dio = Boolean.parseBoolean(properties.getOrDefault(DIO, "false"));
510+
511+
if (journal != null) {
512+
pendingRequestsTracker = new DigitalMwai(journal);
513+
} else {
514+
pendingRequestsTracker = new NopCleanupJournal();
515+
}
488516
}
489517

490518
@Override
@@ -526,6 +554,12 @@ public void start() {
526554
LOGGER.info("Connected to CTA frontend");
527555
}
528556
);
557+
558+
pendingRequestsTracker.cleanup(cta, (p, r) -> {
559+
LOGGER.warn("[Mwai {}]: Found pending entry for {} : {}", MWAI_SCREAMING, p, r.getNotification().getFile().getRequestObjectstoreId());
560+
return Boolean.TRUE;
561+
});
562+
529563
ctaRequestFactory = new RequestsFactory(instanceName, ctaUser, ctaGroup, dataMover);
530564
}
531565

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package org.dcache.nearline.cta;
2+
3+
4+
import static io.grpc.Status.Code.NOT_FOUND;
5+
import static java.nio.charset.StandardCharsets.UTF_8;
6+
import ch.cern.cta.rpc.CtaRpcGrpc.CtaRpcBlockingStub;
7+
import ch.cern.cta.rpc.Request;
8+
import com.sleepycat.je.Cursor;
9+
import com.sleepycat.je.CursorConfig;
10+
import com.sleepycat.je.Database;
11+
import com.sleepycat.je.DatabaseConfig;
12+
import com.sleepycat.je.DatabaseEntry;
13+
import com.sleepycat.je.Environment;
14+
import com.sleepycat.je.EnvironmentConfig;
15+
import com.sleepycat.je.OperationStatus;
16+
17+
import com.sleepycat.je.Transaction;
18+
import io.grpc.StatusRuntimeException;
19+
20+
import java.io.File;
21+
import java.util.function.BiFunction;
22+
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
27+
/**
28+
* This class provided a persistent storage for pending requests.
29+
* <p>
30+
* The new requests are added to the list and removed on completion.
31+
* On submit, pool try to add the request to the list. And if already exist,
32+
* the pending request will cancel (delete) in CTA before submission.
33+
*/
34+
public class DigitalMwai implements CleanupJournal {
35+
36+
private static final Logger LOGGER = LoggerFactory.getLogger(DigitalMwai.class);
37+
38+
private static final String CTA_PENDING_REQUEST = "archive-requests";
39+
40+
private final CursorConfig config = new CursorConfig();
41+
private final Environment env;
42+
private final DatabaseConfig dbConfig;
43+
private Database pendingRequests;
44+
45+
46+
/**
47+
* Create a new instance of {@link DigitalMwai}.
48+
*
49+
* @param path the path to the directory where the persistent storage will be created.
50+
*/
51+
DigitalMwai(String path) {
52+
53+
File dir = new File(path);
54+
55+
if (!dir.exists()) {
56+
dir.mkdirs();
57+
}
58+
59+
EnvironmentConfig envConfig = new EnvironmentConfig();
60+
envConfig.setAllowCreate(true);
61+
envConfig.setTransactional(true);
62+
envConfig.setReadOnly(false);
63+
64+
env = new Environment(dir, envConfig);
65+
66+
dbConfig = new DatabaseConfig();
67+
dbConfig.setTransactional(true);
68+
dbConfig.setAllowCreate(true);
69+
dbConfig.setReadOnly(false);
70+
71+
pendingRequests = env.openDatabase(null, CTA_PENDING_REQUEST, dbConfig);
72+
}
73+
74+
75+
@Override
76+
public void cleanup(CtaRpcBlockingStub cta) {
77+
78+
Transaction tx = env.beginTransaction(null, null);
79+
try (Cursor cursor = pendingRequests.openCursor(tx, config)) {
80+
81+
DatabaseEntry key = new DatabaseEntry();
82+
DatabaseEntry data = new DatabaseEntry();
83+
84+
/*
85+
* Remove all pending requests that are submitted before restart.
86+
*/
87+
while (cursor.getNext(key, data, null) == OperationStatus.SUCCESS) {
88+
boolean canDelete = false;
89+
try {
90+
var cancelRequest = Request.parseFrom(data.getData());
91+
switch (cancelRequest.getNotification().getWf().getEvent()) {
92+
case CLOSEW:
93+
cta.delete(cancelRequest);
94+
break;
95+
case PREPARE:
96+
cta.cancelRetrieve(cancelRequest);
97+
break;
98+
default:
99+
LOGGER.warn("Unexpected request type: {}", cancelRequest.getNotification().getWf().getEvent());
100+
}
101+
canDelete = true;
102+
} catch (StatusRuntimeException e) {
103+
if (e.getStatus().getCode() == NOT_FOUND) {
104+
canDelete = true;
105+
} else {
106+
LOGGER.error("Failed to remove pending request: {}", e.getStatus());
107+
}
108+
} catch (Exception e) {
109+
LOGGER.error("Unexpected error while removing pending request: {}", e.getMessage());
110+
}
111+
112+
if (canDelete) {
113+
cursor.delete();
114+
}
115+
}
116+
} finally {
117+
tx.commit();
118+
}
119+
}
120+
121+
@Override
122+
public void cleanup(CtaRpcBlockingStub cta, BiFunction<String, Request, Boolean> consumer) {
123+
124+
Transaction tx = env.beginTransaction(null, null);
125+
try (Cursor cursor = pendingRequests.openCursor(tx, config)) {
126+
127+
DatabaseEntry key = new DatabaseEntry();
128+
DatabaseEntry data = new DatabaseEntry();
129+
130+
/*
131+
* Remove all pending requests that are submitted before restart.
132+
*/
133+
while (cursor.getNext(key, data, null) == OperationStatus.SUCCESS) {
134+
135+
try {
136+
String id = new String(key.getData(), UTF_8);
137+
var cancelRequest = Request.parseFrom(data.getData());
138+
if (consumer.apply(id, cancelRequest)) {
139+
cursor.delete();
140+
}
141+
} catch (Exception e) {
142+
LOGGER.error("Unexpected error while removing pending request: {}", e.getMessage());
143+
}
144+
}
145+
} finally {
146+
tx.commit();
147+
}
148+
}
149+
150+
@Override
151+
public void put(String pnfsid, Request archiveResponse) {
152+
DatabaseEntry key = new DatabaseEntry(pnfsid.getBytes(UTF_8));
153+
DatabaseEntry data = new DatabaseEntry(archiveResponse.toByteArray());
154+
155+
var status = pendingRequests.putNoOverwrite(null, key, data);
156+
if (status != OperationStatus.SUCCESS) {
157+
throw new IllegalStateException("Unexpected status: " + status);
158+
}
159+
}
160+
161+
@Override
162+
public void remove(String pnfsid) {
163+
DatabaseEntry key = new DatabaseEntry(pnfsid.getBytes(UTF_8));
164+
var status = pendingRequests.delete(null, key);
165+
if (status != OperationStatus.SUCCESS) {
166+
LOGGER.error("Failed to remove persistent for {} entry: {}", pnfsid, status);
167+
}
168+
}
169+
170+
@Override
171+
public void close() {
172+
pendingRequests.close();
173+
env.close();
174+
}
175+
176+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package org.dcache.nearline.cta;
2+
3+
import ch.cern.cta.rpc.CtaRpcGrpc;
4+
import ch.cern.cta.rpc.Request;
5+
6+
import java.util.function.BiFunction;
7+
8+
public class NopCleanupJournal implements CleanupJournal {
9+
10+
@Override
11+
public void cleanup(CtaRpcGrpc.CtaRpcBlockingStub cta) {
12+
// NOP
13+
}
14+
15+
@Override
16+
public void cleanup(CtaRpcGrpc.CtaRpcBlockingStub cta, BiFunction<String, Request, Boolean> function) {
17+
// NOP
18+
}
19+
20+
@Override
21+
public void put(String pnfsid, Request archiveResponse) {
22+
// NOP
23+
}
24+
25+
@Override
26+
public void remove(String pnfsid) {
27+
// NOP
28+
}
29+
30+
@Override
31+
public void close() {
32+
// nop
33+
}
34+
}

src/main/java/org/dcache/nearline/cta/xrootd/DataMover.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,4 +231,8 @@ public Transport getTransport(String id, long ctaArchiveId) {
231231
.setReportUrl(reporterUrl)
232232
.build();
233233
}
234+
235+
/**
 * Returns the id of this data mover, which is the value of its {@code url} field.
 *
 * @return the value of {@code url}.
 */
public String getId() {
236+
return url;
237+
}
234238
}

0 commit comments

Comments
 (0)