Skip to content

Commit 2f8f730

Browse files
authored
Merge pull request #649 from alex268/master
Fixed lost onPartitionSessionClosed event
2 parents f2c9615 + 44d9e98 commit 2f8f730

4 files changed

Lines changed: 211 additions & 6 deletions

File tree

topic/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,17 @@
4040
<artifactId>zstd-jni</artifactId>
4141
<version>1.5.7-2</version>
4242
</dependency>
43+
4344
<dependency>
4445
<groupId>junit</groupId>
4546
<artifactId>junit</artifactId>
4647
<scope>test</scope>
4748
</dependency>
49+
<dependency>
50+
<groupId>tech.ydb</groupId>
51+
<artifactId>ydb-sdk-table</artifactId>
52+
<scope>test</scope>
53+
</dependency>
4854
<dependency>
4955
<groupId>tech.ydb.test</groupId>
5056
<artifactId>ydb-junit4-support</artifactId>

topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,8 @@ protected void onStopPartitionSessionRequest(YdbTopic.StreamReadMessage.StopPart
251251
logger.info("[{}] Received force StopPartitionSessionRequest for {} ", streamId, rps.getPartition());
252252
rps.stop();
253253
}
254+
255+
reader.handleClosePartitionSession(partition);
254256
return;
255257
}
256258

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
package tech.ydb.topic;
2+
3+
import java.util.Iterator;
4+
import java.util.Queue;
5+
import java.util.concurrent.ConcurrentLinkedQueue;
6+
import java.util.concurrent.CountDownLatch;
7+
import java.util.concurrent.TimeUnit;
8+
9+
import org.junit.AfterClass;
10+
import org.junit.Assert;
11+
import org.junit.BeforeClass;
12+
import org.junit.ClassRule;
13+
import org.junit.Rule;
14+
import org.junit.Test;
15+
import org.junit.rules.Timeout;
16+
17+
import tech.ydb.core.Status;
18+
import tech.ydb.table.SessionRetryContext;
19+
import tech.ydb.table.TableClient;
20+
import tech.ydb.table.query.Params;
21+
import tech.ydb.table.transaction.TxControl;
22+
import tech.ydb.test.junit4.GrpcTransportRule;
23+
import tech.ydb.topic.read.AsyncReader;
24+
import tech.ydb.topic.read.events.DataReceivedEvent;
25+
import tech.ydb.topic.read.events.PartitionSessionClosedEvent;
26+
import tech.ydb.topic.read.events.ReadEventHandler;
27+
import tech.ydb.topic.read.events.ReaderClosedEvent;
28+
import tech.ydb.topic.read.events.StartPartitionSessionEvent;
29+
import tech.ydb.topic.read.events.StopPartitionSessionEvent;
30+
import tech.ydb.topic.read.impl.events.SessionStartedEvent;
31+
import tech.ydb.topic.settings.ReadEventHandlersSettings;
32+
import tech.ydb.topic.settings.ReaderSettings;
33+
import tech.ydb.topic.settings.TopicReadSettings;
34+
35+
/**
36+
*
37+
* @author Aleksandr Gorshenin
38+
*/
39+
public class ChangefeedTopicTest {
40+
private final static String TEST_TABLE = "changefeed_table";
41+
private final static String TEST_CHANGEFEED = "updates";
42+
private final static String TEST_CONSUMER = "consumer";
43+
44+
@ClassRule
45+
public final static GrpcTransportRule ydbTransport = new GrpcTransportRule();
46+
47+
@Rule
48+
public final Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
49+
50+
private static TopicClient topicClient;
51+
private static TableClient tableClient;
52+
53+
@BeforeClass
54+
public static void initClients() {
55+
tableClient = TableClient.newClient(ydbTransport).build();
56+
topicClient = TopicClient.newClient(ydbTransport).build();
57+
}
58+
59+
@AfterClass
60+
public static void closeClients() {
61+
topicClient.close();
62+
tableClient.close();
63+
}
64+
65+
private void prepareChangefeed() {
66+
SessionRetryContext retryCtx = SessionRetryContext.create(tableClient).build();
67+
String tablePath = ydbTransport.getDatabase() + "/" + TEST_TABLE;
68+
String changefeedPath = tablePath + "/" + TEST_CHANGEFEED;
69+
70+
retryCtx.supplyStatus(s -> s.executeSchemeQuery(
71+
"CREATE TABLE `" + tablePath + "` ("
72+
+ "id Uint32, value Text, PRIMARY KEY (id)) "
73+
+ "WITH ("
74+
+ " AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2, "
75+
+ " PARTITION_AT_KEYS = (100));"
76+
)).join().expectSuccess("cannot create table");
77+
78+
retryCtx.supplyStatus(s -> s.executeSchemeQuery(
79+
"ALTER TABLE `" + tablePath + "` "
80+
+ "ADD CHANGEFEED " + TEST_CHANGEFEED
81+
+ " WITH (FORMAT = 'JSON', MODE = 'NEW_IMAGE', INITIAL_SCAN = true, VIRTUAL_TIMESTAMPS = true);"
82+
)).join().expectSuccess("cannot alter table");
83+
84+
retryCtx.supplyStatus(s -> s.executeSchemeQuery(""
85+
+ "ALTER TOPIC `" + changefeedPath + "` ADD CONSUMER " + TEST_CONSUMER
86+
)).join().expectSuccess("cannot alter changefeed");
87+
88+
retryCtx.supplyResult(s -> s.executeDataQuery(
89+
"INSERT INTO `" + tablePath + "` (id, value) VALUES (1, '1'), (1000, '2');",
90+
TxControl.serializableRw(), Params.empty()
91+
)).join().getStatus().expectSuccess("cannot insert data");
92+
}
93+
94+
private Status dropChangefeed() {
95+
SessionRetryContext retryCtx = SessionRetryContext.create(tableClient).build();
96+
return retryCtx.supplyStatus(s -> s.dropTable(ydbTransport.getDatabase() + "/" + TEST_TABLE)).join();
97+
}
98+
99+
@Test
100+
public void changefeedReadTest() throws Exception {
101+
prepareChangefeed();
102+
103+
String changefeedPath = ydbTransport.getDatabase() + "/" + TEST_TABLE + "/" + TEST_CHANGEFEED;
104+
ReaderSettings rs = ReaderSettings.newBuilder()
105+
.addTopic(TopicReadSettings.newBuilder().setPath(changefeedPath).build())
106+
.setConsumerName(TEST_CONSUMER)
107+
.build();
108+
109+
TestHandler handler1 = new TestHandler(2, 1);
110+
AsyncReader reader1 = topicClient.createAsyncReader(rs, ReadEventHandlersSettings.newBuilder()
111+
.setEventHandler(handler1).build());
112+
113+
reader1.init().join();
114+
handler1.waitData();
115+
handler1.assertEvents("INIT", "START", "START", "DATA1", "DATA1");
116+
117+
TestHandler handler2 = new TestHandler(1, 1);
118+
AsyncReader reader2 = topicClient.createAsyncReader(rs, ReadEventHandlersSettings.newBuilder()
119+
.setEventHandler(handler2).build());
120+
121+
reader2.init().join();
122+
handler2.waitData();
123+
124+
handler1.assertEvents("INIT", "START", "START", "DATA1", "DATA1", "STOP");
125+
handler2.assertEvents("INIT", "START", "DATA1");
126+
127+
dropChangefeed().expectSuccess("cannot drop changefeed");
128+
129+
handler1.waitClosed();
130+
handler1.assertEvents("INIT", "START", "START", "DATA1", "DATA1", "STOP", "CLOSED");
131+
132+
handler2.waitClosed();
133+
handler2.assertEvents("INIT", "START", "DATA1", "CLOSED");
134+
135+
reader1.shutdown().join();
136+
handler1.assertEvents("INIT", "START", "START", "DATA1", "DATA1", "STOP", "CLOSED", "READER CLOSED");
137+
138+
reader2.shutdown().join();
139+
handler2.assertEvents("INIT", "START", "DATA1", "CLOSED", "READER CLOSED");
140+
}
141+
142+
private static class TestHandler implements ReadEventHandler {
143+
private final CountDownLatch dataLatch;
144+
private final CountDownLatch closedLatch;
145+
private final Queue<String> events = new ConcurrentLinkedQueue<>();
146+
147+
public TestHandler(int expectedData, int expectedClosed) {
148+
this.dataLatch = new CountDownLatch(expectedData);
149+
this.closedLatch = new CountDownLatch(expectedClosed);
150+
}
151+
152+
public void assertEvents(String... expected) {
153+
Iterator<String> it = events.iterator();
154+
for (String ex : expected) {
155+
Assert.assertTrue("Expected " + ex + " but have nothing", it.hasNext());
156+
Assert.assertEquals(ex, it.next());
157+
}
158+
}
159+
160+
public void waitData() throws InterruptedException {
161+
Assert.assertTrue("Timed out waiting for data events", dataLatch.await(1, TimeUnit.MINUTES));
162+
}
163+
164+
public void waitClosed() throws InterruptedException {
165+
Assert.assertTrue("Timed out waiting for closed events", closedLatch.await(1, TimeUnit.MINUTES));
166+
}
167+
168+
@Override
169+
public void onMessages(DataReceivedEvent dre) {
170+
events.add("DATA" + dre.getMessages().size());
171+
dataLatch.countDown();
172+
}
173+
174+
@Override
175+
public void onSessionStarted(SessionStartedEvent event) {
176+
events.add("INIT");
177+
}
178+
179+
@Override
180+
public void onStartPartitionSession(StartPartitionSessionEvent event) {
181+
events.add("START");
182+
event.confirm();
183+
}
184+
185+
@Override
186+
public void onStopPartitionSession(StopPartitionSessionEvent event) {
187+
events.add("STOP");
188+
event.confirm();
189+
}
190+
191+
@Override
192+
public void onPartitionSessionClosed(PartitionSessionClosedEvent event) {
193+
events.add("CLOSED");
194+
closedLatch.countDown();
195+
}
196+
197+
@Override
198+
public void onReaderClosed(ReaderClosedEvent event) {
199+
events.add("READER CLOSED");
200+
}
201+
}
202+
}

