Skip to content

Commit 567faf0

Browse files
authored
Merge pull request #134 from Restream/feature/v5.13.0-support
Add support for v5.13.0 reindexer API
2 parents eddd3bf + e6d4976 commit 567faf0

8 files changed

Lines changed: 26 additions & 15 deletions

File tree

builtin-adapter/BuiltinAdapter.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,9 +282,13 @@ JNIEXPORT jobject JNICALL Java_ru_rt_restream_reindexer_binding_builtin_BuiltinA
282282
jlong rx,
283283
jlong ctxId,
284284
jlong timeout,
285-
jbyteArray data) {
285+
jbyteArray data,
286+
jlongArray versions) {
287+
auto ptVersions = reinterpret_cast<int32_t *>(env->GetLongArrayElements(versions, nullptr));
288+
int ptVersionsCount = env->GetArrayLength(versions);
286289
reindexer_buffer bufferData = rx_buffer(env, data);
287-
jobject res = j_res(env, reindexer_update_query(rx, bufferData, rx_ctx(ctxId, timeout)));
290+
jobject res = j_res(env, reindexer_update_query(rx, bufferData, ptVersions, ptVersionsCount, rx_ctx(ctxId, timeout)));
291+
env->ReleaseLongArrayElements(versions, reinterpret_cast<jlong *>(ptVersions), 0);
288292
env->ReleaseByteArrayElements(data, reinterpret_cast<jbyte *>(bufferData.data), 0);
289293
return res;
290294
}

builtin-adapter/BuiltinAdapter.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ JNIEXPORT jobject JNICALL Java_ru_rt_restream_reindexer_binding_builtin_BuiltinA
9191

9292
JNIEXPORT jobject JNICALL Java_ru_rt_restream_reindexer_binding_builtin_BuiltinAdapter_updateQuery(JNIEnv *, jobject,
9393
jlong, jlong, jlong,
94-
jbyteArray);
94+
jbyteArray,
95+
jlongArray);
9596

9697
JNIEXPORT jobject JNICALL Java_ru_rt_restream_reindexer_binding_builtin_BuiltinAdapter_updateQueryTx(JNIEnv *, jobject,
9798
jlong, jlong,

src/main/java/ru/rt/restream/reindexer/Query.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,8 +1074,7 @@ private long[] prepareQueryAndGetPayloadTypesVersions() {
10741074

10751075
return namespaces.stream()
10761076
.map(ReindexerNamespace::getPayloadType)
1077-
.map(pt -> pt == null ? 0 : pt.getStateToken())
1078-
.mapToLong(Integer::longValue)
1077+
.mapToLong(pt -> pt == null ? 0 : (pt.getVersion() ^ pt.getStateToken()))
10791078
.toArray();
10801079
}
10811080

@@ -1245,7 +1244,11 @@ public void update() {
12451244
if (transactionContext != null) {
12461245
transactionContext.updateQuery(buffer.bytes());
12471246
} else {
1248-
reindexer.getBinding().updateQuery(buffer.bytes());
1247+
// There are no support for inner joins for update-queries in Java binding,
1248+
// so we are using single pt version
1249+
PayloadType pt = namespace.getPayloadType();
1250+
long tmVersion = pt == null ? 0 : (pt.getVersion() ^ pt.getStateToken());
1251+
reindexer.getBinding().updateQuery(buffer.bytes(), new long[]{tmVersion});
12491252
}
12501253
}
12511254

src/main/java/ru/rt/restream/reindexer/binding/Binding.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,9 @@ public interface Binding {
173173
* Invoke update query.
174174
*
175175
* @param queryData encoded query data (selected indexes, predicates, etc)
176+
* @param ptVersions payload type state tokens
176177
*/
177-
void updateQuery(byte[] queryData);
178+
void updateQuery(byte[] queryData, long[] ptVersions);
178179

179180
/**
180181
* Starts a transaction for the given namespace name.

src/main/java/ru/rt/restream/reindexer/binding/builtin/Builtin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,8 @@ public void deleteQuery(byte[] queryData) {
169169
}
170170

171171
@Override
172-
public void updateQuery(byte[] queryData) {
173-
ReindexerResponse response = adapter.updateQuery(rx, next.getAndIncrement(), timeout.toMillis(), queryData);
172+
public void updateQuery(byte[] queryData, long[] ptVersions) {
173+
ReindexerResponse response = adapter.updateQuery(rx, next.getAndIncrement(), timeout.toMillis(), queryData, ptVersions);
174174
checkResponse(response);
175175
}
176176

src/main/java/ru/rt/restream/reindexer/binding/builtin/BuiltinAdapter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ public native ReindexerResponse openNamespace(long rx, long ctxId, long timeout,
285285
* @param timeout the execution timeout
286286
* @param query the SQL-query string
287287
* @param asJson 'true' if response should be serialized in JSON format, defaults to CJSON
288-
* @param versions the versions
288+
* @param versions the tagsmatcher versions
289289
* @return the {@link ReindexerResponse} to use
290290
*/
291291
public native ReindexerResponse select(long rx, long ctxId, long timeout, String query, boolean asJson,
@@ -319,9 +319,10 @@ public native ReindexerResponse select(long rx, long ctxId, long timeout, String
319319
* @param ctxId the context id
320320
* @param timeout the execution timeout
321321
* @param data the query payload (i.e. predicates, joins etc)
322+
* @param versions the tagsmatcher versions
322323
* @return the {@link ReindexerResponse} to use
323324
*/
324-
public native ReindexerResponse updateQuery(long rx, long ctxId, long timeout, byte[] data);
325+
public native ReindexerResponse updateQuery(long rx, long ctxId, long timeout, byte[] data, long[] versions);
325326

326327
/**
327328
* Executes update query in the transaction.

src/main/java/ru/rt/restream/reindexer/binding/builtin/server/BuiltinServer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,8 @@ public void deleteQuery(byte[] queryData) {
172172
}
173173

174174
@Override
175-
public void updateQuery(byte[] queryData) {
176-
builtin.updateQuery(queryData);
175+
public void updateQuery(byte[] queryData, long[] ptVersions) {
176+
builtin.updateQuery(queryData, ptVersions);
177177
}
178178

179179
@Override

src/main/java/ru/rt/restream/reindexer/binding/cproto/Cproto.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,9 @@ public void deleteQuery(byte[] queryData) {
165165
}
166166

167167
@Override
168-
public void updateQuery(byte[] queryData) {
169-
rpcCallNoResults(UPDATE_QUERY, queryData);
168+
public void updateQuery(byte[] queryData, long[] ptVersions) {
169+
int flags = Consts.RESULTS_PURE | Consts.RESULTS_WITH_PAYLOAD_TYPES;
170+
rpcCallNoResults(UPDATE_QUERY, queryData, flags, ptVersions);
170171
}
171172

172173
@Override

0 commit comments

Comments
 (0)