Skip to content

Commit 998bf0a

Browse files
HADOOP-19864. Cut WritableRPCEngine (#8433)
WritableRPCEngine is no more; MR TaskUmbilicalProtocol now uses protobuf. RpcWritable still exists, and ClientCache uses it, but those are vestigial uses. This is not backwards compatible, as Tez uses it for their heartbeat protocol in all versions without TEZ-4708. Contains content generated by GitHub Copilot. Contributed by Steve Loughran
1 parent 66a2e99 commit 998bf0a

7 files changed

Lines changed: 30 additions & 662 deletions

File tree

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.hadoop.security.token.TokenIdentifier;
5858
import org.apache.hadoop.classification.InterfaceAudience;
5959
import org.apache.hadoop.classification.InterfaceStability;
60+
import org.apache.hadoop.util.Preconditions;
6061
import org.apache.hadoop.util.ReflectionUtils;
6162
import org.apache.hadoop.util.Time;
6263

@@ -88,7 +89,7 @@ public class RPC {
8889
final static int RPC_SERVICE_CLASS_DEFAULT = 0;
8990
public enum RpcKind {
9091
RPC_BUILTIN ((short) 1), // Used for built in calls by tests
91-
RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
92+
RPC_WRITABLE((short) 2), // WritableRpcEngine removed; kept for wire-level detection
9293
RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
9394
final static short MAX_SIZE = RPC_PROTOCOL_BUFFER.value; // used for array size
9495
private final short value;
@@ -221,9 +222,11 @@ static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
221222
Configuration conf) {
222223
RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
223224
if (engine == null) {
224-
Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
225-
WritableRpcEngine.class);
226-
engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
225+
Class<?> impl = conf.getClass(ENGINE_PROP + "." + protocol.getName(),
226+
null);
227+
Preconditions.checkState(impl != null,
228+
"No RPC engine configured for %s", protocol.getName());
229+
engine = (RpcEngine) ReflectionUtils.newInstance(impl, conf);
227230
PROTOCOL_ENGINES.put(protocol, engine);
228231
}
229232
return engine;

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcWritable.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,14 @@
3333
import org.apache.hadoop.thirdparty.protobuf.CodedOutputStream;
3434
import org.apache.hadoop.thirdparty.protobuf.Message;
3535

36-
// note anything marked public is solely for access by SaslRpcClient
36+
/**
37+
* Marshalling support, for hadoop shaded protobuf and legacy
38+
* protobuf 2.5.
39+
* It originally supported hadoop Writables for the WritableRPCEngine;
40+
 * that is now removed. All that is retained is the name.
41+
* Anything marked public is solely for access by SaslRpcClient
42+
*/
43+
// note
3744
@InterfaceAudience.Private
3845
public abstract class RpcWritable implements Writable {
3946

@@ -66,7 +73,7 @@ public final void write(DataOutput out) throws IOException {
6673
abstract void writeTo(ResponseBuffer out) throws IOException;
6774
abstract <T> T readFrom(ByteBuffer bb) throws IOException;
6875

69-
// adapter for Writables.
76+
// adapter for Writables. Used for RPC_BUILTIN calls.
7077
static class WritableWrapper extends RpcWritable {
7178
private final Writable writable;
7279

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2939,6 +2939,13 @@ private void checkRpcHeaders(RpcRequestHeaderProto header)
29392939
private void processRpcRequest(RpcRequestHeaderProto header,
29402940
RpcWritable.Buffer buffer) throws RpcServerException,
29412941
InterruptedException {
2942+
if (header.getRpcKind() == RpcKindProto.RPC_WRITABLE) {
2943+
final String err = "WritableRpcEngine is no longer supported."
2944+
+ " Upgrade your Hadoop client to use ProtobufRpcEngine.";
2945+
LOG.warn(err + " Client: " + getHostAddress());
2946+
throw new FatalRpcServerException(
2947+
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
2948+
}
29422949
Class<? extends Writable> rpcRequestClass =
29432950
getRpcRequestWrapper(header.getRpcKind());
29442951
if (rpcRequestClass == null) {
@@ -3642,6 +3649,8 @@ private void setupResponse(RpcCall call,
36423649
call.setResponse(ByteBuffer.wrap(response));
36433650
}
36443651

3652+
3653+
36453654
private byte[] setupResponseForWritable(
36463655
RpcResponseHeaderProto header, Writable rv) throws IOException {
36473656
ResponseBuffer buf = responseBuffer.get().reset();
@@ -3660,7 +3669,6 @@ private byte[] setupResponseForWritable(
36603669
}
36613670
}
36623671

3663-
36643672
// writing to a pre-allocated array is the most efficient way to construct
36653673
// a protobuf response.
36663674
private byte[] setupResponseForProtobuf(

0 commit comments

Comments
 (0)