topic/src/test/java/tech/ydb/topic/TopicReadersIntegrationTest.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
package tech.ydb.topic;
22

3-
4-
5-
import tech.ydb.topic.YdbTopicsIntegrationTest;
6-
73
import java.util.concurrent.CompletableFuture;
84
import java.util.concurrent.ExecutorService;
95
import java.util.concurrent.Executors;
@@ -24,7 +20,6 @@
2420
import org.slf4j.LoggerFactory;
2521

2622
import tech.ydb.test.junit4.GrpcTransportRule;
27-
import tech.ydb.topic.TopicClient;
2823
import tech.ydb.topic.description.Consumer;
2924
import tech.ydb.topic.read.AsyncReader;
3025
import tech.ydb.topic.read.Message;
@@ -39,9 +34,9 @@
3934
import tech.ydb.topic.settings.StartPartitionSessionSettings;
4035
import tech.ydb.topic.settings.TopicReadSettings;
4136
import tech.ydb.topic.settings.WriterSettings;
37+
import tech.ydb.topic.utils.HideLoggers;
4238
import tech.ydb.topic.utils.HideLoggersRule;
4339
import tech.ydb.topic.write.SyncWriter;
44-
import tech.ydb.topic.utils.HideLoggers;
4540

4641
/**
4742
*

0 commit comments

Comments
 (0)