Skip to content

Commit 56fa6f1

Browse files
author
Colm Dougan
committed
HDDS-14004. EventNotification: Capture data to the completed operation ledger table
1 parent 01b2b2a commit 56fa6f1

29 files changed

Lines changed: 617 additions & 13 deletions

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketDeleteRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
189189

190190
// Add to double buffer.
191191
omClientResponse = new OMBucketDeleteResponse(omResponse.build(),
192-
volumeName, bucketName, omVolumeArgs.copyObject());
192+
volumeName, bucketName, omVolumeArgs.copyObject(), transactionLogIndex);
193193
} catch (IOException | InvalidPathException ex) {
194194
success = false;
195195
exception = ex;

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
295295
.setOpenVersion(openVersion).build())
296296
.setCmdType(CreateFile);
297297
omClientResponse = new OMFileCreateResponse(omResponse.build(),
298-
omKeyInfo, missingParentInfos, clientID, omBucketInfo.copyObject());
298+
omKeyInfo, missingParentInfos, clientID, omBucketInfo.copyObject(),
299+
isRecursive, isOverWrite);
299300

300301
result = Result.SUCCESS;
301302
} catch (IOException | InvalidPathException ex) {

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
216216
.setCmdType(Type.CreateFile);
217217
omClientResponse = new OMFileCreateResponseWithFSO(omResponse.build(),
218218
omFileInfo, missingParentInfos, clientID,
219-
omBucketInfo.copyObject(), volumeId);
219+
omBucketInfo.copyObject(), volumeId,
220+
isRecursive, isOverWrite);
220221

221222
result = Result.SUCCESS;
222223
} catch (IOException | InvalidPathException ex) {

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestWithFSO.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,8 @@ private OMClientResponse renameKey(OmKeyInfo toKeyParent, String toKeyName,
342342
OMClientResponse omClientResponse = new OMKeyRenameResponseWithFSO(
343343
omResponse.setRenameKeyResponse(RenameKeyResponse.newBuilder()).build(),
344344
dbFromKey, dbToKey, fromKeyParent, toKeyParent, fromKeyValue,
345-
omBucketInfo, isRenameDirectory, getBucketLayout());
345+
omBucketInfo, isRenameDirectory, getBucketLayout(),
346+
fromKeyName, toKeyName);
346347
return omClientResponse;
347348
}
348349

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/bucket/OMBucketCreateResponse.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@
2626
import org.apache.hadoop.hdds.utils.db.BatchOperation;
2727
import org.apache.hadoop.ozone.om.OMMetadataManager;
2828
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
29+
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
2930
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
3031
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
3132
import org.apache.hadoop.ozone.om.response.OMClientResponse;
3233
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
34+
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
3335

