-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathMessageSchemaToJsonBuilder.java
More file actions
82 lines (72 loc) · 3.25 KB
/
MessageSchemaToJsonBuilder.java
File metadata and controls
82 lines (72 loc) · 3.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package io.mapsmessaging.engine.schema;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.mapsmessaging.api.MessageBuilder;
import io.mapsmessaging.api.message.Message;
import io.mapsmessaging.api.message.TypedData;
import io.mapsmessaging.rest.translation.GsonDateTimeSerialiser;
import io.mapsmessaging.schemas.config.SchemaConfig;
import io.mapsmessaging.schemas.formatters.MessageFormatter;
import io.mapsmessaging.schemas.formatters.MessageFormatterFactory;
import io.mapsmessaging.schemas.formatters.impl.RawFormatter;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map;
public class MessageSchemaToJsonBuilder {
private final Gson gson;
public MessageSchemaToJsonBuilder() {
gson = new GsonBuilder()
.setPrettyPrinting()
.registerTypeAdapter(LocalDateTime.class, new GsonDateTimeSerialiser())
.create();
}
public Message parse(Message message, String destinationName) throws Exception {
String schemaId = message.getSchemaId();
if(schemaId != null && !destinationName.startsWith("$")) {
SchemaConfig config = SchemaManager.getInstance().getSchema(schemaId);
if(config != null) {
MessageFormatter formatter = SchemaManager.getInstance().getMessageFormatter(config);
if (formatter != null && !(formatter instanceof RawFormatter)) {
byte[] data = pack(message, config, formatter);
MessageBuilder messageBuilder = new MessageBuilder();
messageBuilder.setOpaqueData(data);
return messageBuilder.build();
}
}
}
return null;
}
private byte[] pack(Message message, SchemaConfig config, MessageFormatter formatter) throws IOException {
byte[] payload = message.getOpaqueData();
JsonObject jsonObject = formatter.parseToJson(payload);
JsonObject wrapper = new JsonObject();
wrapper.add("payload", jsonObject);
wrapper.addProperty("schemaId", config.getUniqueId());
wrapper.addProperty("schemaTitle", config.getTitle());
if(message.getDataMap() != null && !message.getDataMap().isEmpty()){
JsonObject map = new JsonObject();
for (Map.Entry<String, TypedData> entry : message.getDataMap().entrySet()) {
Object data = entry.getValue().getData();
JsonElement element = gson.toJsonTree(data);
map.add(entry.getKey(), element);
}
wrapper.add("map", map);
}
JsonObject metaObject = new JsonObject();
for (Map.Entry<String, String> meta : message.getMeta().entrySet()) {
metaObject.addProperty(meta.getKey(), meta.getValue());
}
wrapper.add("meta", metaObject);
if(message.getContentType() != null)wrapper.addProperty("content-type", message.getContentType());
if(message.getCorrelationData() != null)wrapper.addProperty("correlationId", new String(message.getCorrelationData()));
if(message.getResponseTopic() != null)wrapper.addProperty("responseTopic", message.getResponseTopic());
if(message.getCreation() != 0){
wrapper.addProperty("creation", Instant.ofEpochMilli(message.getCreation()).atZone(ZoneId.systemDefault()).toString());
}
return gson.toJson(wrapper).getBytes();
}
}