1+ /*
2+ * Copyright OpenSearch Contributors
3+ * SPDX-License-Identifier: Apache-2.0
4+ *
5+ * The OpenSearch Contributors require contributions made to
6+ * this file be licensed under the Apache-2.0 license or a
7+ * compatible open source license.
8+ */
9+
10+ package org .opensearch .dataprepper .plugins .codec .json ;
11+
12+ import com .fasterxml .jackson .core .type .TypeReference ;
13+ import com .fasterxml .jackson .databind .ObjectMapper ;
14+ import org .junit .jupiter .api .BeforeEach ;
15+ import org .junit .jupiter .api .Test ;
16+ import org .junit .jupiter .api .extension .ExtendWith ;
17+ import org .junit .jupiter .params .ParameterizedTest ;
18+ import org .junit .jupiter .params .provider .ValueSource ;
19+ import org .mockito .Mock ;
20+ import org .mockito .junit .jupiter .MockitoExtension ;
21+ import org .opensearch .dataprepper .event .TestEventFactory ;
22+ import org .opensearch .dataprepper .model .codec .OutputCodec ;
23+ import org .opensearch .dataprepper .model .event .EventBuilder ;
24+ import org .opensearch .dataprepper .model .event .EventFactory ;
25+ import org .opensearch .dataprepper .model .sink .OutputCodecContext ;
26+
27+ import java .io .ByteArrayOutputStream ;
28+ import java .io .IOException ;
29+ import java .io .OutputStream ;
30+ import java .util .Arrays ;
31+ import java .util .LinkedHashMap ;
32+ import java .util .List ;
33+ import java .util .Map ;
34+ import java .util .UUID ;
35+ import java .util .stream .Collectors ;
36+ import java .util .stream .IntStream ;
37+
38+ import static org .hamcrest .CoreMatchers .equalTo ;
39+ import static org .hamcrest .MatcherAssert .assertThat ;
40+ import static org .mockito .Mockito .mock ;
41+ import static org .mockito .Mockito .verifyNoInteractions ;
42+
43+ @ ExtendWith (MockitoExtension .class )
44+ class NdjsonOutputCodecTest {
45+ @ Mock
46+ private NdjsonOutputConfig config ;
47+
48+ @ Mock
49+ private OutputCodecContext codecContext ;
50+
51+ private EventFactory eventFactory ;
52+
53+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper ();
54+ private static final TypeReference <Map <String , Object >> MAP_TYPE_REFERENCE = new TypeReference <>() {
55+ };
56+
57+ @ BeforeEach
58+ void setUp () {
59+ eventFactory = TestEventFactory .getTestEventFactory ();
60+ }
61+
62+ private NdjsonOutputCodec createObjectUnderTest () {
63+ return new NdjsonOutputCodec (config );
64+ }
65+
66+ @ Test
67+ void start_does_not_write_to_OutputStream () throws IOException {
68+ final NdjsonOutputCodec objectUnderTest = createObjectUnderTest ();
69+
70+ final OutputStream outputStream = mock (OutputStream .class );
71+
72+ objectUnderTest .start (outputStream , null , codecContext );
73+
74+ verifyNoInteractions (outputStream );
75+ }
76+
77+ @ Test
78+ void writer_does_not_write_to_OutputStream () throws IOException {
79+ final NdjsonOutputCodec objectUnderTest = createObjectUnderTest ();
80+
81+ final OutputStream outputStream = mock (OutputStream .class );
82+
83+ objectUnderTest .createWriter (outputStream , null , codecContext );
84+
85+ verifyNoInteractions (outputStream );
86+ }
87+
88+ @ Test
89+ void write_single () throws IOException {
90+ final NdjsonOutputCodec objectUnderTest = createObjectUnderTest ();
91+
92+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream ();
93+
94+ final Map <String , Object > eventMap = generateEventMap ();
95+ objectUnderTest .start (outputStream , null , codecContext );
96+ objectUnderTest .writeEvent (eventFactory .eventBuilder (EventBuilder .class ).withData (eventMap ).build (), outputStream );
97+ objectUnderTest .complete (outputStream );
98+
99+ final Map <?, ?> serializedMap = OBJECT_MAPPER .readValue (outputStream .toByteArray (), Map .class );
100+
101+ assertThat (serializedMap , equalTo (eventMap ));
102+ }
103+
104+ @ Test
105+ void write_single_using_writer () throws IOException {
106+ final NdjsonOutputCodec objectUnderTest = createObjectUnderTest ();
107+
108+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream ();
109+
110+ final Map <String , Object > eventMap = generateEventMap ();
111+ final OutputCodec .Writer writer = objectUnderTest .createWriter (outputStream , null , codecContext );
112+ writer .writeEvent (eventFactory .eventBuilder (EventBuilder .class ).withData (eventMap ).build ());
113+ writer .complete ();
114+
115+ final Map <?, ?> serializedMap = OBJECT_MAPPER .readValue (outputStream .toByteArray (), Map .class );
116+
117+ assertThat (serializedMap , equalTo (eventMap ));
118+ }
119+
120+ @ ParameterizedTest
121+ @ ValueSource (ints = {2 , 100 })
122+ void write_multiple (final int numberOfEvents ) throws IOException {
123+ final NdjsonOutputCodec objectUnderTest = createObjectUnderTest ();
124+
125+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream ();
126+
127+ final List <Map <String , Object >> eventMaps = IntStream .range (0 , numberOfEvents )
128+ .mapToObj (i -> generateEventMap ())
129+ .collect (Collectors .toList ());
130+ objectUnderTest .start (outputStream , null , codecContext );
131+
132+ eventMaps .stream ()
133+ .map (eventMap -> eventFactory .eventBuilder (EventBuilder .class ).withData (eventMap ).build ())
134+ .forEach (event -> {
135+ try {
136+ objectUnderTest .writeEvent (event , outputStream );
137+ } catch (IOException e ) {
138+ throw new RuntimeException (e );
139+ }
140+ });
141+
142+ objectUnderTest .complete (outputStream );
143+
144+ final String jsonLinesCombined = new String (outputStream .toByteArray ());
145+
146+ final String [] jsonLines = jsonLinesCombined .split ("\n " );
147+
148+ assertThat (jsonLines .length , equalTo (numberOfEvents ));
149+
150+ for (int i = 0 ; i < numberOfEvents ; i ++) {
151+ final Map <String , Object > eventMap = eventMaps .get (i );
152+ final String jsonLine = jsonLines [i ];
153+ final Map <?, ?> serializedMap = OBJECT_MAPPER .readValue (jsonLine , Map .class );
154+
155+ assertThat (serializedMap , equalTo (eventMap ));
156+ }
157+ }
158+
159+ @ ParameterizedTest
160+ @ ValueSource (ints = {2 , 100 })
161+ void write_multiple_using_writer (final int numberOfEvents ) throws IOException {
162+ final NdjsonOutputCodec objectUnderTest = createObjectUnderTest ();
163+
164+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream ();
165+
166+ final List <Map <String , Object >> eventMaps = IntStream .range (0 , numberOfEvents )
167+ .mapToObj (i -> generateEventMap ())
168+ .collect (Collectors .toList ());
169+ final OutputCodec .Writer writer = objectUnderTest .createWriter (outputStream , null , codecContext );
170+
171+ eventMaps .stream ()
172+ .map (eventMap -> eventFactory .eventBuilder (EventBuilder .class ).withData (eventMap ).build ())
173+ .forEach (event -> {
174+ try {
175+ writer .writeEvent (event );
176+ } catch (IOException e ) {
177+ throw new RuntimeException (e );
178+ }
179+ });
180+
181+ writer .complete ();
182+
183+ final String jsonLinesCombined = new String (outputStream .toByteArray ());
184+
185+ final String [] jsonLines = jsonLinesCombined .split ("\n " );
186+
187+ assertThat (jsonLines .length , equalTo (numberOfEvents ));
188+
189+ for (int i = 0 ; i < numberOfEvents ; i ++) {
190+ final Map <String , Object > eventMap = eventMaps .get (i );
191+ final String jsonLine = jsonLines [i ];
192+ final Map <?, ?> serializedMap = OBJECT_MAPPER .readValue (jsonLine , Map .class );
193+
194+ assertThat (serializedMap , equalTo (eventMap ));
195+ }
196+ }
197+
198+
199+ private static Map <String , Object > generateEventMap () {
200+ final Map <String , Object > jsonObject = new LinkedHashMap <>();
201+ for (int i = 0 ; i < 1 ; i ++) {
202+ jsonObject .put (UUID .randomUUID ().toString (), UUID .randomUUID ().toString ());
203+ }
204+ jsonObject .put (UUID .randomUUID ().toString (), Arrays .asList (UUID .randomUUID ().toString (), UUID .randomUUID ().toString (), UUID .randomUUID ().toString ()));
205+
206+ return jsonObject ;
207+ }
208+
209+ }
0 commit comments