|
18 | 18 | */ |
19 | 19 | package org.apache.parquet.column.values.alp.benchmark; |
20 | 20 |
|
| 21 | +import java.io.BufferedReader; |
21 | 22 | import java.io.IOException; |
| 23 | +import java.io.InputStream; |
| 24 | +import java.io.InputStreamReader; |
22 | 25 | import java.nio.ByteBuffer; |
23 | | -import java.util.Random; |
| 26 | +import java.util.ArrayList; |
| 27 | +import java.util.List; |
24 | 28 | import org.apache.parquet.bytes.ByteBufferInputStream; |
25 | 29 | import org.apache.parquet.column.values.alp.AlpValuesReaderForDouble; |
26 | 30 | import org.apache.parquet.column.values.alp.AlpValuesReaderForFloat; |
|
29 | 33 | import org.junit.Test; |
30 | 34 |
|
31 | 35 | /** |
32 | | - * Codec-level ALP throughput benchmark reporting MB/s. |
| 36 | + * Codec-level ALP throughput benchmark using real Spotify dataset columns. |
33 | 37 | * |
34 | 38 | * <p>Comparable to C++ encoding_alp_benchmark.cc. Measures encode and decode |
35 | | - * throughput at the codec level (no Parquet pipeline overhead). |
| 39 | + * throughput at the codec level (no Parquet pipeline overhead). Uses the same |
| 40 | + * Spotify audio features dataset as the C++ benchmark for direct comparison. |
| 41 | + * |
| 42 | + * <p>The CSV source has 15K rows per column; values are tiled to 1M for stable |
| 43 | + * measurement. |
36 | 44 | */ |
37 | 45 | public class AlpCodecThroughput { |
38 | 46 |
|
39 | | - private static final int NUM_VALUES = 1_000_000; |
| 47 | + private static final int TARGET_VALUES = 1_000_000; |
40 | 48 | private static final int WARMUP = 10; |
41 | 49 | private static final int MEASURED = 30; |
42 | 50 |
|
43 | | - // Datasets |
44 | | - private static double[] doubleDecimal; |
45 | | - private static double[] doubleInteger; |
46 | | - private static double[] doubleMixed; |
47 | | - private static float[] floatDecimal; |
48 | | - private static float[] floatInteger; |
49 | | - private static float[] floatMixed; |
50 | | - |
51 | | - // Pre-compressed |
52 | | - private static byte[] doubleDecimalComp; |
53 | | - private static byte[] doubleIntegerComp; |
54 | | - private static byte[] doubleMixedComp; |
55 | | - private static byte[] floatDecimalComp; |
56 | | - private static byte[] floatIntegerComp; |
57 | | - private static byte[] floatMixedComp; |
| 51 | + private static final String DOUBLE_CSV = "alp_spotify1_expect.csv"; |
| 52 | + private static final String FLOAT_CSV = "alp_float_spotify1_expect.csv"; |
| 53 | + |
| 54 | + // Spotify column names matching C++ benchmark |
| 55 | + private static final String[] COLUMNS = { |
| 56 | + "valence", "danceability", "energy", "loudness", "speechiness", |
| 57 | + "acousticness", "instrumentalness", "liveness", "tempo" |
| 58 | + }; |
| 59 | + |
| 60 | + private static double[][] doubleColumns; |
| 61 | + private static float[][] floatColumns; |
| 62 | + private static byte[][] doubleCompressed; |
| 63 | + private static byte[][] floatCompressed; |
58 | 64 |
|
59 | 65 | @BeforeClass |
60 | 66 | public static void setup() throws IOException { |
61 | | - Random rng = new Random(42); |
62 | | - |
63 | | - doubleDecimal = new double[NUM_VALUES]; |
64 | | - for (int i = 0; i < NUM_VALUES; i++) { |
65 | | - doubleDecimal[i] = Math.round(rng.nextDouble() * 10000) / 100.0; |
| 67 | + // Load double columns from Spotify CSV |
| 68 | + double[][] rawDoubles = loadDoubleCsv(DOUBLE_CSV); |
| 69 | + doubleColumns = new double[rawDoubles.length][]; |
| 70 | + doubleCompressed = new byte[rawDoubles.length][]; |
| 71 | + for (int c = 0; c < rawDoubles.length; c++) { |
| 72 | + doubleColumns[c] = tile(rawDoubles[c], TARGET_VALUES); |
| 73 | + doubleCompressed[c] = compressDoubles(doubleColumns[c]); |
66 | 74 | } |
67 | 75 |
|
68 | | - doubleInteger = new double[NUM_VALUES]; |
69 | | - for (int i = 0; i < NUM_VALUES; i++) { |
70 | | - doubleInteger[i] = (double) rng.nextInt(100000); |
| 76 | + // Load float columns from Spotify CSV |
| 77 | + float[][] rawFloats = loadFloatCsv(FLOAT_CSV); |
| 78 | + floatColumns = new float[rawFloats.length][]; |
| 79 | + floatCompressed = new byte[rawFloats.length][]; |
| 80 | + for (int c = 0; c < rawFloats.length; c++) { |
| 81 | + floatColumns[c] = tile(rawFloats[c], TARGET_VALUES); |
| 82 | + floatCompressed[c] = compressFloats(floatColumns[c]); |
71 | 83 | } |
| 84 | + } |
72 | 85 |
|
73 | | - doubleMixed = new double[NUM_VALUES]; |
74 | | - for (int i = 0; i < NUM_VALUES; i++) { |
75 | | - doubleMixed[i] = Math.round(rng.nextDouble() * 10000) / 100.0; |
76 | | - } |
77 | | - for (int i = 0; i < NUM_VALUES; i += 50) { |
78 | | - doubleMixed[i] = Double.NaN; |
79 | | - } |
| 86 | + @Test |
| 87 | + public void measureThroughput() throws IOException { |
| 88 | + System.out.println(); |
| 89 | + System.out.printf("=== ALP Codec-Level Throughput (%dK values, Spotify dataset) ===%n", |
| 90 | + TARGET_VALUES / 1000); |
| 91 | + System.out.println(); |
80 | 92 |
|
81 | | - floatDecimal = new float[NUM_VALUES]; |
82 | | - for (int i = 0; i < NUM_VALUES; i++) { |
83 | | - floatDecimal[i] = Math.round(rng.nextFloat() * 10000) / 100.0f; |
| 93 | + // Double columns |
| 94 | + System.out.printf("%-30s %10s %10s %10s %10s%n", |
| 95 | + "Double Column", "Enc MB/s", "Dec MB/s", "Raw KB", "Comp KB"); |
| 96 | + System.out.println("------------------------------" |
| 97 | + + " ---------- ---------- ---------- ----------"); |
| 98 | + for (int c = 0; c < doubleColumns.length; c++) { |
| 99 | + benchDouble(COLUMNS[c], doubleColumns[c], doubleCompressed[c]); |
84 | 100 | } |
85 | 101 |
|
86 | | - floatInteger = new float[NUM_VALUES]; |
87 | | - for (int i = 0; i < NUM_VALUES; i++) { |
88 | | - floatInteger[i] = (float) rng.nextInt(100000); |
89 | | - } |
| 102 | + System.out.println(); |
90 | 103 |
|
91 | | - floatMixed = new float[NUM_VALUES]; |
92 | | - for (int i = 0; i < NUM_VALUES; i++) { |
93 | | - floatMixed[i] = Math.round(rng.nextFloat() * 10000) / 100.0f; |
| 104 | + // Float columns |
| 105 | + System.out.printf("%-30s %10s %10s %10s %10s%n", |
| 106 | + "Float Column", "Enc MB/s", "Dec MB/s", "Raw KB", "Comp KB"); |
| 107 | + System.out.println("------------------------------" |
| 108 | + + " ---------- ---------- ---------- ----------"); |
| 109 | + for (int c = 0; c < floatColumns.length; c++) { |
| 110 | + benchFloat(COLUMNS[c], floatColumns[c], floatCompressed[c]); |
94 | 111 | } |
95 | | - for (int i = 0; i < NUM_VALUES; i += 50) { |
96 | | - floatMixed[i] = Float.NaN; |
| 112 | + |
| 113 | + System.out.println(); |
| 114 | + } |
| 115 | + |
| 116 | + // ========== CSV loading ========== |
| 117 | + |
| 118 | + private static double[][] loadDoubleCsv(String resource) throws IOException { |
| 119 | + try (InputStream is = AlpCodecThroughput.class.getClassLoader().getResourceAsStream(resource)) { |
| 120 | + if (is == null) { |
| 121 | + throw new IOException("Resource not found: " + resource); |
| 122 | + } |
| 123 | + BufferedReader br = new BufferedReader(new InputStreamReader(is)); |
| 124 | + String header = br.readLine(); |
| 125 | + int numCols = header.split(",").length; |
| 126 | + |
| 127 | + List<double[]> rows = new ArrayList<>(); |
| 128 | + String line; |
| 129 | + while ((line = br.readLine()) != null) { |
| 130 | + String[] parts = line.split(","); |
| 131 | + double[] row = new double[numCols]; |
| 132 | + for (int i = 0; i < numCols; i++) { |
| 133 | + row[i] = Double.parseDouble(parts[i]); |
| 134 | + } |
| 135 | + rows.add(row); |
| 136 | + } |
| 137 | + |
| 138 | + // Transpose: rows -> columns |
| 139 | + double[][] columns = new double[numCols][rows.size()]; |
| 140 | + for (int r = 0; r < rows.size(); r++) { |
| 141 | + double[] row = rows.get(r); |
| 142 | + for (int c = 0; c < numCols; c++) { |
| 143 | + columns[c][r] = row[c]; |
| 144 | + } |
| 145 | + } |
| 146 | + return columns; |
97 | 147 | } |
| 148 | + } |
98 | 149 |
|
99 | | - doubleDecimalComp = compressDoubles(doubleDecimal); |
100 | | - doubleIntegerComp = compressDoubles(doubleInteger); |
101 | | - doubleMixedComp = compressDoubles(doubleMixed); |
102 | | - floatDecimalComp = compressFloats(floatDecimal); |
103 | | - floatIntegerComp = compressFloats(floatInteger); |
104 | | - floatMixedComp = compressFloats(floatMixed); |
| 150 | + private static float[][] loadFloatCsv(String resource) throws IOException { |
| 151 | + try (InputStream is = AlpCodecThroughput.class.getClassLoader().getResourceAsStream(resource)) { |
| 152 | + if (is == null) { |
| 153 | + throw new IOException("Resource not found: " + resource); |
| 154 | + } |
| 155 | + BufferedReader br = new BufferedReader(new InputStreamReader(is)); |
| 156 | + String header = br.readLine(); |
| 157 | + int numCols = header.split(",").length; |
| 158 | + |
| 159 | + List<float[]> rows = new ArrayList<>(); |
| 160 | + String line; |
| 161 | + while ((line = br.readLine()) != null) { |
| 162 | + String[] parts = line.split(","); |
| 163 | + float[] row = new float[numCols]; |
| 164 | + for (int i = 0; i < numCols; i++) { |
| 165 | + row[i] = Float.parseFloat(parts[i]); |
| 166 | + } |
| 167 | + rows.add(row); |
| 168 | + } |
| 169 | + |
| 170 | + float[][] columns = new float[numCols][rows.size()]; |
| 171 | + for (int r = 0; r < rows.size(); r++) { |
| 172 | + float[] row = rows.get(r); |
| 173 | + for (int c = 0; c < numCols; c++) { |
| 174 | + columns[c][r] = row[c]; |
| 175 | + } |
| 176 | + } |
| 177 | + return columns; |
| 178 | + } |
105 | 179 | } |
106 | 180 |
|
107 | | - @Test |
108 | | - public void measureThroughput() throws IOException { |
109 | | - System.out.println(); |
110 | | - System.out.println("=== ALP Codec-Level Throughput (1M values) ==="); |
111 | | - System.out.printf("%-30s %10s %10s %10s %10s%n", |
112 | | - "Dataset", "Enc MB/s", "Dec MB/s", "Raw KB", "Comp KB"); |
113 | | - System.out.println("------------------------------" + |
114 | | - " ---------- ---------- ---------- ----------"); |
| 181 | + // ========== Tiling ========== |
115 | 182 |
|
116 | | - benchDouble("double_decimal", doubleDecimal, doubleDecimalComp); |
117 | | - benchDouble("double_integer", doubleInteger, doubleIntegerComp); |
118 | | - benchDouble("double_mixed(2%exc)", doubleMixed, doubleMixedComp); |
119 | | - benchFloat("float_decimal", floatDecimal, floatDecimalComp); |
120 | | - benchFloat("float_integer", floatInteger, floatIntegerComp); |
121 | | - benchFloat("float_mixed(2%exc)", floatMixed, floatMixedComp); |
| 183 | + private static double[] tile(double[] source, int targetSize) { |
| 184 | + double[] result = new double[targetSize]; |
| 185 | + for (int i = 0; i < targetSize; i++) { |
| 186 | + result[i] = source[i % source.length]; |
| 187 | + } |
| 188 | + return result; |
| 189 | + } |
122 | 190 |
|
123 | | - System.out.println(); |
| 191 | + private static float[] tile(float[] source, int targetSize) { |
| 192 | + float[] result = new float[targetSize]; |
| 193 | + for (int i = 0; i < targetSize; i++) { |
| 194 | + result[i] = source[i % source.length]; |
| 195 | + } |
| 196 | + return result; |
124 | 197 | } |
125 | 198 |
|
| 199 | + // ========== Benchmark methods ========== |
| 200 | + |
126 | 201 | private void benchDouble(String name, double[] data, byte[] compressed) throws IOException { |
127 | 202 | long rawBytes = (long) data.length * Double.BYTES; |
128 | 203 |
|
129 | | - // Warmup encode |
130 | 204 | for (int i = 0; i < WARMUP; i++) { |
131 | 205 | compressDoubles(data); |
132 | 206 | } |
133 | | - // Measure encode |
134 | 207 | long encNanos = 0; |
135 | 208 | for (int i = 0; i < MEASURED; i++) { |
136 | 209 | long t0 = System.nanoTime(); |
137 | 210 | compressDoubles(data); |
138 | 211 | encNanos += System.nanoTime() - t0; |
139 | 212 | } |
140 | 213 |
|
141 | | - // Warmup decode |
142 | 214 | for (int i = 0; i < WARMUP; i++) { |
143 | 215 | decompressDoubles(compressed, data.length); |
144 | 216 | } |
145 | | - // Measure decode |
146 | 217 | long decNanos = 0; |
147 | 218 | for (int i = 0; i < MEASURED; i++) { |
148 | 219 | long t0 = System.nanoTime(); |
@@ -187,6 +258,8 @@ private void benchFloat(String name, float[] data, byte[] compressed) throws IOE |
187 | 258 | name, encMBps, decMBps, rawBytes / 1024, compressed.length / 1024); |
188 | 259 | } |
189 | 260 |
|
| 261 | + // ========== Compress / Decompress ========== |
| 262 | + |
190 | 263 | private static byte[] compressDoubles(double[] values) throws IOException { |
191 | 264 | AlpValuesWriter.DoubleAlpValuesWriter writer = new AlpValuesWriter.DoubleAlpValuesWriter(); |
192 | 265 | for (double v : values) { |
|
0 commit comments