Skip to content

Commit 51d11db

Browse files
authored
feat: Support table view for C client. (apache#294)
1 parent 342aea4 commit 51d11db

9 files changed

Lines changed: 588 additions & 1 deletion

File tree

include/pulsar/TableView.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class PULSAR_PUBLIC TableView {
5151
* TableView view;
5252
* std::string value;
5353
* while (true) {
54-
* if (view.retrieveValue("key")) {
54+
* if (view.retrieveValue("key", value)) {
5555
* std::cout << "value is updated to: " << value;
5656
* } else {
5757
* // sleep for a while or print the message that value is not updated

include/pulsar/c/client.h

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
#include <pulsar/c/reader_configuration.h>
3131
#include <pulsar/c/result.h>
3232
#include <pulsar/c/string_list.h>
33+
#include <pulsar/c/table_view.h>
34+
#include <pulsar/c/table_view_configuration.h>
3335
#include <pulsar/defines.h>
3436

3537
#ifdef __cplusplus
@@ -47,6 +49,7 @@ typedef void (*pulsar_create_producer_callback)(pulsar_result result, pulsar_pro
4749

4850
typedef void (*pulsar_subscribe_callback)(pulsar_result result, pulsar_consumer_t *consumer, void *ctx);
4951
typedef void (*pulsar_reader_callback)(pulsar_result result, pulsar_reader_t *reader, void *ctx);
52+
typedef void (*pulsar_table_view_callback)(pulsar_result result, pulsar_table_view_t *tableView, void *ctx);
5053
typedef void (*pulsar_get_partitions_callback)(pulsar_result result, pulsar_string_list_t *partitions,
5154
void *ctx);
5255

@@ -172,6 +175,54 @@ PULSAR_PUBLIC void pulsar_client_create_reader_async(pulsar_client_t *client, co
172175
const pulsar_message_id_t *startMessageId,
173176
pulsar_reader_configuration_t *conf,
174177
pulsar_reader_callback callback, void *ctx);
178+
/**
179+
* Create a table view with given {@code table_view_configuration} for specified topic.
180+
*
181+
* The TableView provides a key-value map view of a compacted topic. Messages without keys will
182+
* be ignored.
183+
*
184+
* NOTE:
185+
* When the result in the callback is `ResultOk`, `*c_tableView` will point to the memory that
186+
* is allocated internally. You have to call `pulsar_table_view_free` to free it.
187+
*
188+
* Example:
189+
* ```c
190+
* pulsar_table_view_configuration_t *table_view_conf = pulsar_table_view_configuration_create();
191+
* pulsar_table_view_configuration_set_subscription_name(table_view_conf, sub_name);
192+
* pulsar_table_view_t *table_view;
193+
* pulsar_result result = pulsar_client_create_table_view(client, topic_name, table_view_conf, &table_view);
194+
*
195+
* // do something...
196+
*
197+
* pulsar_table_view_close(table_view);
198+
* pulsar_table_view_free(table_view);
199+
* pulsar_table_view_configuration_free(table_view_conf);
200+
*
201+
* ```
202+
*
203+
* @param topic The name of the topic.
204+
* @param conf The {@code table_view_configuration} pointer.
205+
* @param c_tableView The pointer of the table_view pointer
206+
* @return Returned when the table_view is successfully linked to the topic and the map is built from a
207+
* message that already exists.
208+
*/
209+
PULSAR_PUBLIC pulsar_result pulsar_client_create_table_view(pulsar_client_t *client, const char *topic,
210+
pulsar_table_view_configuration_t *conf,
211+
pulsar_table_view_t **c_tableView);
212+
213+
/**
214+
* Async Create a table view with given {@code table_view_configuration} for specified topic.
215+
* @param topic The name of the topic.
216+
* @param conf The {@code table_view_configuration} pointer.
217+
* @param callback
218+
* 1. When the result in the callback is `ResultOk`, `tableView` in the callback will point to the memory that
219+
* is allocated internally. You have to call `pulsar_table_view_free` to free it.
220+
* 2. If the result in the callback is not `ResultOk`, `tableView` in the callback will be nullptr.
221+
* @param ctx
222+
*/
223+
PULSAR_PUBLIC void pulsar_client_create_table_view_async(pulsar_client_t *client, const char *topic,
224+
pulsar_table_view_configuration_t *conf,
225+
pulsar_table_view_callback callback, void *ctx);
175226

176227
PULSAR_PUBLIC pulsar_result pulsar_client_get_topic_partitions(pulsar_client_t *client, const char *topic,
177228
pulsar_string_list_t **partitions);

include/pulsar/c/table_view.h

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <pulsar/defines.h>
22+
23+
#ifdef __cplusplus
24+
extern "C" {
25+
#endif
26+
27+
#include <pulsar/c/message.h>
28+
#include <pulsar/c/messages.h>
29+
#include <pulsar/c/result.h>
30+
#include <stdint.h>
31+
32+
typedef struct _pulsar_table_view pulsar_table_view_t;
33+
34+
typedef void (*pulsar_table_view_action)(const char *key, const void *value, size_t value_size, void *ctx);
35+
typedef void (*pulsar_result_callback)(pulsar_result, void *);
36+
37+
/**
38+
* Move the latest value associated with the key.
39+
*
40+
* NOTE:
41+
* 1. Once the value has been retrieved successfully,
42+
* the associated value will be removed from the table view until next time the value is updated.
43+
* 2. Once the value has been retrieved successfully, `*value` will point to the memory that is allocated
44+
* internally. You have to call `free(value)` to free it.
45+
*
46+
* Example:
47+
*
48+
* ```c
49+
* pulsar_table_view_t *table_view;
50+
* void* value;
51+
* size_t value_size;
52+
* while (true) {
53+
* if (pulsar_table_view_retrieve_value(table_view, "key", &value, &value_size)) {
54+
* for (size_t i = 0; i < value_size; i++) {
55+
* printf("0x%02x%c", ((char*) value)[i], (i + 1 == value_size) ? '\n': ' ');
56+
* }
57+
* } else {
58+
* // sleep for a while or print the message that value is not updated
59+
* }
60+
* }
61+
* free(value);
62+
* ```
63+
*
64+
* @param table_view
65+
* @param key
66+
* @param value the value associated with the key
67+
* @return true if there is an associated value of the key, otherwise false
68+
*/
69+
PULSAR_PUBLIC bool pulsar_table_view_retrieve_value(pulsar_table_view_t *table_view, const char *key,
70+
void **value, size_t *value_size);
71+
72+
/**
73+
* It's similar with `pulsar_table_view_retrieve_value` except the associated value not will be removed from
74+
* the table view.
75+
*
76+
* NOTE:
77+
* Once the value has been get successfully, `*value` will point to the memory that is allocated internally.
78+
* You have to call `free(value)` to free it.
79+
*
80+
* @param table_view
81+
* @param key
82+
* @param value the value associated with the key
83+
* @return true if there is an associated value of the key, otherwise false
84+
*/
85+
PULSAR_PUBLIC bool pulsar_table_view_get_value(pulsar_table_view_t *table_view, const char *key, void **value,
86+
size_t *value_size);
87+
88+
/**
89+
* Check if the key exists in the table view.
90+
* @param table_view
91+
* @param key
92+
* @return true if the key exists in the table view
93+
*/
94+
PULSAR_PUBLIC bool pulsar_table_view_contain_key(pulsar_table_view_t *table_view, const char *key);
95+
96+
/**
97+
* Get the size of the elements.
98+
* @param table_view
99+
* @return
100+
*/
101+
PULSAR_PUBLIC int pulsar_table_view_size(pulsar_table_view_t *table_view);
102+
103+
/**
104+
* Performs the given action for each entry in this map until all entries have been processed or the
105+
* action throws an exception.
106+
*/
107+
PULSAR_PUBLIC void pulsar_table_view_for_each(pulsar_table_view_t *table_view,
108+
pulsar_table_view_action action, void *ctx);
109+
110+
/**
111+
* Performs the given action for each entry in this map until all entries have been processed and
112+
* register the callback, which will be called each time a key-value pair is updated.
113+
*/
114+
PULSAR_PUBLIC void pulsar_table_view_for_each_add_listen(pulsar_table_view_t *table_view,
115+
pulsar_table_view_action action, void *ctx);
116+
117+
/**
118+
* Free the table view.
119+
* @param table_view
120+
*/
121+
PULSAR_PUBLIC void pulsar_table_view_free(pulsar_table_view_t *table_view);
122+
123+
/**
124+
* Close the table view and stop the broker to push more messages
125+
* @param table_view
126+
* @return
127+
*/
128+
PULSAR_PUBLIC pulsar_result pulsar_table_view_close(pulsar_table_view_t *table_view);
129+
130+
/**
131+
* Async close the table view and stop the broker to push more messages
132+
* @param table_view
133+
* @param callback
134+
* @param ctx
135+
*/
136+
PULSAR_PUBLIC void pulsar_table_view_close_async(pulsar_table_view_t *table_view,
137+
pulsar_result_callback callback, void *ctx);
138+
139+
#ifdef __cplusplus
140+
}
141+
#endif
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <pulsar/defines.h>
23+
24+
#include "producer_configuration.h"
25+
26+
#ifdef __cplusplus
27+
extern "C" {
28+
#endif
29+
30+
typedef struct _pulsar_table_view_configuration pulsar_table_view_configuration_t;
31+
32+
PULSAR_PUBLIC pulsar_table_view_configuration_t *pulsar_table_view_configuration_create();
33+
34+
PULSAR_PUBLIC void pulsar_table_view_configuration_free(pulsar_table_view_configuration_t *conf);
35+
36+
PULSAR_PUBLIC void pulsar_table_view_configuration_set_schema_info(
37+
pulsar_table_view_configuration_t *table_view_configuration_t, pulsar_schema_type schemaType,
38+
const char *name, const char *schema, pulsar_string_map_t *properties);
39+
40+
PULSAR_PUBLIC void pulsar_table_view_configuration_set_subscription_name(
41+
pulsar_table_view_configuration_t *table_view_configuration_t, const char *subscription_name);
42+
43+
PULSAR_PUBLIC const char *pulsar_table_view_configuration_get_subscription_name(
44+
pulsar_table_view_configuration_t *table_view_configuration_t);
45+
46+
#ifdef __cplusplus
47+
}
48+
#endif

lib/c/c_Client.cc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,34 @@ void pulsar_client_create_reader_async(pulsar_client_t *client, const char *topi
192192
std::bind(&handle_reader_callback, std::placeholders::_1, std::placeholders::_2, callback, ctx));
193193
}
194194

195+
pulsar_result pulsar_client_create_table_view(pulsar_client_t *client, const char *topic,
196+
pulsar_table_view_configuration_t *conf,
197+
pulsar_table_view_t **c_tableView) {
198+
pulsar::TableView tableView;
199+
pulsar::Result res = client->client->createTableView(topic, conf->tableViewConfiguration, tableView);
200+
if (res == pulsar::ResultOk) {
201+
(*c_tableView) = new pulsar_table_view_t;
202+
(*c_tableView)->tableView = std::move(tableView);
203+
return pulsar_result_Ok;
204+
}
205+
return (pulsar_result)res;
206+
}
207+
208+
void pulsar_client_create_table_view_async(pulsar_client_t *client, const char *topic,
209+
pulsar_table_view_configuration_t *conf,
210+
pulsar_table_view_callback callback, void *ctx) {
211+
client->client->createTableViewAsync(topic, conf->tableViewConfiguration,
212+
[callback, ctx](pulsar::Result result, pulsar::TableView tableView) {
213+
if (result == pulsar::ResultOk) {
214+
auto *c_tableView = new pulsar_table_view_t;
215+
c_tableView->tableView = std::move(tableView);
216+
callback((pulsar_result)result, c_tableView, ctx);
217+
} else {
218+
callback((pulsar_result)result, NULL, ctx);
219+
}
220+
});
221+
}
222+
195223
pulsar_result pulsar_client_get_topic_partitions(pulsar_client_t *client, const char *topic,
196224
pulsar_string_list_t **partitions) {
197225
std::vector<std::string> partitionsList;

lib/c/c_TableView.cc

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <pulsar/c/table_view.h>
21+
#include <string.h>
22+
23+
#include "c_structs.h"
24+
25+
static void *malloc_and_copy(const char *s, size_t slen) {
26+
void *result = (void *)malloc(slen);
27+
if (result == NULL) {
28+
abort();
29+
}
30+
memcpy(result, s, slen);
31+
return result;
32+
}
33+
34+
bool pulsar_table_view_retrieve_value(pulsar_table_view_t *table_view, const char *key, void **value,
35+
size_t *value_size) {
36+
std::string v;
37+
bool result = table_view->tableView.retrieveValue(key, v);
38+
if (result) {
39+
*value = malloc_and_copy(v.c_str(), v.size());
40+
*value_size = v.size();
41+
}
42+
return result;
43+
}
44+
45+
bool pulsar_table_view_get_value(pulsar_table_view_t *table_view, const char *key, void **value,
46+
size_t *value_size) {
47+
std::string v;
48+
bool result = table_view->tableView.getValue(key, v);
49+
if (result) {
50+
*value = malloc_and_copy(v.c_str(), v.size());
51+
*value_size = v.size();
52+
}
53+
return result;
54+
}
55+
56+
bool pulsar_table_view_contain_key(pulsar_table_view_t *table_view, const char *key) {
57+
return table_view->tableView.containsKey(key);
58+
}
59+
60+
int pulsar_table_view_size(pulsar_table_view_t *table_view) { return table_view->tableView.size(); }
61+
62+
void pulsar_table_view_for_each(pulsar_table_view_t *table_view, pulsar_table_view_action action, void *ctx) {
63+
table_view->tableView.forEach([action, ctx](const std::string &key, const std::string &value) {
64+
if (action) {
65+
action(key.c_str(), value.c_str(), value.size(), ctx);
66+
}
67+
});
68+
}
69+
70+
void pulsar_table_view_for_each_add_listen(pulsar_table_view_t *table_view, pulsar_table_view_action action,
71+
void *ctx) {
72+
table_view->tableView.forEachAndListen([action, ctx](const std::string &key, const std::string &value) {
73+
if (action) {
74+
action(key.c_str(), value.c_str(), value.size(), ctx);
75+
}
76+
});
77+
}
78+
79+
void pulsar_table_view_free(pulsar_table_view_t *table_view) { delete table_view; }
80+
81+
pulsar_result pulsar_table_view_close(pulsar_table_view_t *table_view) {
82+
return (pulsar_result)table_view->tableView.close();
83+
}
84+
85+
void pulsar_table_view_close_async(pulsar_table_view_t *table_view, pulsar_result_callback callback,
86+
void *ctx) {
87+
table_view->tableView.closeAsync(
88+
[callback, ctx](pulsar::Result result) { return handle_result_callback(result, callback, ctx); });
89+
}

0 commit comments

Comments
 (0)