Commit c64e0a3
committed
[fix][io] JDBC sink: prevent OOM from unbounded queue on connection failure
The JDBC sink's internal queue (incomingList) is unbounded. When the
database connection drops, executeBatch() hangs until the TCP socket
times out. During this period, isFlushing stays true, preventing any
draining, while write() continues accepting records without limit.
This causes OutOfMemoryError in production.
This commit fixes 4 issues:
1. Bounded internal queue: write() now rejects records when queue
exceeds maxQueueSize (configurable, defaults to 10x batchSize),
applying Pulsar-level back-pressure via negative acknowledgment.
2. State check in write(): records are failed immediately when the
sink state is not OPEN (after fatal() or close()).
3. Connection validation and reconnection: ensureConnection() validates
the JDBC connection before each flush and reconnects automatically
on failure, allowing recovery from transient database outages.
4. Scheduled flush cancellation: fatal() and close() now cancel the
periodic flush task to prevent repeated failures on a broken
connection.
Fixes apache/pulsar#250301 parent 314b9de commit c64e0a3
3 files changed
Lines changed: 235 additions & 5 deletions
File tree
- jdbc
- core/src/main/java/org/apache/pulsar/io/jdbc
- sqlite/src/test/java/org/apache/pulsar/io/jdbc
Lines changed: 78 additions & 5 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
33 | 33 | | |
34 | 34 | | |
35 | 35 | | |
| 36 | + | |
36 | 37 | | |
37 | 38 | | |
38 | 39 | | |
| |||
78 | 79 | | |
79 | 80 | | |
80 | 81 | | |
| 82 | + | |
81 | 83 | | |
| 84 | + | |
82 | 85 | | |
| 86 | + | |
| 87 | + | |
83 | 88 | | |
84 | 89 | | |
85 | 90 | | |
| |||
93 | 98 | | |
94 | 99 | | |
95 | 100 | | |
96 | | - | |
| 101 | + | |
97 | 102 | | |
98 | 103 | | |
99 | 104 | | |
100 | | - | |
| 105 | + | |
101 | 106 | | |
102 | 107 | | |
103 | | - | |
| 108 | + | |
104 | 109 | | |
105 | 110 | | |
106 | | - | |
| 111 | + | |
107 | 112 | | |
108 | 113 | | |
109 | 114 | | |
| |||
114 | 119 | | |
115 | 120 | | |
116 | 121 | | |
| 122 | + | |
| 123 | + | |
| 124 | + | |
| 125 | + | |
| 126 | + | |
| 127 | + | |
| 128 | + | |
117 | 129 | | |
118 | 130 | | |
119 | 131 | | |
120 | 132 | | |
121 | 133 | | |
122 | | - | |
| 134 | + | |
| 135 | + | |
123 | 136 | | |
124 | 137 | | |
125 | 138 | | |
| |||
158 | 171 | | |
159 | 172 | | |
160 | 173 | | |
| 174 | + | |
| 175 | + | |
| 176 | + | |
| 177 | + | |
161 | 178 | | |
162 | 179 | | |
163 | 180 | | |
| |||
188 | 205 | | |
189 | 206 | | |
190 | 207 | | |
| 208 | + | |
| 209 | + | |
| 210 | + | |
| 211 | + | |
| 212 | + | |
191 | 213 | | |
192 | 214 | | |
| 215 | + | |
| 216 | + | |
| 217 | + | |
| 218 | + | |
| 219 | + | |
| 220 | + | |
| 221 | + | |
| 222 | + | |
| 223 | + | |
193 | 224 | | |
194 | 225 | | |
195 | 226 | | |
| |||
239 | 270 | | |
240 | 271 | | |
241 | 272 | | |
| 273 | + | |
| 274 | + | |
| 275 | + | |
242 | 276 | | |
243 | 277 | | |
244 | 278 | | |
| |||
259 | 293 | | |
260 | 294 | | |
261 | 295 | | |
| 296 | + | |
| 297 | + | |
262 | 298 | | |
263 | 299 | | |
264 | 300 | | |
| |||
308 | 344 | | |
309 | 345 | | |
310 | 346 | | |
| 347 | + | |
311 | 348 | | |
312 | 349 | | |
313 | 350 | | |
| |||
336 | 373 | | |
337 | 374 | | |
338 | 375 | | |
| 376 | + | |
| 377 | + | |
| 378 | + | |
| 379 | + | |
| 380 | + | |
| 381 | + | |
| 382 | + | |
| 383 | + | |
| 384 | + | |
| 385 | + | |
| 386 | + | |
| 387 | + | |
| 388 | + | |
| 389 | + | |
| 390 | + | |
| 391 | + | |
| 392 | + | |
| 393 | + | |
| 394 | + | |
| 395 | + | |
| 396 | + | |
| 397 | + | |
| 398 | + | |
| 399 | + | |
| 400 | + | |
| 401 | + | |
| 402 | + | |
| 403 | + | |
| 404 | + | |
| 405 | + | |
| 406 | + | |
| 407 | + | |
339 | 408 | | |
340 | 409 | | |
341 | 410 | | |
| |||
404 | 473 | | |
405 | 474 | | |
406 | 475 | | |
| 476 | + | |
| 477 | + | |
| 478 | + | |
| 479 | + | |
407 | 480 | | |
408 | 481 | | |
409 | 482 | | |
| |||
Lines changed: 12 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
130 | 130 | | |
131 | 131 | | |
132 | 132 | | |
| 133 | + | |
| 134 | + | |
| 135 | + | |
| 136 | + | |
| 137 | + | |
| 138 | + | |
| 139 | + | |
| 140 | + | |
| 141 | + | |
| 142 | + | |
| 143 | + | |
| 144 | + | |
133 | 145 | | |
134 | 146 | | |
135 | 147 | | |
| |||
Lines changed: 145 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
932 | 932 | | |
933 | 933 | | |
934 | 934 | | |
| 935 | + | |
| 936 | + | |
| 937 | + | |
| 938 | + | |
| 939 | + | |
| 940 | + | |
| 941 | + | |
| 942 | + | |
| 943 | + | |
| 944 | + | |
| 945 | + | |
| 946 | + | |
| 947 | + | |
| 948 | + | |
| 949 | + | |
| 950 | + | |
| 951 | + | |
| 952 | + | |
| 953 | + | |
| 954 | + | |
| 955 | + | |
| 956 | + | |
| 957 | + | |
| 958 | + | |
| 959 | + | |
| 960 | + | |
| 961 | + | |
| 962 | + | |
| 963 | + | |
| 964 | + | |
| 965 | + | |
| 966 | + | |
| 967 | + | |
| 968 | + | |
| 969 | + | |
| 970 | + | |
| 971 | + | |
| 972 | + | |
| 973 | + | |
| 974 | + | |
| 975 | + | |
| 976 | + | |
| 977 | + | |
| 978 | + | |
| 979 | + | |
| 980 | + | |
| 981 | + | |
| 982 | + | |
| 983 | + | |
| 984 | + | |
| 985 | + | |
| 986 | + | |
| 987 | + | |
| 988 | + | |
| 989 | + | |
| 990 | + | |
| 991 | + | |
| 992 | + | |
| 993 | + | |
| 994 | + | |
| 995 | + | |
| 996 | + | |
| 997 | + | |
| 998 | + | |
| 999 | + | |
| 1000 | + | |
| 1001 | + | |
| 1002 | + | |
| 1003 | + | |
| 1004 | + | |
| 1005 | + | |
| 1006 | + | |
| 1007 | + | |
| 1008 | + | |
| 1009 | + | |
| 1010 | + | |
| 1011 | + | |
| 1012 | + | |
| 1013 | + | |
| 1014 | + | |
| 1015 | + | |
| 1016 | + | |
| 1017 | + | |
| 1018 | + | |
| 1019 | + | |
| 1020 | + | |
| 1021 | + | |
| 1022 | + | |
| 1023 | + | |
| 1024 | + | |
| 1025 | + | |
| 1026 | + | |
| 1027 | + | |
| 1028 | + | |
| 1029 | + | |
| 1030 | + | |
| 1031 | + | |
| 1032 | + | |
| 1033 | + | |
| 1034 | + | |
| 1035 | + | |
| 1036 | + | |
| 1037 | + | |
| 1038 | + | |
| 1039 | + | |
| 1040 | + | |
| 1041 | + | |
| 1042 | + | |
| 1043 | + | |
| 1044 | + | |
| 1045 | + | |
| 1046 | + | |
| 1047 | + | |
| 1048 | + | |
| 1049 | + | |
| 1050 | + | |
| 1051 | + | |
| 1052 | + | |
| 1053 | + | |
| 1054 | + | |
| 1055 | + | |
| 1056 | + | |
| 1057 | + | |
| 1058 | + | |
| 1059 | + | |
| 1060 | + | |
| 1061 | + | |
| 1062 | + | |
| 1063 | + | |
| 1064 | + | |
| 1065 | + | |
| 1066 | + | |
| 1067 | + | |
| 1068 | + | |
| 1069 | + | |
| 1070 | + | |
| 1071 | + | |
| 1072 | + | |
| 1073 | + | |
| 1074 | + | |
| 1075 | + | |
| 1076 | + | |
| 1077 | + | |
| 1078 | + | |
| 1079 | + | |
935 | 1080 | | |
936 | 1081 | | |
937 | 1082 | | |
| |||
0 commit comments