@@ -999,7 +999,27 @@ private class JsonValueRecord implements CassandraRecord {
999999 @ Override
10001000 public byte [] getValue () {
10011001 try {
1002- return (byte []) kvRecord .getValue ().getValue ();
1002+ Object value = kvRecord .getValue ().getValue ();
1003+ // Handle both byte[] (from NativeConverter) and GenericRecord (from GenericConverter)
1004+ if (value instanceof byte []) {
1005+ return (byte []) value ;
1006+ } else if (value instanceof org .apache .pulsar .client .api .schema .GenericRecord ) {
1007+ // GenericRecord from Pulsar - need to serialize to bytes
1008+ org .apache .pulsar .client .api .schema .GenericRecord genericRecord =
1009+ (org .apache .pulsar .client .api .schema .GenericRecord ) value ;
1010+
1011+ // Use the converter to serialize the GenericRecord to bytes
1012+ // The converter's fromConnectData method handles the serialization
1013+ @ SuppressWarnings ("unchecked" )
1014+ Converter <byte [], org .apache .pulsar .client .api .schema .GenericRecord , ?, byte []> converter =
1015+ (Converter <byte [], org .apache .pulsar .client .api .schema .GenericRecord , ?, byte []>)
1016+ kvRecord .converterAndQueryFinal .converter ;
1017+ return converter .fromConnectData (genericRecord );
1018+ } else {
1019+ throw new IllegalStateException ("Unexpected value type: " +
1020+ (value != null ? value .getClass ().getName () : "null" ) +
1021+ ". Expected byte[] or GenericRecord" );
1022+ }
10031023 } catch (Exception err ) {
10041024 throw new RuntimeException (err );
10051025 }
@@ -1013,12 +1033,30 @@ public Schema getSchema() {
10131033 @ Override
10141034 public Optional <String > getKey () {
10151035 Object key = kvRecord .getValue ().getKey ();
1016- if (!(key instanceof byte [])) {
1017- throw new IllegalStateException ("Invalid key type " + key .getClass ().getName ());
1036+ // Handle both byte[] and GenericRecord for keys
1037+ if (key instanceof byte []) {
1038+ // returns a json string in plain text. E.g.: key:[{"a":"38878"}]
1039+ return Optional .of (new String ((byte [])key , StandardCharsets .UTF_8 ));
1040+ } else if (key instanceof org .apache .pulsar .client .api .schema .GenericRecord ) {
1041+ // GenericRecord from Pulsar - need to serialize to bytes
1042+ org .apache .pulsar .client .api .schema .GenericRecord genericRecord =
1043+ (org .apache .pulsar .client .api .schema .GenericRecord ) key ;
1044+
1045+ try {
1046+ // Use the key converter to serialize the GenericRecord to bytes
1047+ @ SuppressWarnings ("unchecked" )
1048+ Converter <byte [], org .apache .pulsar .client .api .schema .GenericRecord , ?, byte []> converter =
1049+ (Converter <byte [], org .apache .pulsar .client .api .schema .GenericRecord , ?, byte []>) keyConverter ;
1050+ byte [] keyBytes = converter .fromConnectData (genericRecord );
1051+ return Optional .of (new String (keyBytes , StandardCharsets .UTF_8 ));
1052+ } catch (Exception e ) {
1053+ throw new RuntimeException ("Failed to serialize key" , e );
1054+ }
1055+ } else {
1056+ throw new IllegalStateException ("Invalid key type: " +
1057+ (key != null ? key .getClass ().getName () : "null" ) +
1058+ ". Expected byte[] or GenericRecord" );
10181059 }
1019-
1020- // returns a json string in plain text. E.g.: key:[{"a":"38878"}]
1021- return Optional .of (new String ((byte [])key , StandardCharsets .UTF_8 ));
10221060 }
10231061
10241062 @ Override
0 commit comments