Skip to content

Commit 86b6677

Browse files
committed
Avoid ByteBuffer wrapping in cql3.selection.Selector.InputRow to reduce memory allocation rate
Current ReadCommand logic returns ArrayCell values, so when we retrieve a cell value as a ByteBuffer we allocate ByteBuffer instances patch by Dmitry Konstantinov; reviewed by TBD for CASSANDRA-21362
1 parent 6eb2d5d commit 86b6677

25 files changed

Lines changed: 336 additions & 138 deletions

File tree

src/java/org/apache/cassandra/cql3/ResultSet.java

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,27 @@ public class ResultSet
4646
public static final Codec codec = new Codec();
4747

4848
public final ResultMetadata metadata;
49-
public final List<List<ByteBuffer>> rows;
49+
public final List<List<byte[]>> rows;
5050

5151
public ResultSet(ResultMetadata resultMetadata)
5252
{
53-
this(resultMetadata, new ArrayList<List<ByteBuffer>>());
53+
this(resultMetadata, new ArrayList<List<byte[]>>());
5454
}
5555

56-
public ResultSet(ResultMetadata resultMetadata, List<List<ByteBuffer>> rows)
56+
public ResultSet(ResultMetadata resultMetadata, List<List<byte[]>> rows)
5757
{
5858
this.metadata = resultMetadata;
5959
this.rows = rows;
6060
}
6161

62+
public static ResultSet fromByteBufferRows(ResultMetadata resultMetadata, List<List<ByteBuffer>> bbRows)
63+
{
64+
List<List<byte[]>> converted = new ArrayList<>(bbRows.size());
65+
for (List<ByteBuffer> bbRow : bbRows)
66+
converted.add(convertByteBufferList(bbRow));
67+
return new ResultSet(resultMetadata, converted);
68+
}
69+
6270
public int size()
6371
{
6472
return rows.size();
@@ -69,21 +77,40 @@ public boolean isEmpty()
6977
return size() == 0;
7078
}
7179

72-
public void addRow(List<ByteBuffer> row)
80+
public void addRow(List<byte[]> row)
7381
{
7482
assert row.size() == metadata.valueCount();
7583
rows.add(row);
7684
}
7785

78-
public void addColumnValue(ByteBuffer value)
86+
public void addByteBufferRow(List<ByteBuffer> row)
87+
{
88+
assert row.size() == metadata.valueCount();
89+
rows.add(convertByteBufferList(row));
90+
}
91+
92+
private static List<byte[]> convertByteBufferList(List<ByteBuffer> row)
93+
{
94+
List<byte[]> converted = new ArrayList<>(row.size());
95+
for (ByteBuffer bb : row)
96+
converted.add(ByteBufferUtil.getArrayUnsafeNullable(bb));
97+
return converted;
98+
}
99+
100+
public void addColumnValue(byte[] value)
79101
{
80102
if (rows.isEmpty() || lastRow().size() == metadata.valueCount())
81-
rows.add(new ArrayList<ByteBuffer>(metadata.valueCount()));
103+
rows.add(new ArrayList<byte[]>(metadata.valueCount()));
82104

83105
lastRow().add(value);
84106
}
85107

