Skip to content

Commit 1664b2b

Browse files
authored
CASSANALYTICS-36: Bulk Reader should dynamically size the Spark job based on estimated table size (#118)
Patch by Francisco Guerrero; reviewed by Doug Rohrer, Yifan Cai for CASSANALYTICS-36
1 parent a85ff02 commit 1664b2b

10 files changed

Lines changed: 531 additions & 6 deletions

File tree

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
1.0.0
2+
* Bulk Reader should dynamically size the Spark job based on estimated table size (CASSANALYTICS-36)
23
* Allow getting cassandra role in Spark options for use in Sidecar requests for RBAC (CASSANALYTICS-61)
34
* Fix NPE in the deserialized CassandraClusterInfoGroup (CASSANALYTICS-59)
45
* Replace NotImplementedException with UnsupportedOperationException in SparkType (CASSANALYTICS-55)
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.cassandra.spark.data;
21+
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import org.apache.cassandra.spark.data.partitioner.ConsistencyLevel;
26+
27+
/**
28+
* Dynamic {@link Sizing} implementation that uses table size, minimum number of replicas, maximum partition size,
29+
* and available Spark cores to determine the effective number of executor cores to use during the spark job execution.
30+
*
31+
* <p>This class is typically used when the table size is relatively small (few GBs). When reading small datasets,
32+
* this class will allocate a limited number of resources to read the table. This in turn helps reduce the cost of
33+
* coordinating a large number of executor cores when the dataset does not justify using the entire spark cluster
34+
* for reading.
35+
*/
36+
public class DynamicSizing implements Sizing
37+
{
38+
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicSizing.class);
39+
40+
private final ReplicationFactor replicationFactor;
41+
private final int maxPartitionSize;
42+
private final int availableCores;
43+
private final String keyspace;
44+
private final String table;
45+
private final String dc;
46+
private final TableSizeProvider tableSizeProvider;
47+
private final ConsistencyLevel consistencyLevel;
48+
49+
/**
50+
* Constructs a new Sizing object.
51+
*
52+
* @param tableSizeProvider the table size provider
53+
* @param consistencyLevel the consistency level for the read operation
54+
* @param replicationFactor the replication factor for the keyspace
55+
* @param keyspace the Cassandra keyspace
56+
* @param table the Cassandra table
57+
* @param datacenter the Cassandra datacenter
58+
* @param maxPartitionSize the maximum partition size desired
59+
* @param availableCores the maximum number of cores available
60+
*/
61+
public DynamicSizing(TableSizeProvider tableSizeProvider,
62+
ConsistencyLevel consistencyLevel,
63+
ReplicationFactor replicationFactor,
64+
String keyspace,
65+
String table,
66+
String datacenter,
67+
int maxPartitionSize,
68+
int availableCores)
69+
{
70+
this.tableSizeProvider = tableSizeProvider;
71+
this.consistencyLevel = consistencyLevel;
72+
this.replicationFactor = replicationFactor;
73+
this.keyspace = keyspace;
74+
this.table = table;
75+
this.dc = datacenter;
76+
this.maxPartitionSize = maxPartitionSize;
77+
this.availableCores = availableCores;
78+
}
79+
80+
/**
81+
* Returns the effective number of cores to be used during the spark execution.
82+
* The value is calculated by getting the table size * the number of replicas
83+
* we will use to read the data and then dividing it by the maximum partition
84+
* size in GB. For example, assume we have a table with 7.25 GB of data, and
85+
* assume a maximum partition size of 2.5 GB. Also, assume that a consistency
86+
* level of {@code LOCAL_QUORUM} and replication factor of 3. The number of
87+
* cores is calculated by the following formula:
88+
*
89+
* <pre>
90+
* totalTableSize * minReplicas
91+
* effectiveNumberOfCores = Math.ceil( --------------------------------- )
92+
* maxPartitionSize
93+
* </pre>
94+
*
95+
* <p>In the example above, we have:
96+
*
97+
* <pre>
98+
* 7.25 GB * 2
99+
* effectiveNumberOfCores = --------------- = 5.8 ~&gt; 6 cores
100+
* 2.5 GB
101+
* </pre>
102+
*
103+
* <p>This method is guaranteed to return at least 1 core and at most {@code availableCores}
104+
*
105+
* @return the effective number of cores to be used during the spark execution
106+
*/
107+
@Override
108+
public int getEffectiveNumberOfCores()
109+
{
110+
double tableSizeInGiB = ((double) tableSizeProvider.tableSizeInBytes(keyspace, table, dc)
111+
/ (double) (1024 /* KiB */ * 1024 /* MiB */ * 1024 /* GiB */));
112+
double minReplicas = consistencyLevel.blockFor(replicationFactor, dc);
113+
114+
// Guarantee at least one core and at most availableCores
115+
int effectiveNumberOfCores = Math.min(Math.max(1, (int) Math.ceil(tableSizeInGiB * minReplicas / maxPartitionSize)), availableCores);
116+
117+
LOGGER.info("Using Dynamic Sizing. tableSize {}GiB, minReplicas {}, maxPartitionSize {}GiB, availableCores {}, effectiveNumberOfCores {}",
118+
tableSizeInGiB, minReplicas, maxPartitionSize, availableCores, effectiveNumberOfCores);
119+
120+
return effectiveNumberOfCores;
121+
}
122+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.cassandra.spark.data;
21+
22+
/**
 * Defines an interface to provide the size of a table in a given keyspace.
 *
 * <p>The interface declares a single abstract method, so it can be implemented with a lambda
 * expression or a method reference.
 */
@FunctionalInterface
public interface TableSizeProvider
{
    /**
     * Returns the total used space for {@code table} across the datacenter.
     *
     * @param keyspace   the keyspace where the table lives
     * @param table      the table to get the size from
     * @param datacenter the datacenter
     * @return the total used space, in bytes, for {@code table} across the datacenter
     */
    long tableSizeInBytes(String keyspace, String table, String datacenter);
}

cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/DefaultSizingTest.java renamed to cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/DefaultSizingTest.java

File renamed without changes.
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.cassandra.spark.data;
21+
22+
import java.util.Map;
23+
import java.util.stream.Stream;
24+
25+
import org.junit.jupiter.params.ParameterizedTest;
26+
import org.junit.jupiter.params.provider.Arguments;
27+
import org.junit.jupiter.params.provider.MethodSource;
28+
29+
import org.apache.cassandra.spark.data.partitioner.ConsistencyLevel;
30+
31+
import static org.assertj.core.api.Assertions.assertThat;
32+
33+
/**
34+
* Unit tests for the {@link DynamicSizing} class
35+
*/
36+
class DynamicSizingTest
37+
{
38+
public static final long TEN_GIB = 10L * 1024L * 1024L * 1024L;
39+
private static final ReplicationFactor RF = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
40+
Map.of("datacenter1", 3));
41+
42+
@ParameterizedTest
43+
@MethodSource("scenarios")
44+
void testSizingScenario(SizingScenario scenario)
45+
{
46+
TableSizeProvider tableSizeProvider = (keyspace, table, datacenter) -> scenario.tableSizeInBytes;
47+
Sizing sizing = new DynamicSizing(tableSizeProvider,
48+
ConsistencyLevel.LOCAL_QUORUM,
49+
RF,
50+
"big-data",
51+
"customers",
52+
"datacenter1",
53+
scenario.maxPartitionSize,
54+
scenario.numCores);
55+
assertThat(sizing.getEffectiveNumberOfCores()).as("Number of cores does not match").isEqualTo(scenario.expectedNumberOfCores);
56+
}
57+
58+
static Stream<Arguments> scenarios()
59+
{
60+
return Stream.of(
61+
Arguments.arguments(new SizingScenario(1000, 5, TEN_GIB, 4)),
62+
Arguments.arguments(new SizingScenario(1000, 1, TEN_GIB, 20)),
63+
Arguments.arguments(new SizingScenario(1000, 1, TEN_GIB, 20)),
64+
Arguments.arguments(new SizingScenario(1000, 5, TEN_GIB, 4)),
65+
Arguments.arguments(new SizingScenario(1000, 5, TEN_GIB, 4))
66+
);
67+
}
68+
69+
static class SizingScenario
70+
{
71+
private final int numCores;
72+
private final int maxPartitionSize;
73+
private final long tableSizeInBytes;
74+
private final int expectedNumberOfCores;
75+
76+
SizingScenario(int numCores, int maxPartitionSize, long tableSizeInBytes, int expectedNumberOfCores)
77+
{
78+
this.numCores = numCores;
79+
this.maxPartitionSize = maxPartitionSize;
80+
this.tableSizeInBytes = tableSizeInBytes;
81+
this.expectedNumberOfCores = expectedNumberOfCores;
82+
}
83+
84+
@Override
85+
public String toString()
86+
{
87+
return "Scenario{" +
88+
"numCores=" + numCores +
89+
", maxPartitionSize=" + maxPartitionSize +
90+
", tableSizeInBytes=" + tableSizeInBytes +
91+
", expectedNumberOfCores=" + expectedNumberOfCores +
92+
'}';
93+
}
94+
}
95+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.cassandra.spark.common;
21+
22+
import java.util.concurrent.CompletableFuture;
23+
24+
import o.a.c.sidecar.client.shaded.common.response.RingResponse;
25+
import org.apache.cassandra.clients.Sidecar;
26+
import org.apache.cassandra.sidecar.client.SidecarClient;
27+
import org.apache.cassandra.spark.data.ClientConfig;
28+
import org.apache.cassandra.spark.data.DefaultSizing;
29+
import org.apache.cassandra.spark.data.DynamicSizing;
30+
import org.apache.cassandra.spark.data.ReplicationFactor;
31+
import org.apache.cassandra.spark.data.SidecarTableSizeProvider;
32+
import org.apache.cassandra.spark.data.Sizing;
33+
import org.apache.cassandra.spark.data.TableSizeProvider;
34+
import org.apache.cassandra.spark.data.partitioner.ConsistencyLevel;
35+
36+
import static org.apache.cassandra.spark.data.ClientConfig.SIZING_DEFAULT;
37+
import static org.apache.cassandra.spark.data.ClientConfig.SIZING_DYNAMIC;
38+
39+
/**
40+
* A factory class that creates {@link Sizing} based on the client-supplied configuration
41+
*/
42+
public class SizingFactory
43+
{
44+
/**
45+
* Private constructor that prevents unnecessary instantiation
46+
*
47+
* @throws IllegalStateException when called
48+
*/
49+
private SizingFactory()
50+
{
51+
throw new IllegalStateException(getClass() + " is a static utility class and shall not be instantiated");
52+
}
53+
54+
/**
55+
* Returns the {@link Sizing} object based on the {@code sizing} option provided by the user,
56+
* or {@link DefaultSizing} as the default sizing
57+
*
58+
* @param replicationFactor the replication factor
59+
* @param options the {@link ClientConfig} options
60+
* @param consistencyLevel the ConsistencyLevel to use
61+
* @param keyspace the keyspace
62+
* @param table the table
63+
* @param datacenter the DataCenter to use
64+
* @param sidecarClient the sidecar client instance to use
65+
* @param sidecarClientConfig the configuration to use with the sidecar client
66+
* @param ringFuture a future representing the result of getting the current ring from the sidecar
67+
* @return the {@link Sizing} object based on the {@code sizing} option provided by the user
68+
*/
69+
public static Sizing create(ReplicationFactor replicationFactor,
70+
ClientConfig options,
71+
ConsistencyLevel consistencyLevel,
72+
String keyspace,
73+
String table,
74+
String datacenter,
75+
SidecarClient sidecarClient,
76+
Sidecar.ClientConfig sidecarClientConfig,
77+
CompletableFuture<RingResponse> ringFuture)
78+
{
79+
if (SIZING_DYNAMIC.equalsIgnoreCase(options.sizing()))
80+
{
81+
TableSizeProvider tableSizeProvider = getTableSizeProvider(sidecarClient, sidecarClientConfig, ringFuture);
82+
return new DynamicSizing(tableSizeProvider, consistencyLevel, replicationFactor,
83+
keyspace, table, datacenter,
84+
options.maxPartitionSize(), options.numCores());
85+
}
86+
else if (options.sizing() == null || options.sizing().isEmpty() || SIZING_DEFAULT.equalsIgnoreCase(options.sizing()))
87+
{
88+
return new DefaultSizing(options.numCores());
89+
}
90+
throw new RuntimeException(String.format("Invalid sizing option provided '%s'", options.sizing()));
91+
}
92+
93+
protected static TableSizeProvider getTableSizeProvider(SidecarClient sidecarClient,
94+
Sidecar.ClientConfig sidecarClientConfig,
95+
CompletableFuture<RingResponse> ringFuture)
96+
{
97+
return new SidecarTableSizeProvider(sidecarClient, sidecarClientConfig, ringFuture);
98+
}
99+
}

0 commit comments

Comments
 (0)