1515package org .eclipse .edc .samples .transfer .streaming ;
1616
1717import com .fasterxml .jackson .core .JsonProcessingException ;
18+ import com .fasterxml .jackson .databind .JsonNode ;
1819import com .fasterxml .jackson .databind .ObjectMapper ;
1920import okhttp3 .mockwebserver .MockWebServer ;
2021import org .apache .kafka .clients .CommonClientConfigs ;
4243import org .eclipse .edc .junit .extensions .EmbeddedRuntime ;
4344import org .eclipse .edc .junit .extensions .RuntimeExtension ;
4445import org .eclipse .edc .junit .extensions .RuntimePerClassExtension ;
46+ import org .eclipse .edc .spi .types .domain .DataAddress ;
4547import org .eclipse .edc .spi .types .domain .edr .EndpointDataReference ;
4648import org .eclipse .edc .util .io .Ports ;
4749import org .jetbrains .annotations .NotNull ;
7072import static java .util .concurrent .TimeUnit .SECONDS ;
7173import static org .assertj .core .api .Assertions .assertThat ;
7274import static org .awaitility .Awaitility .await ;
75+ import static org .eclipse .edc .connector .controlplane .contract .spi .types .negotiation .ContractNegotiationStates .FINALIZED ;
7376import static org .eclipse .edc .connector .controlplane .transfer .spi .types .TransferProcessStates .STARTED ;
7477import static org .eclipse .edc .samples .common .FileTransferCommon .getFileContentFromRelativePath ;
7578import static org .eclipse .edc .samples .common .FileTransferCommon .getFileFromRelativePath ;
@@ -144,8 +147,8 @@ void streamData() throws InterruptedException, JsonProcessingException {
144147 var contractNegotiationId = CONSUMER .negotiateContract (negotiateContractBody );
145148
146149 await ().atMost (TIMEOUT ).untilAsserted (() -> {
147- var contractAgreementId = CONSUMER .getContractAgreementId (contractNegotiationId );
148- assertThat (contractAgreementId ). isNotNull ( );
150+ var state = CONSUMER .getContractNegotiationState (contractNegotiationId );
151+ assertThat (state ). isEqualTo ( FINALIZED . name () );
149152 });
150153
151154 var contractAgreementId = CONSUMER .getContractAgreementId (contractNegotiationId );
@@ -161,19 +164,18 @@ void streamData() throws InterruptedException, JsonProcessingException {
161164 assertThat (state ).isEqualTo (STARTED .name ());
162165 });
163166
164- await ().atMost (TIMEOUT ).untilAsserted (() -> {
165- var state = CONSUMER .getTransferProcessState (transferProcessId );
166- assertThat (state ).isEqualTo (STARTED .name ());
167- });
168-
169167 var producer = createKafkaProducer ();
170168 var message = "message" ;
171169 Executors .newSingleThreadScheduledExecutor ().scheduleAtFixedRate (() -> producer
172170 .send (new ProducerRecord <>(TOPIC , "key" , message )), 0L , 100L , MICROSECONDS );
173171
174172 var endpointDataReference = readEndpointDataReference ();
175173
176- try (var clientConsumer = createKafkaConsumer (endpointDataReference .getEndpoint (), endpointDataReference .getAuthKey (), endpointDataReference .getAuthCode ())) {
174+ var endpoint = endpointDataReference .getStringProperty ("endpoint" );
175+ var authKey = endpointDataReference .getStringProperty ("authKey" );
176+ var authCode = endpointDataReference .getStringProperty ("authCode" );
177+
178+ try (var clientConsumer = createKafkaConsumer (endpoint , authKey , authCode )) {
177179 clientConsumer .subscribe (List .of (endpointDataReference .getProperties ().get (EDC_NAMESPACE + "topic" ).toString ()));
178180
179181 await ().atMost (TIMEOUT ).untilAsserted (() -> {
@@ -186,10 +188,11 @@ void streamData() throws InterruptedException, JsonProcessingException {
186188 producer .close ();
187189 }
188190
189- private EndpointDataReference readEndpointDataReference () throws InterruptedException , JsonProcessingException {
191+ private DataAddress readEndpointDataReference () throws InterruptedException , JsonProcessingException {
190192 var request = edrReceiverServer .takeRequest (TIMEOUT .getSeconds (), SECONDS );
191193 var body = request .getBody ().readString (Charset .defaultCharset ());
192- return new ObjectMapper ().readValue (body , EndpointDataReference .class );
194+ var jsonNode = new ObjectMapper ().readTree (body ).get ("payload" ).get ("dataAddress" );
195+ return new ObjectMapper ().convertValue (jsonNode , DataAddress .class );
193196 }
194197
195198 private AclBinding userCanAccess (String principal , ResourceType resourceType , String resourceName ) {
0 commit comments