3436
/**
3537
* Response for CreateBucket request.
@@ -80,12 +82,26 @@ public void addToDBBatch(OMMetadataManager omMetadataManager,
8082
omMetadataManager.getVolumeKey(omVolumeArgs.getVolume()),
8183
omVolumeArgs);
8284
}
85+
86+
omMetadataManager.getCompletedRequestInfoTable()
87+
.putWithBatch(batchOperation, omBucketInfo.getUpdateID(),
88+
getCompletedRequestInfo(omBucketInfo.getUpdateID()));
8389
}
8490

8591
@Nullable
8692
public OmBucketInfo getOmBucketInfo() {
8793
return omBucketInfo;
8894
}
8995

96+
protected OmCompletedRequestInfo getCompletedRequestInfo(long trxnLogIndex) {
97+
return OmCompletedRequestInfo.newBuilder()
98+
.setTrxLogIndex(trxnLogIndex)
99+
.setCmdType(Type.CreateBucket)
100+
.setCreationTime(System.currentTimeMillis())
101+
.setVolumeName(omBucketInfo.getVolumeName())
102+
.setBucketName(omBucketInfo.getBucketName())
103+
.setOpArgs(new OmCompletedRequestInfo.OperationArgs.NoArgs())
104+
.build();
105+
}
90106
}
91107

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/bucket/OMBucketDeleteResponse.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@
2424
import java.io.IOException;
2525
import org.apache.hadoop.hdds.utils.db.BatchOperation;
2626
import org.apache.hadoop.ozone.om.OMMetadataManager;
27+
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
2728
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
2829
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
2930
import org.apache.hadoop.ozone.om.response.OMClientResponse;
3031
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
32+
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
3133

3234
/**
3335
* Response for DeleteBucket request.
@@ -38,13 +40,15 @@ public final class OMBucketDeleteResponse extends OMClientResponse {
3840
private String volumeName;
3941
private String bucketName;
4042
private final OmVolumeArgs omVolumeArgs;
43+
private final long updateID;
4144

4245
public OMBucketDeleteResponse(@Nonnull OMResponse omResponse,
43-
String volumeName, String bucketName, OmVolumeArgs volumeArgs) {
46+
String volumeName, String bucketName, OmVolumeArgs volumeArgs, long updateID) {
4447
super(omResponse);
4548
this.volumeName = volumeName;
4649
this.bucketName = bucketName;
4750
this.omVolumeArgs = volumeArgs;
51+
this.updateID = updateID;
4852
}
4953

5054
public OMBucketDeleteResponse(@Nonnull OMResponse omResponse,
@@ -53,6 +57,7 @@ public OMBucketDeleteResponse(@Nonnull OMResponse omResponse,
5357
this.volumeName = volumeName;
5458
this.bucketName = bucketName;
5559
this.omVolumeArgs = null;
60+
this.updateID = 0;
5661
}
5762

5863
/**
@@ -63,6 +68,7 @@ public OMBucketDeleteResponse(@Nonnull OMResponse omResponse) {
6368
super(omResponse);
6469
checkStatusNotOK();
6570
this.omVolumeArgs = null;
71+
this.updateID = 0;
6672
}
6773

6874
@Override
@@ -79,6 +85,12 @@ public void addToDBBatch(OMMetadataManager omMetadataManager,
7985
omMetadataManager.getVolumeKey(omVolumeArgs.getVolume()),
8086
omVolumeArgs);
8187
}
88+
89+
// Add to completed requests table.
90+
if (omVolumeArgs != null) {
91+
omMetadataManager.getCompletedRequestInfoTable()
92+
.putWithBatch(batchOperation, updateID, getCompletedRequestInfo(updateID));
93+
}
8294
}
8395

8496
public String getVolumeName() {
@@ -89,5 +101,14 @@ public String getBucketName() {
89101
return bucketName;
90102
}
91103

104+
protected OmCompletedRequestInfo getCompletedRequestInfo(long trxnLogIndex) {
105+
return OmCompletedRequestInfo.newBuilder()
106+
.setTrxLogIndex(trxnLogIndex)
107+
.setCmdType(Type.DeleteBucket)
108+
.setCreationTime(System.currentTimeMillis())
109+
.setVolumeName(volumeName)
110+
.setBucketName(bucketName)
111+
.setOpArgs(new OmCompletedRequestInfo.OperationArgs.NoArgs())
112+
.build();
113+
}
92114
}
93-

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMDirectoryCreateResponse.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@
2626
import org.apache.hadoop.ozone.om.OMMetadataManager;
2727
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
2828
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
29+
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
2930
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
3031
import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequest.Result;
3132
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
3233
import org.apache.hadoop.ozone.om.response.key.OmKeyResponse;
3334
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
35+
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
3436
import org.slf4j.Logger;
3537
import org.slf4j.LoggerFactory;
3638

@@ -92,10 +94,28 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager,
9294
bucketInfo.getVolumeName(), bucketInfo.getBucketName());
9395
omMetadataManager.getBucketTable().putWithBatch(batchOperation,
9496
bucketKey, bucketInfo);
97+
98+
// Add to completed requests table.
99+
omMetadataManager.getCompletedRequestInfoTable()
100+
.putWithBatch(batchOperation, dirKeyInfo.getUpdateID(),
101+
getCompletedRequestInfo(dirKeyInfo.getUpdateID()));
102+
95103
} else if (Result.DIRECTORY_ALREADY_EXISTS == result) {
96104
// When directory already exists, we don't add it to cache. And it is
97105
// not an error, in this case dirKeyInfo will be null.
98106
LOG.debug("Directory already exists. addToDBBatch is a no-op");
99107
}
100108
}
109+
110+
protected OmCompletedRequestInfo getCompletedRequestInfo(long trxnLogIndex) {
111+
return OmCompletedRequestInfo.newBuilder()
112+
.setTrxLogIndex(trxnLogIndex)
113+
.setCmdType(Type.CreateDirectory)
114+
.setCreationTime(System.currentTimeMillis())
115+
.setVolumeName(dirKeyInfo.getVolumeName())
116+
.setBucketName(dirKeyInfo.getBucketName())
117+
.setKeyName(dirKeyInfo.getKeyName())
118+
.setOpArgs(new OmCompletedRequestInfo.OperationArgs.NoArgs())
119+
.build();
120+
}
101121
}

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMFileCreateResponse.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,23 +24,32 @@
2424
import java.util.List;
2525
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
2626
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
27+
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
2728
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
2829
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
2930
import org.apache.hadoop.ozone.om.response.key.OMKeyCreateResponse;
3031
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
32+
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
3133

3234
/**
3335
* Response for create file request.
3436
*/
3537
@CleanupTableInfo(cleanupTables = {KEY_TABLE, OPEN_KEY_TABLE})
3638
public class OMFileCreateResponse extends OMKeyCreateResponse {
3739

40+
private boolean isRecursive;
41+
private boolean isOverWrite;
42+
3843
public OMFileCreateResponse(@Nonnull OMResponse omResponse,
3944
@Nonnull OmKeyInfo omKeyInfo, @Nonnull List<OmKeyInfo> parentKeyInfos,
4045
long openKeySessionID,
41-
@Nonnull OmBucketInfo omBucketInfo) {
46+
@Nonnull OmBucketInfo omBucketInfo,
47+
boolean isRecursive, boolean isOverWrite) {
4248
super(omResponse, omKeyInfo, parentKeyInfos, openKeySessionID,
4349
omBucketInfo);
50+
51+
this.isRecursive = isRecursive;
52+
this.isOverWrite = isOverWrite;
4453
}
4554

4655
/**
@@ -53,4 +62,13 @@ public OMFileCreateResponse(@Nonnull OMResponse omResponse, @Nonnull
5362
checkStatusNotOK();
5463
}
5564

65+
@Override
66+
public Type getOperationType() {
67+
return Type.CreateFile;
68+
}
69+
70+
@Override
71+
public OmCompletedRequestInfo.OperationArgs getCompletedRequestInfoArgs() {
72+
return new OmCompletedRequestInfo.OperationArgs.CreateFileArgs(isRecursive, isOverWrite);
73+
}
5674
}

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMFileCreateResponseWithFSO.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,29 @@
4242
BUCKET_TABLE})
4343
public class OMFileCreateResponseWithFSO extends OMFileCreateResponse {
4444

45+
private static final boolean DEFAULT_IS_RECURSIVE = false;
46+
private static final boolean DEFAULT_IS_OVERWRITE = false;
47+
4548
private List<OmDirectoryInfo> parentDirInfos;
4649
private long volumeId;
4750

4851
public OMFileCreateResponseWithFSO(@Nonnull OMResponse omResponse,
4952
@Nonnull OmKeyInfo omKeyInfo,
5053
@Nonnull List<OmDirectoryInfo> parentDirInfos, long openKeySessionID,
5154
@Nonnull OmBucketInfo omBucketInfo, @Nonnull long volumeId) {
55+
this(omResponse, omKeyInfo, parentDirInfos, openKeySessionID,
56+
omBucketInfo, volumeId, DEFAULT_IS_RECURSIVE,
57+
DEFAULT_IS_OVERWRITE);
58+
}
59+
60+
@SuppressWarnings("checkstyle:ParameterNumber")
61+
public OMFileCreateResponseWithFSO(@Nonnull OMResponse omResponse,
62+
@Nonnull OmKeyInfo omKeyInfo,
63+
@Nonnull List<OmDirectoryInfo> parentDirInfos, long openKeySessionID,
64+
@Nonnull OmBucketInfo omBucketInfo, @Nonnull long volumeId,
65+
boolean isRecursive, boolean isOverWrite) {
5266
super(omResponse, omKeyInfo, new ArrayList<>(), openKeySessionID,
53-
omBucketInfo);
67+
omBucketInfo, isRecursive, isOverWrite);
5468
this.parentDirInfos = parentDirInfos;
5569
this.volumeId = volumeId;
5670
}
@@ -95,6 +109,11 @@ public void addToDBBatch(OMMetadataManager omMetadataMgr,
95109

96110
OMFileRequest.addToOpenFileTable(omMetadataMgr, batchOp, getOmKeyInfo(),
97111
getOpenKeySessionID(), volumeId, getOmBucketInfo().getObjectID());
112+
113+
// Add to completed requests table.
114+
omMetadataMgr.getCompletedRequestInfoTable()
115+
.putWithBatch(batchOp, getOmKeyInfo().getUpdateID(),
116+
getCompletedRequestInfo(getOmKeyInfo().getUpdateID()));
98117
}
99118

100119
@Override

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@
3030
import org.apache.hadoop.ozone.om.OMMetadataManager;
3131
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
3232
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
33+
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
3334
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
3435
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
3536
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
3637
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
38+
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
3739

3840
/**
3941
* Response for CommitKey request.
@@ -105,6 +107,12 @@ public void addToDBBatch(OMMetadataManager omMetadataManager,
105107
omMetadataManager.getBucketTable().putWithBatch(batchOperation,
106108
omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(),
107109
omBucketInfo.getBucketName()), omBucketInfo);
110+
111+
// Add to completed requests table.
112+
omMetadataManager.getCompletedRequestInfoTable()
113+
.putWithBatch(batchOperation, omKeyInfo.getUpdateID(),
114+
getCompletedRequestInfo(omKeyInfo.getUpdateID()));
115+
108116
}
109117

110118
protected String getOpenKeyName() {
@@ -154,4 +162,16 @@ protected boolean isHSync() {
154162
public OmKeyInfo getNewOpenKeyInfo() {
155163
return newOpenKeyInfo;
156164
}
165+
166+
protected OmCompletedRequestInfo getCompletedRequestInfo(long trxnLogIndex) {
167+
return OmCompletedRequestInfo.newBuilder()
168+
.setTrxLogIndex(trxnLogIndex)
169+
.setCmdType(Type.CommitKey)
170+
.setCreationTime(System.currentTimeMillis())
171+
.setVolumeName(omKeyInfo.getVolumeName())
172+
.setBucketName(omKeyInfo.getBucketName())
173+
.setKeyName(omKeyInfo.getKeyName())
174+
.setOpArgs(new OmCompletedRequestInfo.OperationArgs.NoArgs())
175+
.build();
176+
}
157177
}

0 commit comments

Comments (0)