Skip to content

Commit b7d391c

Browse files
authored
[#436] feat(client,server): Introduce multi-part LocalStorageManager (#2253)
### What changes were proposed in this pull request? - Introduce a factory to create specific LocalStorageManager by config. - Introduce multiply disk LocalStorageManager. ### Why are the changes needed? Fix: #436 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Existing UTs and new added UT - Tested on our pressure test cluster. - new client -> old server ✓ - new client -> new server ✓ - old client -> new server ❌, so we have to upgraded client first, than upgrade the servers
1 parent 54611f3 commit b7d391c

40 files changed

Lines changed: 1526 additions & 248 deletions

File tree

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package io.netty.util;
19+
20+
import java.io.IOException;
21+
import java.nio.channels.WritableByteChannel;
22+
23+
import io.netty.channel.FileRegion;
24+
25+
import org.apache.uniffle.common.netty.protocol.AbstractFileRegion;
26+
27+
public class CompositeFileRegion extends AbstractFileRegion {
28+
private final FileRegion[] regions;
29+
private long totalSize = 0;
30+
private long bytesTransferred = 0;
31+
32+
public CompositeFileRegion(FileRegion... regions) {
33+
this.regions = regions;
34+
for (FileRegion region : regions) {
35+
totalSize += region.count();
36+
}
37+
}
38+
39+
@Override
40+
public long position() {
41+
return bytesTransferred;
42+
}
43+
44+
@Override
45+
public long count() {
46+
return totalSize;
47+
}
48+
49+
@Override
50+
public long transferTo(WritableByteChannel target, long position) throws IOException {
51+
long totalBytesTransferred = 0;
52+
53+
for (FileRegion region : regions) {
54+
if (position >= region.count()) {
55+
position -= region.count();
56+
} else {
57+
long currentBytesTransferred = region.transferTo(target, position);
58+
totalBytesTransferred += currentBytesTransferred;
59+
bytesTransferred += currentBytesTransferred;
60+
61+
if (currentBytesTransferred < region.count() - position) {
62+
break;
63+
}
64+
position = 0;
65+
}
66+
}
67+
68+
return totalBytesTransferred;
69+
}
70+
71+
@Override
72+
public long transferred() {
73+
return bytesTransferred;
74+
}
75+
76+
@Override
77+
public AbstractFileRegion retain() {
78+
super.retain();
79+
for (FileRegion region : regions) {
80+
region.retain();
81+
}
82+
return this;
83+
}
84+
85+
@Override
86+
public AbstractFileRegion retain(int increment) {
87+
super.retain(increment);
88+
for (FileRegion region : regions) {
89+
region.retain(increment);
90+
}
91+
return this;
92+
}
93+
94+
@Override
95+
public boolean release() {
96+
boolean released = super.release();
97+
for (FileRegion region : regions) {
98+
if (!region.release()) {
99+
released = false;
100+
}
101+
}
102+
return released;
103+
}
104+
105+
@Override
106+
public boolean release(int decrement) {
107+
boolean released = super.release(decrement);
108+
for (FileRegion region : regions) {
109+
if (!region.release(decrement)) {
110+
released = false;
111+
}
112+
}
113+
return released;
114+
}
115+
116+
@Override
117+
protected void deallocate() {
118+
for (FileRegion region : regions) {
119+
if (region instanceof AbstractReferenceCounted) {
120+
((AbstractReferenceCounted) region).deallocate();
121+
}
122+
}
123+
}
124+
125+
@Override
126+
public AbstractFileRegion touch() {
127+
super.touch();
128+
for (FileRegion region : regions) {
129+
region.touch();
130+
}
131+
return this;
132+
}
133+
134+
@Override
135+
public AbstractFileRegion touch(Object hint) {
136+
super.touch(hint);
137+
for (FileRegion region : regions) {
138+
region.touch(hint);
139+
}
140+
return this;
141+
}
142+
}

common/src/main/java/org/apache/uniffle/common/ShuffleDataSegment.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,15 @@
2727
public class ShuffleDataSegment {
2828
private final long offset;
2929
private final int length;
30+
31+
private final int storageId;
3032
private final List<BufferSegment> bufferSegments;
3133

32-
public ShuffleDataSegment(long offset, int length, List<BufferSegment> bufferSegments) {
34+
public ShuffleDataSegment(
35+
long offset, int length, int storageId, List<BufferSegment> bufferSegments) {
3336
this.offset = offset;
3437
this.length = length;
38+
this.storageId = storageId;
3539
this.bufferSegments = bufferSegments;
3640
}
3741

@@ -46,4 +50,8 @@ public int getLength() {
4650
public List<BufferSegment> getBufferSegments() {
4751
return bufferSegments;
4852
}
53+
54+
public int getStorageId() {
55+
return storageId;
56+
}
4957
}

common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@
3030

3131
public class ShuffleIndexResult {
3232
private static final Logger LOG = LoggerFactory.getLogger(ShuffleIndexResult.class);
33+
private static final int[] DEFAULT_STORAGE_IDS = new int[] {0};
3334

3435
private final ManagedBuffer buffer;
36+
private final int[] storageIds;
3537
private long dataFileLen;
3638
private String dataFileName;
3739

@@ -44,15 +46,28 @@ public ShuffleIndexResult(byte[] data, long dataFileLen) {
4446
}
4547

4648
public ShuffleIndexResult(ByteBuffer data, long dataFileLen) {
47-
this.buffer =
48-
new NettyManagedBuffer(data != null ? Unpooled.wrappedBuffer(data) : Unpooled.EMPTY_BUFFER);
49-
this.dataFileLen = dataFileLen;
49+
this(
50+
new NettyManagedBuffer(data != null ? Unpooled.wrappedBuffer(data) : Unpooled.EMPTY_BUFFER),
51+
dataFileLen,
52+
null,
53+
DEFAULT_STORAGE_IDS);
5054
}
5155

5256
public ShuffleIndexResult(ManagedBuffer buffer, long dataFileLen, String dataFileName) {
57+
this(buffer, dataFileLen, dataFileName, DEFAULT_STORAGE_IDS);
58+
}
59+
60+
public ShuffleIndexResult(
61+
ManagedBuffer buffer, long dataFileLen, String dataFileName, int storageId) {
62+
this(buffer, dataFileLen, dataFileName, new int[] {storageId});
63+
}
64+
65+
public ShuffleIndexResult(
66+
ManagedBuffer buffer, long dataFileLen, String dataFileName, int[] storageIds) {
5367
this.buffer = buffer;
5468
this.dataFileLen = dataFileLen;
5569
this.dataFileName = dataFileName;
70+
this.storageIds = storageIds;
5671
}
5772

5873
public byte[] getData() {
@@ -99,4 +114,8 @@ public ManagedBuffer getManagedBuffer() {
99114
public String getDataFileName() {
100115
return dataFileName;
101116
}
117+
118+
public int[] getStorageIds() {
119+
return storageIds;
120+
}
102121
}

common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) thro
8686
header.writeInt(bodyLength);
8787
in.encode(header);
8888
if (header.writableBytes() != 0) {
89-
throw new RssException("header's writable bytes should be 0");
89+
throw new RssException(
90+
"header's writable bytes should be 0, but it is " + header.writableBytes());
9091
}
9192

9293
if (body != null) {
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.common.netty.buffer;
19+
20+
import java.nio.ByteBuffer;
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
24+
import io.netty.buffer.ByteBuf;
25+
import io.netty.buffer.Unpooled;
26+
import io.netty.channel.FileRegion;
27+
import io.netty.util.CompositeFileRegion;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
/** A wrapper of multiple {@link FileSegmentManagedBuffer}, used for combine shuffle index files. */
32+
public class MultiFileSegmentManagedBuffer extends ManagedBuffer {
33+
34+
private static final Logger LOG = LoggerFactory.getLogger(MultiFileSegmentManagedBuffer.class);
35+
private final List<ManagedBuffer> managedBuffers;
36+
37+
public MultiFileSegmentManagedBuffer(List<ManagedBuffer> managedBuffers) {
38+
this.managedBuffers = managedBuffers;
39+
}
40+
41+
@Override
42+
public int size() {
43+
return managedBuffers.stream().mapToInt(ManagedBuffer::size).sum();
44+
}
45+
46+
@Override
47+
public ByteBuf byteBuf() {
48+
return Unpooled.wrappedBuffer(this.nioByteBuffer());
49+
}
50+
51+
@Override
52+
public ByteBuffer nioByteBuffer() {
53+
ByteBuffer merged = ByteBuffer.allocate(size());
54+
for (ManagedBuffer managedBuffer : managedBuffers) {
55+
ByteBuffer buffer = managedBuffer.nioByteBuffer();
56+
merged.put(buffer.slice());
57+
}
58+
merged.flip();
59+
return merged;
60+
}
61+
62+
@Override
63+
public ManagedBuffer retain() {
64+
return this;
65+
}
66+
67+
@Override
68+
public ManagedBuffer release() {
69+
return this;
70+
}
71+
72+
@Override
73+
public Object convertToNetty() {
74+
List<FileRegion> fileRegions = new ArrayList<>(managedBuffers.size());
75+
for (ManagedBuffer managedBuffer : managedBuffers) {
76+
Object object = managedBuffer.convertToNetty();
77+
if (object instanceof FileRegion) {
78+
fileRegions.add((FileRegion) object);
79+
}
80+
}
81+
return new CompositeFileRegion(fileRegions.toArray(new FileRegion[0]));
82+
}
83+
}

common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class GetLocalShuffleDataRequest extends RequestMessage {
3030
private long offset;
3131
private int length;
3232
private long timestamp;
33+
private int storageId;
3334

3435
public GetLocalShuffleDataRequest(
3536
long requestId,
@@ -41,6 +42,30 @@ public GetLocalShuffleDataRequest(
4142
long offset,
4243
int length,
4344
long timestamp) {
45+
this(
46+
requestId,
47+
appId,
48+
shuffleId,
49+
partitionId,
50+
partitionNumPerRange,
51+
partitionNum,
52+
offset,
53+
length,
54+
-1,
55+
timestamp);
56+
}
57+
58+
protected GetLocalShuffleDataRequest(
59+
long requestId,
60+
String appId,
61+
int shuffleId,
62+
int partitionId,
63+
int partitionNumPerRange,
64+
int partitionNum,
65+
long offset,
66+
int length,
67+
int storageId,
68+
long timestamp) {
4469
super(requestId);
4570
this.appId = appId;
4671
this.shuffleId = shuffleId;
@@ -49,6 +74,7 @@ public GetLocalShuffleDataRequest(
4974
this.partitionNum = partitionNum;
5075
this.offset = offset;
5176
this.length = length;
77+
this.storageId = storageId;
5278
this.timestamp = timestamp;
5379
}
5480

@@ -132,6 +158,10 @@ public long getTimestamp() {
132158
return timestamp;
133159
}
134160

161+
public int getStorageId() {
162+
return storageId;
163+
}
164+
135165
@Override
136166
public String getOperationType() {
137167
return "getLocalShuffleData";

0 commit comments

Comments
 (0)