@@ -73,6 +73,7 @@ public class KafkaBufferIT {
7373 private PluginSetting pluginSetting ;
7474
7575 private KafkaBufferConfig kafkaBufferConfig ;
76+ private KafkaBufferConfig kafkaBufferCompressionConfig ;
7677 @ Mock
7778 private AcknowledgementSetManager acknowledgementSetManager ;
7879 @ Mock
@@ -111,6 +112,13 @@ void setUp() {
111112
112113 topicConfig = objectMapper .convertValue (topicConfigMap , BufferTopicConfig .class );
113114
115+ final Map <String , Object > topicConfigMapCompression = Map .of (
116+ "name" , topicName ,
117+ "group_id" , "buffergroup-" + RandomStringUtils .randomAlphabetic (6 ),
118+ "create_topic" , true ,
119+ "encryption_key" , "6fib8P/ML7Lh7lUEHCFYCt+bschigjNwmEZUctkP5dw=" // sample key
120+ );
121+
114122 bootstrapServersCommaDelimited = System .getProperty ("tests.kafka.bootstrap_servers" );
115123
116124 LOG .info ("Using Kafka bootstrap servers: {}" , bootstrapServersCommaDelimited );
@@ -122,6 +130,15 @@ void setUp() {
122130 );
123131 kafkaBufferConfig = objectMapper .convertValue (bufferConfigMap , KafkaBufferConfig .class );
124132
133+ final Map <String , Object > bufferCompressionConfigMap = Map .of (
134+ "topics" , List .of (topicConfigMapCompression ),
135+ "bootstrap_servers" , List .of (bootstrapServersCommaDelimited ),
136+ "encryption" , Map .of ("type" , "none" ),
137+ "producer_properties" , Map .of ("compression_type" , "zstd" )
138+ );
139+
140+ kafkaBufferCompressionConfig = objectMapper .convertValue (bufferCompressionConfigMap , KafkaBufferConfig .class );
141+
125142 byteDecoder = null ;
126143 }
127144
@@ -133,6 +150,10 @@ private KafkaBuffer createObjectUnderTest() {
133150 return new KafkaBuffer (pluginSetting , kafkaBufferConfig , acknowledgementSetManager , null , null , null , encryptionSupplier );
134151 }
135152
153+ private KafkaBuffer createObjectUnderTestWithCompression () {
154+ return new KafkaBuffer (pluginSetting , kafkaBufferCompressionConfig , acknowledgementSetManager , null , null , null , encryptionSupplier );
155+ }
156+
136157 @ Test
137158 void write_and_read () throws TimeoutException {
138159 KafkaBuffer objectUnderTest = createObjectUnderTest ();
@@ -160,6 +181,33 @@ void write_and_read() throws TimeoutException {
160181 assertThat (objectUnderTest .getInnerBufferRecordsInFlight (), equalTo (0 ));
161182 }
162183
184+ @ Test
185+ void write_and_read_compression () throws TimeoutException {
186+ KafkaBuffer objectUnderTest = createObjectUnderTestWithCompression ();
187+
188+ Record <Event > record = createRecord ();
189+ objectUnderTest .write (record , 1_000 );
190+
191+ final Map .Entry <Collection <Record <Event >>, CheckpointState > readResult = awaitRead (objectUnderTest );
192+
193+ assertThat (readResult , notNullValue ());
194+ assertThat (readResult .getKey (), notNullValue ());
195+ assertThat (readResult .getKey ().size (), equalTo (1 ));
196+
197+ Record <Event > onlyResult = readResult .getKey ().stream ().iterator ().next ();
198+
199+ assertThat (onlyResult , notNullValue ());
200+ assertThat (onlyResult .getData (), notNullValue ());
201+ // TODO: The metadata is not included. It needs to be included in the Buffer, though not in the Sink. This may be something we make configurable in the consumer/producer - whether to serialize the metadata or not.
202+ //assertThat(onlyResult.getData().getMetadata(), equalTo(record.getData().getMetadata()));
203+ assertThat (onlyResult .getData ().toMap (), equalTo (record .getData ().toMap ()));
204+ assertThat (objectUnderTest .getRecordsInFlight (), equalTo (0 ));
205+ assertThat (objectUnderTest .getInnerBufferRecordsInFlight (), equalTo (1 ));
206+ objectUnderTest .checkpoint (readResult .getValue ());
207+ assertThat (objectUnderTest .getRecordsInFlight (), equalTo (0 ));
208+ assertThat (objectUnderTest .getInnerBufferRecordsInFlight (), equalTo (0 ));
209+ }
210+
163211 @ Test
164212 void write_and_read_max_request_test () throws TimeoutException , NoSuchFieldException , IllegalAccessException {
165213 KafkaProducerProperties kafkaProducerProperties = new KafkaProducerProperties ();
@@ -286,6 +334,32 @@ void writeBytes_and_read() throws Exception {
286334 assertThat (onlyResult .getData ().toMap (), equalTo (inputDataMap ));
287335 }
288336
337+ @ Test
338+ void writeBytes_and_read_with_compression () throws Exception {
339+ byteDecoder = new JsonDecoder ();
340+
341+ final KafkaBuffer objectUnderTest = createObjectUnderTestWithCompression ();
342+
343+ final Map <String , String > inputDataMap = Map .of (UUID .randomUUID ().toString (), UUID .randomUUID ().toString (), UUID .randomUUID ().toString (), UUID .randomUUID ().toString ());
344+ final byte [] bytes = objectMapper .writeValueAsBytes (inputDataMap );
345+ final String key = UUID .randomUUID ().toString ();
346+ objectUnderTest .writeBytes (bytes , key , 1_000 );
347+
348+ final Map .Entry <Collection <Record <Event >>, CheckpointState > readResult = awaitRead (objectUnderTest );
349+
350+ assertThat (readResult , notNullValue ());
351+ assertThat (readResult .getKey (), notNullValue ());
352+ assertThat (readResult .getKey ().size (), equalTo (1 ));
353+
354+ Record <Event > onlyResult = readResult .getKey ().stream ().iterator ().next ();
355+
356+ assertThat (onlyResult , notNullValue ());
357+ assertThat (onlyResult .getData (), notNullValue ());
358+ // TODO: The metadata is not included. It needs to be included in the Buffer, though not in the Sink. This may be something we make configurable in the consumer/producer - whether to serialize the metadata or not.
359+ //assertThat(onlyResult.getData().getMetadata(), equalTo(record.getData().getMetadata()));
360+ assertThat (onlyResult .getData ().toMap (), equalTo (inputDataMap ));
361+ }
362+
289363 @ Test
290364 void write_puts_correctly_formatted_data_in_protobuf_wrapper () throws TimeoutException , IOException {
291365 final KafkaBuffer objectUnderTest = createObjectUnderTest ();
@@ -371,6 +445,31 @@ class Encrypted {
371445
372446 @ BeforeEach
373447 void setUp () throws NoSuchAlgorithmException , InvalidKeyException , NoSuchPaddingException {
448+ random = new Random ();
449+ acknowledgementSetManager = mock (AcknowledgementSetManager .class );
450+ acknowledgementSet = mock (AcknowledgementSet .class );
451+ lenient ().doAnswer ((a ) -> null ).when (acknowledgementSet ).complete ();
452+ lenient ().when (acknowledgementSetManager .create (any (), any (Duration .class ))).thenReturn (acknowledgementSet );
453+ objectMapper = new ObjectMapper ().registerModule (new JavaTimeModule ());
454+
455+ when (pluginSetting .getPipelineName ()).thenReturn (UUID .randomUUID ().toString ());
456+
457+ topicName = "buffer-" + RandomStringUtils .randomAlphabetic (5 );
458+
459+ Map <String , Object > topicConfigMap = new java .util .HashMap <>(Map .of (
460+ "name" , topicName ,
461+ "group_id" , "buffergroup-" + RandomStringUtils .randomAlphabetic (6 ),
462+ "create_topic" , true
463+ ));
464+
465+ topicConfig = objectMapper .convertValue (topicConfigMap , BufferTopicConfig .class );
466+
467+ final Map <String , Object > bufferConfigMap = new java .util .HashMap <>(Map .of (
468+ "topics" , List .of (topicConfigMap ),
469+ "bootstrap_servers" , List .of (bootstrapServersCommaDelimited ),
470+ "encryption" , Map .of ("type" , "none" )
471+ ));
472+
374473 final KeyGenerator aesKeyGenerator = KeyGenerator .getInstance ("AES" );
375474 aesKeyGenerator .init (256 );
376475 final SecretKey secretKey = aesKeyGenerator .generateKey ();
@@ -382,11 +481,11 @@ void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPadding
382481 final byte [] base64Bytes = Base64 .getEncoder ().encode (secretKey .getEncoded ());
383482 aesKey = new String (base64Bytes );
384483
385- final Map <String , Object > topicConfigMap = objectMapper .convertValue (topicConfig , Map .class );
386484 topicConfigMap .put ("encryption_key" , aesKey );
387- final Map <String , Object > bufferConfigMap = objectMapper .convertValue (kafkaBufferConfig , Map .class );
388485 bufferConfigMap .put ("topics" , List .of (topicConfigMap ));
389486 kafkaBufferConfig = objectMapper .convertValue (bufferConfigMap , KafkaBufferConfig .class );
487+
488+ byteDecoder = null ;
390489 }
391490
392491 @ Test
0 commit comments