Skip to content

Commit 5f5d0cb

Browse files
authored
Support protobuf native schema (#486)
1 parent 3f4b619 commit 5f5d0cb

13 files changed

Lines changed: 1066 additions & 84 deletions

.eslintignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@
1919

2020
examples
2121
perf
22+
tests/protobuf_schema/generated

index.d.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,21 @@ export interface SchemaInfo {
225225
properties?: Record<string, string>;
226226
}
227227

228+
export namespace ProtobufNativeSchema {
229+
interface CreateSchemaInfoFromRootOptions {
230+
root: any;
231+
rootMessageTypeName: string;
232+
rootFileDescriptorName: string;
233+
schemaType?: 'ProtobufNative';
234+
syntax?: 'proto3' | 'proto2' | string;
235+
name?: string;
236+
properties?: Record<string, string>;
237+
}
238+
239+
function createRootFromJson(rootJson: Record<string, unknown>): any;
240+
function createSchemaInfoFromRoot(options: CreateSchemaInfoFromRootOptions): SchemaInfo;
241+
}
242+
228243
export interface DeadLetterPolicy {
229244
deadLetterTopic: string;
230245
maxRedeliverCount?: number;
@@ -385,6 +400,7 @@ export type SchemaType =
385400
'Float32' |
386401
'Float64' |
387402
'KeyValue' |
403+
'ProtobufNative' |
388404
'Bytes' |
389405
'AutoConsume' |
390406
'AutoPublish';

index.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const AuthenticationToken = require('./src/AuthenticationToken');
2424
const AuthenticationOauth2 = require('./src/AuthenticationOauth2');
2525
const AuthenticationBasic = require('./src/AuthenticationBasic');
2626
const Client = require('./src/Client');
27+
const ProtobufNativeSchema = require('./src/ProtobufNativeSchema');
2728

2829
const LogLevel = {
2930
DEBUG: 0,
@@ -43,6 +44,7 @@ const Pulsar = {
4344
AuthenticationToken,
4445
AuthenticationOauth2,
4546
AuthenticationBasic,
47+
ProtobufNativeSchema,
4648
LogLevel,
4749
};
4850

package-lock.json

Lines changed: 21 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@
6666
"dependencies": {
6767
"@mapbox/node-pre-gyp": "^2.0.3",
6868
"bindings": "^1.5.0",
69-
"node-addon-api": "^4.3.0"
69+
"node-addon-api": "^4.3.0",
70+
"protobufjs": "^8.4.2"
7071
},
7172
"binary": {
7273
"module_name": "pulsar",

src/ProtobufNativeSchema.js

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
const protobuf = require('protobufjs');
21+
const descriptor = require('protobufjs/ext/descriptor');
22+
23+
const normalizeTypeName = (typeName) => typeName.replace(/^\./, '');
24+
25+
const createSchemaInfoFromRoot = ({
26+
root,
27+
rootMessageTypeName,
28+
rootFileDescriptorName,
29+
schemaType = 'ProtobufNative',
30+
syntax = 'proto3',
31+
name = rootMessageTypeName,
32+
properties = {},
33+
}) => {
34+
if (!root) {
35+
throw new Error('root is required');
36+
}
37+
if (!rootMessageTypeName) {
38+
throw new Error('rootMessageTypeName is required');
39+
}
40+
if (!rootFileDescriptorName) {
41+
throw new Error('rootFileDescriptorName is required');
42+
}
43+
44+
const normalizedTypeName = normalizeTypeName(rootMessageTypeName);
45+
const rootMessageType = root.lookupType(normalizedTypeName);
46+
const packageName = normalizedTypeName.split('.').slice(0, -1).join('.');
47+
const namespace = packageName ? root.lookup(packageName) : root;
48+
49+
// protobufjs reflection JSON does not retain the source file name. Set it
50+
// before exporting a FileDescriptorSet, mirroring descriptor->file()->name().
51+
namespace.filename = rootFileDescriptorName;
52+
root.resolveAll();
53+
54+
const fileDescriptorSet = root.toDescriptor(syntax);
55+
const fileDescriptorSetBytes = descriptor.FileDescriptorSet.encode(fileDescriptorSet).finish();
56+
57+
return {
58+
schemaType,
59+
name,
60+
schema: JSON.stringify({
61+
fileDescriptorSet: Buffer.from(fileDescriptorSetBytes).toString('base64'),
62+
rootMessageTypeName: normalizeTypeName(rootMessageType.fullName),
63+
rootFileDescriptorName,
64+
}),
65+
properties,
66+
};
67+
};
68+
69+
const createRootFromJson = (rootJson) => protobuf.Root.fromJSON(rootJson);
70+
71+
module.exports = {
72+
createRootFromJson,
73+
createSchemaInfoFromRoot,
74+
};

src/SchemaInfo.cc

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,35 +17,53 @@
1717
* under the License.
1818
*/
1919
#include "SchemaInfo.h"
20+
#include <pulsar/ConsumerConfiguration.h>
21+
#include <pulsar/ProducerConfiguration.h>
2022
#include <map>
2123

2224
static const std::string CFG_SCHEMA_TYPE = "schemaType";
2325
static const std::string CFG_NAME = "name";
2426
static const std::string CFG_SCHEMA = "schema";
2527
static const std::string CFG_PROPS = "properties";
2628

27-
static const std::map<std::string, pulsar_schema_type> SCHEMA_TYPE = {{"None", pulsar_None},
28-
{"String", pulsar_String},
29-
{"Json", pulsar_Json},
30-
{"Protobuf", pulsar_Protobuf},
31-
{"Avro", pulsar_Avro},
32-
{"Boolean", pulsar_Boolean},
33-
{"Int8", pulsar_Int8},
34-
{"Int16", pulsar_Int16},
35-
{"Int32", pulsar_Int32},
36-
{"Int64", pulsar_Int64},
37-
{"Float32", pulsar_Float32},
38-
{"Float64", pulsar_Float64},
39-
{"KeyValue", pulsar_KeyValue},
40-
{"Bytes", pulsar_Bytes},
41-
{"AutoConsume", pulsar_AutoConsume},
42-
{"AutoPublish", pulsar_AutoPublish}};
29+
struct _pulsar_producer_configuration {
30+
pulsar::ProducerConfiguration conf;
31+
};
4332

44-
SchemaInfo::SchemaInfo(const Napi::Object &schemaInfo) : cSchemaType(pulsar_Bytes), name("BYTES"), schema() {
45-
this->cProperties = pulsar_string_map_create();
33+
struct _pulsar_consumer_configuration {
34+
pulsar::ConsumerConfiguration consumerConfiguration;
35+
};
36+
37+
static const std::map<std::string, pulsar::SchemaType> SCHEMA_TYPE = {
38+
{"None", static_cast<pulsar::SchemaType>(0)},
39+
{"String", static_cast<pulsar::SchemaType>(1)},
40+
{"Json", static_cast<pulsar::SchemaType>(2)},
41+
{"Protobuf", static_cast<pulsar::SchemaType>(3)},
42+
{"Avro", static_cast<pulsar::SchemaType>(4)},
43+
{"Boolean", static_cast<pulsar::SchemaType>(5)},
44+
{"Int8", static_cast<pulsar::SchemaType>(6)},
45+
{"Int16", static_cast<pulsar::SchemaType>(7)},
46+
{"Int32", static_cast<pulsar::SchemaType>(8)},
47+
{"Int64", static_cast<pulsar::SchemaType>(9)},
48+
{"Float32", static_cast<pulsar::SchemaType>(10)},
49+
{"Float64", static_cast<pulsar::SchemaType>(11)},
50+
{"KeyValue", static_cast<pulsar::SchemaType>(15)},
51+
{"ProtobufNative", static_cast<pulsar::SchemaType>(20)},
52+
{"Bytes", static_cast<pulsar::SchemaType>(-1)},
53+
{"AutoConsume", static_cast<pulsar::SchemaType>(-3)},
54+
{"AutoPublish", static_cast<pulsar::SchemaType>(-4)}};
55+
56+
SchemaInfo::SchemaInfo(const Napi::Object &schemaInfo)
57+
: schemaType(static_cast<pulsar::SchemaType>(-1)), name("BYTES"), schema() {
4658
if (schemaInfo.Has(CFG_SCHEMA_TYPE) && schemaInfo.Get(CFG_SCHEMA_TYPE).IsString()) {
47-
this->name = schemaInfo.Get(CFG_SCHEMA_TYPE).ToString().Utf8Value();
48-
this->cSchemaType = SCHEMA_TYPE.at(schemaInfo.Get(CFG_SCHEMA_TYPE).ToString().Utf8Value());
59+
std::string typeStr = schemaInfo.Get(CFG_SCHEMA_TYPE).ToString().Utf8Value();
60+
auto it = SCHEMA_TYPE.find(typeStr);
61+
if (it == SCHEMA_TYPE.end()) {
62+
Napi::TypeError::New(schemaInfo.Env(), "Unknown schemaType: " + typeStr).ThrowAsJavaScriptException();
63+
return;
64+
}
65+
this->name = typeStr;
66+
this->schemaType = it->second;
4967
}
5068
if (schemaInfo.Has(CFG_NAME) && schemaInfo.Get(CFG_NAME).IsString()) {
5169
this->name = schemaInfo.Get(CFG_NAME).ToString().Utf8Value();
@@ -60,19 +78,19 @@ SchemaInfo::SchemaInfo(const Napi::Object &schemaInfo) : cSchemaType(pulsar_Byte
6078
for (int i = 0; i < size; i++) {
6179
Napi::String key = arr.Get(i).ToString();
6280
Napi::String value = propObj.Get(key).ToString();
63-
pulsar_string_map_put(this->cProperties, key.Utf8Value().c_str(), value.Utf8Value().c_str());
81+
this->properties[key.Utf8Value()] = value.Utf8Value();
6482
}
6583
}
6684
}
6785

6886
void SchemaInfo::SetProducerSchema(std::shared_ptr<pulsar_producer_configuration_t> cProducerConfiguration) {
69-
pulsar_producer_configuration_set_schema_info(cProducerConfiguration.get(), this->cSchemaType,
70-
this->name.c_str(), this->schema.c_str(), this->cProperties);
87+
cProducerConfiguration->conf.setSchema(
88+
pulsar::SchemaInfo(this->schemaType, this->name, this->schema, this->properties));
7189
}
7290

7391
void SchemaInfo::SetConsumerSchema(std::shared_ptr<pulsar_consumer_configuration_t> cConsumerConfiguration) {
74-
pulsar_consumer_configuration_set_schema_info(cConsumerConfiguration.get(), this->cSchemaType,
75-
this->name.c_str(), this->schema.c_str(), this->cProperties);
92+
cConsumerConfiguration->consumerConfiguration.setSchema(
93+
pulsar::SchemaInfo(this->schemaType, this->name, this->schema, this->properties));
7694
}
7795

78-
SchemaInfo::~SchemaInfo() { pulsar_string_map_free(this->cProperties); }
96+
SchemaInfo::~SchemaInfo() {}

src/SchemaInfo.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@
2020
#ifndef SCHEMA_INFO_H
2121
#define SCHEMA_INFO_H
2222

23+
#include <map>
24+
#include <memory>
2325
#include <napi.h>
24-
#include <pulsar/c/producer_configuration.h>
26+
#include <pulsar/Schema.h>
2527
#include <pulsar/c/consumer_configuration.h>
28+
#include <pulsar/c/producer_configuration.h>
29+
#include <string>
2630

2731
class SchemaInfo {
2832
public:
@@ -32,10 +36,10 @@ class SchemaInfo {
3236
void SetConsumerSchema(std::shared_ptr<pulsar_consumer_configuration_t> cConsumerConfiguration);
3337

3438
private:
35-
pulsar_schema_type cSchemaType;
39+
pulsar::SchemaType schemaType;
3640
std::string name;
3741
std::string schema;
38-
pulsar_string_map_t *cProperties;
42+
std::map<std::string, std::string> properties;
3943
};
4044

4145
#endif

0 commit comments

Comments
 (0)