|
27 | 27 | import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; |
28 | 28 | import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; |
29 | 29 | import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; |
30 | | -import org.apache.iotdb.pipe.api.exception.PipeException; |
31 | 30 |
|
32 | 31 | import org.apache.tsfile.utils.Pair; |
33 | 32 | import org.apache.tsfile.utils.PublicBAOS; |
|
42 | 41 | import java.util.HashMap; |
43 | 42 | import java.util.List; |
44 | 43 | import java.util.Map; |
45 | | -import java.util.Objects; |
46 | 44 | import java.util.concurrent.atomic.AtomicLong; |
47 | 45 | import java.util.function.BiConsumer; |
48 | 46 |
|
@@ -104,27 +102,29 @@ public PipeTransferTabletBatchReqV2 toTPipeTransferReq() throws IOException { |
104 | 102 | final String databaseName = insertTablets.getKey(); |
105 | 103 | for (final Map.Entry<String, Pair<Integer, List<Tablet>>> tabletEntry : |
106 | 104 | insertTablets.getValue().entrySet()) { |
107 | | - Tablet batchTablet = null; |
| 105 | + final List<Tablet> batchTablets = new ArrayList<>(); |
108 | 106 | for (final Tablet tablet : tabletEntry.getValue().getRight()) { |
109 | | - if (Objects.isNull(batchTablet)) { |
110 | | - batchTablet = tablet; |
111 | | - } else if (!batchTablet.append(tablet, tabletEntry.getValue().getLeft())) { |
112 | | - throw new PipeException( |
113 | | - "Failed to merge tablets due to inconsistent schema, database: " |
114 | | - + databaseName |
115 | | - + ", tableName: " |
116 | | - + tablet.getTableName()); |
| 107 | + boolean success = false; |
| 108 | + for (final Tablet batchTablet : batchTablets) { |
| 109 | + if (batchTablet.append(tablet, tabletEntry.getValue().getLeft())) { |
| 110 | + success = true; |
| 111 | + break; |
| 112 | + } |
| 113 | + } |
| 114 | + if (!success) { |
| 115 | + batchTablets.add(tablet); |
117 | 116 | } |
118 | 117 | } |
119 | | - assert batchTablet != null; |
120 | | - try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); |
121 | | - final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { |
122 | | - batchTablet.serialize(outputStream); |
123 | | - ReadWriteIOUtils.write(true, outputStream); |
124 | | - tabletBuffers.add( |
125 | | - ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size())); |
| 118 | + for (final Tablet batchTablet : batchTablets) { |
| 119 | + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); |
| 120 | + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { |
| 121 | + batchTablet.serialize(outputStream); |
| 122 | + ReadWriteIOUtils.write(true, outputStream); |
| 123 | + tabletBuffers.add( |
| 124 | + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size())); |
| 125 | + } |
| 126 | + tabletDataBases.add(databaseName); |
126 | 127 | } |
127 | | - tabletDataBases.add(databaseName); |
128 | 128 | } |
129 | 129 | } |
130 | 130 |
|
|
0 commit comments