86-
private List<ByteBuffer> lastRow()
108+
public void addColumnValue(ByteBuffer value)
109+
{
110+
addColumnValue(ByteBufferUtil.getArrayUnsafeNullable(value));
111+
}
112+
113+
private List<byte[]> lastRow()
87114
{
88115
return rows.get(rows.size() - 1);
89116
}
@@ -110,11 +137,11 @@ public String toString()
110137
{
111138
StringBuilder sb = new StringBuilder();
112139
sb.append(metadata).append('\n');
113-
for (List<ByteBuffer> row : rows)
140+
for (List<byte[]> row : rows)
114141
{
115142
for (int i = 0; i < row.size(); i++)
116143
{
117-
ByteBuffer v = row.get(i);
144+
byte[] v = row.get(i);
118145
if (v == null)
119146
{
120147
sb.append(" | null");
@@ -123,9 +150,9 @@ public String toString()
123150
{
124151
sb.append(" | ");
125152
if (metadata.flags.contains(Flag.NO_METADATA))
126-
sb.append("0x").append(ByteBufferUtil.bytesToHex(v));
153+
sb.append("0x").append(ByteBufferUtil.bytesToHex(ByteBuffer.wrap(v)));
127154
else
128-
sb.append(metadata.names.get(i).type.getString(v));
155+
sb.append(metadata.names.get(i).type.getString(ByteBuffer.wrap(v)));
129156
}
130157
}
131158
sb.append('\n');
@@ -151,12 +178,12 @@ public ResultSet decode(ByteBuf body, ProtocolVersion version)
151178
{
152179
ResultMetadata m = ResultMetadata.codec.decode(body, version);
153180
int rowCount = body.readInt();
154-
ResultSet rs = new ResultSet(m, new ArrayList<List<ByteBuffer>>(rowCount));
181+
ResultSet rs = new ResultSet(m, new ArrayList<List<byte[]>>(rowCount));
155182

156183
// rows
157184
int totalValues = rowCount * m.columnCount;
158185
for (int i = 0; i < totalValues; i++)
159-
rs.addColumnValue(CBUtil.readValue(body));
186+
rs.addColumnValue(CBUtil.readValueAsBytes(body));
160187

161188
return rs;
162189
}
@@ -165,7 +192,7 @@ public void encode(ResultSet rs, ByteBuf dest, ProtocolVersion version)
165192
{
166193
ResultMetadata.codec.encode(rs.metadata, dest, version);
167194
dest.writeInt(rs.rows.size());
168-
for (List<ByteBuffer> row : rs.rows)
195+
for (List<byte[]> row : rs.rows)
169196
{
170197
// Note that we do only want to serialize only the first columnCount values, even if the row
171198
// as more: see comment on ResultMetadata.names field.
@@ -177,7 +204,7 @@ public void encode(ResultSet rs, ByteBuf dest, ProtocolVersion version)
177204
public int encodedSize(ResultSet rs, ProtocolVersion version)
178205
{
179206
int size = ResultMetadata.codec.encodedSize(rs.metadata, version) + 4;
180-
for (List<ByteBuffer> row : rs.rows)
207+
for (List<byte[]> row : rs.rows)
181208
{
182209
for (int i = 0; i < rs.metadata.columnCount; i++)
183210
size += CBUtil.sizeOfValue(row.get(i));

src/java/org/apache/cassandra/cql3/UntypedResultSet.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public Iterator<Row> iterator()
130130
{
131131
return new AbstractIterator<Row>()
132132
{
133-
final Iterator<List<ByteBuffer>> iter = cqlRows.rows.iterator();
133+
final Iterator<List<byte[]>> iter = cqlRows.rows.iterator();
134134

135135
protected Row computeNext()
136136
{
@@ -176,7 +176,7 @@ public Iterator<Row> iterator()
176176
{
177177
return new AbstractIterator<Row>()
178178
{
179-
private Iterator<List<ByteBuffer>> currentPage;
179+
private Iterator<List<byte[]>> currentPage;
180180

181181
protected Row computeNext()
182182
{
@@ -242,7 +242,7 @@ public Iterator<Row> iterator()
242242
{
243243
return new AbstractIterator<Row>()
244244
{
245-
private Iterator<List<ByteBuffer>> currentPage;
245+
private Iterator<List<byte[]>> currentPage;
246246

247247
protected Row computeNext()
248248
{
@@ -275,11 +275,27 @@ public static class Row
275275
@Nonnull
276276
private final List<ColumnSpecification> columns;
277277

278-
public Row(@Nonnull List<ColumnSpecification> names, @Nonnull List<ByteBuffer> columns)
278+
public Row(@Nonnull List<ColumnSpecification> names, @Nonnull List<byte[]> columns)
279279
{
280280
this.columns = ImmutableList.copyOf(names);
281281
for (int i = 0; i < names.size(); i++)
282-
data.put(names.get(i).name.toString(), columns.get(i));
282+
{
283+
byte[] v = columns.get(i);
284+
data.put(names.get(i).name.toString(), v == null ? null : ByteBuffer.wrap(v));
285+
}
286+
}
287+
288+
public static Row fromByteBuffers(@Nonnull List<ColumnSpecification> names, @Nonnull List<ByteBuffer> columns)
289+
{
290+
Row row = new Row(names);
291+
for (int i = 0; i < names.size(); i++)
292+
row.data.put(names.get(i).name.toString(), columns.get(i));
293+
return row;
294+
}
295+
296+
private Row(@Nonnull List<ColumnSpecification> names)
297+
{
298+
this.columns = ImmutableList.copyOf(names);
283299
}
284300

285301
public boolean has(String column)

src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,12 @@ public ResultSetBuilder(ResultMetadata metadata, FunctionContext context, Select
7575
selectors.prepare(context);
7676
}
7777

78-
private void addSize(List<ByteBuffer> row)
78+
private void addSize(List<byte[]> row)
7979
{
8080
for (int i=0, isize=row.size(); i<isize; i++)
8181
{
82-
ByteBuffer value = row.get(i);
83-
size += value != null ? value.remaining() : 0;
82+
byte[] value = row.get(i);
83+
size += value != null ? value.length : 0;
8484
}
8585
}
8686

@@ -109,6 +109,11 @@ public void add(ByteBuffer v)
109109
inputRow.add(v);
110110
}
111111

112+
public void add(byte[] v)
113+
{
114+
inputRow.add(v);
115+
}
116+
112117
public void add(Cell<?> c, long nowInSec)
113118
{
114119
inputRow.add(c, nowInSec);
@@ -172,9 +177,9 @@ public ResultSet build()
172177
return resultSet;
173178
}
174179

175-
private List<ByteBuffer> getOutputRow()
180+
private List<byte[]> getOutputRow()
176181
{
177-
List<ByteBuffer> row = selectors.getOutputRow();
182+
List<byte[]> row = selectors.getOutputRow();
178183
addSize(row);
179184
return row;
180185
}

src/java/org/apache/cassandra/cql3/selection/Selection.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
*/
1818
package org.apache.cassandra.cql3.selection;
1919

20-
import java.nio.ByteBuffer;
2120
import java.util.ArrayList;
2221
import java.util.Arrays;
2322
import java.util.Collections;
@@ -40,11 +39,13 @@
4039
import org.apache.cassandra.cql3.functions.Function;
4140
import org.apache.cassandra.cql3.selection.Selector.InputRow;
4241
import org.apache.cassandra.db.filter.ColumnFilter;
42+
import org.apache.cassandra.db.marshal.ByteArrayAccessor;
4343
import org.apache.cassandra.db.marshal.UTF8Type;
4444
import org.apache.cassandra.exceptions.InvalidRequestException;
4545
import org.apache.cassandra.schema.ColumnMetadata;
4646
import org.apache.cassandra.schema.TableMetadata;
4747
import org.apache.cassandra.transport.ProtocolVersion;
48+
import org.apache.cassandra.utils.ByteBufferUtil;
4849
import org.apache.cassandra.utils.JsonUtils;
4950

5051
import static org.apache.cassandra.utils.LocalizeString.toLowerCaseLocalized;
@@ -319,22 +320,22 @@ public String toString()
319320
.toString();
320321
}
321322

322-
private static List<ByteBuffer> rowToJson(List<ByteBuffer> row,
323-
ProtocolVersion protocolVersion,
324-
ResultSet.ResultMetadata metadata,
325-
List<ColumnMetadata> orderingColumns)
323+
private static List<byte[]> rowToJson(List<byte[]> row,
324+
ProtocolVersion protocolVersion,
325+
ResultSet.ResultMetadata metadata,
326+
List<ColumnMetadata> orderingColumns)
326327
{
327-
ByteBuffer[] jsonRow = new ByteBuffer[orderingColumns.size() + 1];
328+
byte[][] jsonRow = new byte[orderingColumns.size() + 1][];
328329
StringBuilder sb = new StringBuilder("{");
329330
for (int i = 0; i < metadata.names.size(); i++)
330331
{
331332
ColumnSpecification spec = metadata.names.get(i);
332-
ByteBuffer buffer = row.get(i);
333+
byte[] value = row.get(i);
333334

334335
// If it is an ordering column we need to keep it in case we need it for post ordering
335336
int index = orderingColumns.indexOf(spec);
336337
if (index >= 0)
337-
jsonRow[index + 1] = buffer;
338+
jsonRow[index + 1] = value;
338339

339340
// If the column is only used for ordering we can stop here.
340341
if (i >= metadata.getColumnCount())
@@ -350,14 +351,14 @@ private static List<ByteBuffer> rowToJson(List<ByteBuffer> row,
350351
sb.append('"');
351352
sb.append(JsonUtils.quoteAsJsonString(columnName));
352353
sb.append("\": ");
353-
if (buffer == null)
354+
if (value == null)
354355
sb.append("null");
355356
else
356-
sb.append(spec.type.toJSONString(buffer, protocolVersion));
357+
sb.append(spec.type.toJSONString(value, ByteArrayAccessor.instance, protocolVersion));
357358
}
358359
sb.append("}");
359360

360-
jsonRow[0] = UTF8Type.instance.getSerializer().serialize(sb.toString());
361+
jsonRow[0] = ByteBufferUtil.getArrayUnsafeNullable(UTF8Type.instance.getSerializer().serialize(sb.toString()));
361362
return Arrays.asList(jsonRow);
362363
}
363364

@@ -403,14 +404,14 @@ default void prepare(FunctionContext context) {}
403404
*/
404405
void addInputRow(InputRow input);
405406

406-
List<ByteBuffer> getOutputRow();
407+
List<byte[]> getOutputRow();
407408

408409
void reset();
409410
}
410411

411412
public static class SimpleSelectors implements Selectors
412413
{
413-
protected List<ByteBuffer> current;
414+
protected List<byte[]> current;
414415

415416
@Override
416417
public void addInputRow(InputRow input)
@@ -419,7 +420,7 @@ public void addInputRow(InputRow input)
419420
}
420421

421422
@Override
422-
public List<ByteBuffer> getOutputRow()
423+
public List<byte[]> getOutputRow()
423424
{
424425
return current;
425426
}
@@ -503,7 +504,7 @@ public Selectors newSelectors(QueryOptions options)
503504
return new SimpleSelectors()
504505
{
505506
@Override
506-
public List<ByteBuffer> getOutputRow()
507+
public List<byte[]> getOutputRow()
507508
{
508509
if (isJson)
509510
return rowToJson(current, options.getProtocolVersion(), metadata, orderingColumns);
@@ -594,12 +595,12 @@ public boolean hasProcessing()
594595
return true;
595596
}
596597

597-
public List<ByteBuffer> getOutputRow()
598+
public List<byte[]> getOutputRow()
598599
{
599-
List<ByteBuffer> outputRow = new ArrayList<>(selectors.size());
600+
List<byte[]> outputRow = new ArrayList<>(selectors.size());
600601

601602
for (Selector selector: selectors)
602-
outputRow.add(selector.getOutput(options.getProtocolVersion()));
603+
outputRow.add(selector.getOutputAsBytes(options.getProtocolVersion()));
603604

604605
return isJson ? rowToJson(outputRow, options.getProtocolVersion(), metadata, orderingColumns) : outputRow;
605606
}

0 commit comments

Comments
 (0)