Skip to content

Commit 10cef1f

Browse files
committed
load all kryo versions
1 parent 683b148 commit 10cef1f

2 files changed

Lines changed: 70 additions & 12 deletions

File tree

cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/KryoRegister.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
import java.util.Arrays;
2323
import java.util.Collections;
2424
import java.util.LinkedHashMap;
25+
import java.util.LinkedHashSet;
26+
import java.util.List;
2527
import java.util.Map;
28+
import java.util.Objects;
2629
import java.util.Set;
2730
import java.util.stream.Collectors;
2831

@@ -31,7 +34,6 @@
3134

3235
import com.esotericsoftware.kryo.Kryo;
3336
import com.esotericsoftware.kryo.Serializer;
34-
import org.apache.cassandra.bridge.BaseCassandraBridgeFactory;
3537
import org.apache.cassandra.bridge.BigNumberConfigImpl;
3638
import org.apache.cassandra.bridge.CassandraBridgeFactory;
3739
import org.apache.cassandra.bridge.CassandraVersion;
@@ -142,26 +144,33 @@ public static void setup(@NotNull SparkConf configuration)
142144
LOGGER.info("Setting up Kryo");
143145
configuration.set(SPARK_SERIALIZER, "org.apache.spark.serializer.KryoSerializer");
144146

145-
// Add KryoRegister to SparkConf serialization if not already there
147+
// Preserve any pre-existing (e.g. user-supplied) registrators and keep them first.
148+
// LinkedHashSet gives a stable, predictable registration order on the driver; the same
149+
// resulting spark.kryo.registrator string is then propagated to all executors via SparkConf.
146150
Set<String> registratorsSet = Arrays.stream(configuration.get(SPARK_REGISTRATORS, "").split(","))
147151
.filter(string -> string != null && !string.isEmpty())
148-
.collect(Collectors.toSet());
149-
150-
// TODO: Find a better way to initialize Kryo serializer, instead of relaying
151-
// on Cassandra version specified as parameter of Spark job. Can we get Cassandra version from Sidecar?
152-
CassandraVersion cassandraVersion = BaseCassandraBridgeFactory.getCassandraVersion(configuration.get(CASSANDRA_VERSION, "4.0.0"));
153-
Class<?> registratorClass = KRYO_REGISTRATORS.get(cassandraVersion);
154-
if (registratorClass == null)
152+
.collect(Collectors.toCollection(LinkedHashSet::new));
153+
154+
// SSTable based bridge selection feature selects the bridge version, which may differ
155+
// from cassandra.version; registering every loadable bridge's registrator ensures Spark
156+
// can serialize objects for whichever bridge is chosen. Only implemented (bundled) versions
157+
// are used, so we never attempt to load a bridge JAR that is not available.
158+
List<Class<?>> registratorClasses = Arrays.stream(CassandraVersion.implementedVersions())
159+
.map(KRYO_REGISTRATORS::get)
160+
.filter(Objects::nonNull)
161+
.collect(Collectors.toList());
162+
if (registratorClasses.isEmpty())
155163
{
156-
throw new IllegalArgumentException("Kryo registrator not configured for Cassandra version: " + cassandraVersion);
164+
throw new IllegalStateException("No Kryo registrators configured for implemented Cassandra versions: "
165+
+ Arrays.toString(CassandraVersion.implementedVersions()));
157166
}
158167

159-
registratorsSet.add(registratorClass.getName());
168+
registratorClasses.forEach(registratorClass -> registratorsSet.add(registratorClass.getName()));
160169
String registratorsString = String.join(",", registratorsSet);
161170
LOGGER.info("Setting kryo registrators: " + registratorsString);
162171
configuration.set(SPARK_REGISTRATORS, registratorsString);
163172

164-
configuration.registerKryoClasses(new Class<?>[]{registratorClass});
173+
configuration.registerKryoClasses(registratorClasses.toArray(new Class<?>[0]));
165174
}
166175

167176
public static class V40 extends KryoRegister

cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoRegisterTest.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,14 @@
1919

2020
package org.apache.cassandra.spark;
2121

22+
import java.util.Arrays;
23+
import java.util.List;
24+
import java.util.stream.Collectors;
25+
2226
import org.junit.jupiter.api.Test;
2327

2428
import org.apache.cassandra.bridge.CassandraVersion;
29+
import org.apache.spark.SparkConf;
2530

2631
import static org.assertj.core.api.Assertions.assertThat;
2732
import static org.assertj.core.api.Assertions.assertThatNoException;
@@ -92,4 +97,48 @@ void testValidateWithNullCassandraVersionString()
9297
.describedAs("Validation should work with null cassandraVersion string")
9398
.isThrownBy(() -> KryoRegister.validateKryoRegistratorExists(CassandraVersion.FOURZERO, null));
9499
}
100+
101+
@Test
102+
void testSetupRegistersAllImplementedVersions()
103+
{
104+
SparkConf conf = new SparkConf();
105+
KryoRegister.setup(conf);
106+
107+
List<String> expected = Arrays.stream(CassandraVersion.implementedVersions())
108+
.map(KryoRegister.KRYO_REGISTRATORS::get)
109+
.map(Class::getName)
110+
.collect(Collectors.toList());
111+
112+
// setup() must register a registrator for every implemented (bundled) bridge version,
113+
// independent of spark.cassandra_analytics.cassandra.version, so serialization works for
114+
// whichever bridge the SSTable-version analyzer selects at runtime.
115+
assertThat(expected).isNotEmpty();
116+
List<String> registrators = Arrays.asList(conf.get("spark.kryo.registrator").split(","));
117+
assertThat(registrators).containsAll(expected);
118+
}
119+
120+
@Test
121+
void testSetupDoesNotDependOnCassandraVersionConfig()
122+
{
123+
// Even with a cassandra.version that differs from the bridge that may be selected,
124+
// setup() registers all implemented versions and never throws based on that config.
125+
SparkConf conf = new SparkConf()
126+
.set("spark.cassandra_analytics.cassandra.version", "5.0.0");
127+
assertThatNoException().isThrownBy(() -> KryoRegister.setup(conf));
128+
assertThat(conf.get("spark.serializer")).isEqualTo("org.apache.spark.serializer.KryoSerializer");
129+
}
130+
131+
@Test
132+
void testSetupPreservesExistingRegistrators()
133+
{
134+
SparkConf conf = new SparkConf()
135+
.set("spark.kryo.registrator", "com.example.CustomRegistrator");
136+
KryoRegister.setup(conf);
137+
138+
List<String> registrators = Arrays.asList(conf.get("spark.kryo.registrator").split(","));
139+
assertThat(registrators).contains("com.example.CustomRegistrator");
140+
assertThat(registrators.get(0))
141+
.describedAs("pre-existing registrators should be kept first (deterministic order)")
142+
.isEqualTo("com.example.CustomRegistrator");
143+
}
95144
}

0 commit comments

Comments
 (0)