-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathStreamMessage.java
More file actions
48 lines (42 loc) · 1.46 KB
/
Copy pathStreamMessage.java
File metadata and controls
48 lines (42 loc) · 1.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.danubemessaging.client.model;
import danube.DanubeApi;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
/**
 * Message delivered by a consumer receive stream.
 *
 * <p>Instances defensively copy their mutable inputs: the payload array is cloned on construction
 * and again on every {@link #payload()} call, and the attribute map is stored as an unmodifiable
 * copy. A {@code null} payload becomes an empty array and a {@code null} attribute map becomes an
 * empty map.
 *
 * <p>Because the payload is always copied, {@link #equals(Object)} and {@link #hashCode()} compare
 * the payload by content rather than by array identity; otherwise no two separately constructed
 * messages could ever be equal.
 *
 * @param requestId request identifier carried by the message
 * @param messageId identifier of the message
 * @param payload message body; {@code null} is treated as an empty array
 * @param publishTime publish time value carried by the message (unit is defined by the wire
 *     protocol, not by this class)
 * @param producerName name of the producer reported by the message
 * @param subscriptionName name of the subscription reported by the message
 * @param attributes message attributes; {@code null} is treated as an empty map
 * @param schemaId schema identifier, or {@code null} when none is set
 * @param schemaVersion schema version, or {@code null} when none is set
 * @param routingKey routing key, or {@code null} when none is set
 */
public record StreamMessage(
    long requestId,
    MessageId messageId,
    byte[] payload,
    long publishTime,
    String producerName,
    String subscriptionName,
    Map<String, String> attributes,
    Long schemaId,
    Integer schemaVersion,
    String routingKey) {

  /**
   * Normalizes {@code null} inputs and takes defensive copies of the mutable components.
   *
   * @throws NullPointerException if {@code attributes} contains a {@code null} key or value
   */
  public StreamMessage {
    payload = payload == null ? new byte[0] : payload.clone();
    attributes = attributes == null ? Map.of() : Map.copyOf(attributes);
  }

  /**
   * Returns a fresh copy of the payload so callers cannot modify this message's bytes.
   *
   * @return a new array holding the payload bytes; never {@code null}
   */
  @Override
  public byte[] payload() {
    return payload.clone();
  }

  /**
   * Compares all components by value; the payload is compared byte-by-byte.
   *
   * @param obj the object to compare with
   * @return {@code true} if {@code obj} is a {@code StreamMessage} with equal components
   */
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (!(obj instanceof StreamMessage other)) {
      return false;
    }
    return requestId == other.requestId
        && publishTime == other.publishTime
        && Objects.equals(messageId, other.messageId)
        && Arrays.equals(payload, other.payload)
        && Objects.equals(producerName, other.producerName)
        && Objects.equals(subscriptionName, other.subscriptionName)
        && Objects.equals(attributes, other.attributes)
        && Objects.equals(schemaId, other.schemaId)
        && Objects.equals(schemaVersion, other.schemaVersion)
        && Objects.equals(routingKey, other.routingKey);
  }

  /**
   * Hash code consistent with {@link #equals(Object)}; the payload contributes by content.
   *
   * @return the hash code of this message
   */
  @Override
  public int hashCode() {
    int result =
        Objects.hash(
            requestId,
            messageId,
            publishTime,
            producerName,
            subscriptionName,
            attributes,
            schemaId,
            schemaVersion,
            routingKey);
    return 31 * result + Arrays.hashCode(payload);
  }

  /**
   * Returns a readable description of this message. The payload is summarized by its length
   * instead of being printed as an array identity string.
   *
   * @return a string representation of this message
   */
  @Override
  public String toString() {
    return "StreamMessage[requestId=" + requestId
        + ", messageId=" + messageId
        + ", payload=<" + payload.length + " bytes>"
        + ", publishTime=" + publishTime
        + ", producerName=" + producerName
        + ", subscriptionName=" + subscriptionName
        + ", attributes=" + attributes
        + ", schemaId=" + schemaId
        + ", schemaVersion=" + schemaVersion
        + ", routingKey=" + routingKey
        + "]";
  }

  /**
   * Builds a {@code StreamMessage} from its protobuf representation.
   *
   * <p>The optional proto fields (schema ID, schema version, routing key) are mapped to {@code
   * null} when they are not set on {@code proto}.
   *
   * @param proto the protobuf message to convert; must not be {@code null}
   * @return the converted message
   * @throws NullPointerException if {@code proto} is {@code null}
   */
  public static StreamMessage fromProto(DanubeApi.StreamMessage proto) {
    Objects.requireNonNull(proto, "proto");
    Long schemaId = proto.hasSchemaId() ? proto.getSchemaId() : null;
    Integer schemaVersion = proto.hasSchemaVersion() ? proto.getSchemaVersion() : null;
    String routingKey = proto.hasRoutingKey() ? proto.getRoutingKey() : null;
    return new StreamMessage(
        proto.getRequestId(),
        MessageId.fromProto(proto.getMsgId()),
        proto.getPayload().toByteArray(),
        proto.getPublishTime(),
        proto.getProducerName(),
        proto.getSubscriptionName(),
        proto.getAttributesMap(),
        schemaId,
        schemaVersion,
        routingKey);
  }
}