4242import org .eclipse .edc .junit .extensions .EmbeddedRuntime ;
4343import org .eclipse .edc .junit .extensions .RuntimeExtension ;
4444import org .eclipse .edc .junit .extensions .RuntimePerClassExtension ;
45- import org .eclipse .edc .spi .types .domain .edr . EndpointDataReference ;
45+ import org .eclipse .edc .spi .types .domain .DataAddress ;
4646import org .eclipse .edc .util .io .Ports ;
4747import org .jetbrains .annotations .NotNull ;
4848import org .jetbrains .annotations .Nullable ;
7070import static java .util .concurrent .TimeUnit .SECONDS ;
7171import static org .assertj .core .api .Assertions .assertThat ;
7272import static org .awaitility .Awaitility .await ;
73+ import static org .eclipse .edc .connector .controlplane .contract .spi .types .negotiation .ContractNegotiationStates .FINALIZED ;
7374import static org .eclipse .edc .connector .controlplane .transfer .spi .types .TransferProcessStates .STARTED ;
7475import static org .eclipse .edc .samples .common .FileTransferCommon .getFileContentFromRelativePath ;
7576import static org .eclipse .edc .samples .common .FileTransferCommon .getFileFromRelativePath ;
@@ -144,8 +145,8 @@ void streamData() throws InterruptedException, JsonProcessingException {
144145 var contractNegotiationId = CONSUMER .negotiateContract (negotiateContractBody );
145146
146147 await ().atMost (TIMEOUT ).untilAsserted (() -> {
147- var contractAgreementId = CONSUMER .getContractAgreementId (contractNegotiationId );
148- assertThat (contractAgreementId ). isNotNull ( );
148+ var state = CONSUMER .getContractNegotiationState (contractNegotiationId );
149+ assertThat (state ). isEqualTo ( FINALIZED . name () );
149150 });
150151
151152 var contractAgreementId = CONSUMER .getContractAgreementId (contractNegotiationId );
@@ -161,19 +162,18 @@ void streamData() throws InterruptedException, JsonProcessingException {
161162 assertThat (state ).isEqualTo (STARTED .name ());
162163 });
163164
164- await ().atMost (TIMEOUT ).untilAsserted (() -> {
165- var state = CONSUMER .getTransferProcessState (transferProcessId );
166- assertThat (state ).isEqualTo (STARTED .name ());
167- });
168-
169165 var producer = createKafkaProducer ();
170166 var message = "message" ;
171167 Executors .newSingleThreadScheduledExecutor ().scheduleAtFixedRate (() -> producer
172168 .send (new ProducerRecord <>(TOPIC , "key" , message )), 0L , 100L , MICROSECONDS );
173169
174170 var endpointDataReference = readEndpointDataReference ();
175171
176- try (var clientConsumer = createKafkaConsumer (endpointDataReference .getEndpoint (), endpointDataReference .getAuthKey (), endpointDataReference .getAuthCode ())) {
172+ var endpoint = endpointDataReference .getStringProperty ("endpoint" );
173+ var authKey = endpointDataReference .getStringProperty ("authKey" );
174+ var authCode = endpointDataReference .getStringProperty ("authCode" );
175+
176+ try (var clientConsumer = createKafkaConsumer (endpoint , authKey , authCode )) {
177177 clientConsumer .subscribe (List .of (endpointDataReference .getProperties ().get (EDC_NAMESPACE + "topic" ).toString ()));
178178
179179 await ().atMost (TIMEOUT ).untilAsserted (() -> {
@@ -186,10 +186,11 @@ void streamData() throws InterruptedException, JsonProcessingException {
186186 producer .close ();
187187 }
188188
189- private EndpointDataReference readEndpointDataReference () throws InterruptedException , JsonProcessingException {
189+ private DataAddress readEndpointDataReference () throws InterruptedException , JsonProcessingException {
190190 var request = edrReceiverServer .takeRequest (TIMEOUT .getSeconds (), SECONDS );
191191 var body = request .getBody ().readString (Charset .defaultCharset ());
192- return new ObjectMapper ().readValue (body , EndpointDataReference .class );
192+ var jsonNode = new ObjectMapper ().readTree (body ).get ("payload" ).get ("dataAddress" );
193+ return new ObjectMapper ().convertValue (jsonNode , DataAddress .class );
193194 }
194195
195196 private AclBinding userCanAccess (String principal , ResourceType resourceType , String resourceName ) {
0 commit comments