Skip to content

Commit d44d69d

Browse files
authored
refactor: convert provision-http extension to data-plane (#5156)
1 parent 3bc73b7 commit d44d69d

40 files changed

Lines changed: 1128 additions & 251 deletions

File tree

core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/EmbeddedRuntime.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,13 @@ public void shutdown() {
124124
return new TestServiceExtensionContext(monitor, config, serviceMocks);
125125
}
126126

127+
/**
128+
* Utility method that doesn't require the addShutdownHook parameter
129+
*/
130+
public void boot() {
131+
boot(false);
132+
}
133+
127134
@Override
128135
protected @NotNull Monitor createMonitor() {
129136
// disable logs when "quiet" log level is set

core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/AbstractStateEntityManager.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,11 @@ private EntityRetryProcessConfiguration defaultEntityRetryProcessConfiguration()
8282

8383
protected void update(E entity) {
8484
store.save(entity);
85-
monitor.debug(() -> "[%s] %s %s is now in state %s"
85+
var error = entity.getErrorDetail() == null ? "" : ". errorDetail: " + entity.getErrorDetail();
86+
87+
monitor.debug(() -> "[%s] %s %s is now in state %s%s"
8688
.formatted(this.getClass().getSimpleName(), entity.getClass().getSimpleName(),
87-
entity.getId(), entity.stateAsString()));
89+
entity.getId(), entity.stateAsString(), error));
8890
}
8991

9092
protected void breakLease(E entity) {

core/control-plane/control-plane-transfer-manager/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,8 @@ private boolean processInitial(TransferProcess process) {
224224
update(process);
225225
return true;
226226
}
227+
} else {
228+
monitor.warning("control-plane provisioning has been deprecated, please convert your provision extensions to the data-plane model and deploy them there.");
227229
}
228230

229231
process.transitionProvisioning(manifest);
@@ -247,6 +249,9 @@ private boolean processInitial(TransferProcess process) {
247249
}
248250

249251
var manifest = manifestGenerator.generateProviderResourceManifest(process, dataAddress, policy);
252+
if (!manifest.empty()) {
253+
monitor.warning("control-plane provisioning has been deprecated, please convert your provision extensions to the data-plane model and deploy them there.");
254+
}
250255
process.transitionProvisioning(manifest);
251256
}
252257

core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.eclipse.edc.spi.response.StatusResult;
3434
import org.eclipse.edc.spi.result.Result;
3535
import org.eclipse.edc.spi.result.ServiceFailure;
36+
import org.eclipse.edc.spi.result.ServiceResult;
3637
import org.eclipse.edc.spi.result.StoreFailure;
3738
import org.eclipse.edc.spi.types.domain.DataAddress;
3839
import org.eclipse.edc.spi.types.domain.transfer.DataFlowProvisionMessage;
@@ -100,10 +101,16 @@ public Result<Void> validate(DataFlowStartMessage dataRequest) {
100101
return success();
101102
} else {
102103
var transferService = transferServiceRegistry.resolveTransferService(dataRequest);
103-
return transferService != null ?
104-
transferService.validate(dataRequest) :
105-
Result.failure(format("Cannot find a transfer Service that can handle %s source and %s destination",
106-
dataRequest.getSourceDataAddress().getType(), dataRequest.getDestinationDataAddress().getType()));
104+
if (transferService != null) {
105+
return transferService.validate(dataRequest);
106+
}
107+
108+
if (resourceDefinitionGeneratorManager.sourceTypes().contains(dataRequest.getSourceDataAddress().getType())) {
109+
return Result.success();
110+
}
111+
112+
return Result.failure(format("Cannot find a transfer Service that can handle %s source and %s destination",
113+
dataRequest.getSourceDataAddress().getType(), dataRequest.getDestinationDataAddress().getType()));
107114
}
108115
}
109116

@@ -198,7 +205,11 @@ public StatusResult<Void> terminate(String dataFlowId, @Nullable String reason)
198205
}
199206
return stop(dataFlow, reason)
200207
.onSuccess(flow -> {
201-
flow.transitToTerminated(reason);
208+
if (flow.resourcesToBeDeprovisioned().isEmpty()) {
209+
flow.transitToTerminated(reason);
210+
} else {
211+
flow.transitionToDeprovisioning();
212+
}
202213
update(dataFlow);
203214
})
204215
.mapEmpty();
@@ -223,9 +234,9 @@ public StatusResult<Void> restartFlows() {
223234
}
224235

225236
@Override
226-
public StatusResult<Void> resourceProvisioned(ProvisionedResource provisionedResource) {
237+
public ServiceResult<Void> resourceProvisioned(ProvisionedResource provisionedResource) {
227238
return store.findByIdAndLease(provisionedResource.getFlowId())
228-
.flatMap(StatusResult::from)
239+
.flatMap(ServiceResult::from)
229240
.onSuccess(flow -> {
230241
flow.resourceProvisioned(List.of(provisionedResource));
231242
update(flow);
@@ -234,9 +245,9 @@ public StatusResult<Void> resourceProvisioned(ProvisionedResource provisionedRes
234245
}
235246

236247
@Override
237-
public StatusResult<Void> resourceDeprovisioned(DeprovisionedResource deprovisionedResource) {
248+
public ServiceResult<Void> resourceDeprovisioned(DeprovisionedResource deprovisionedResource) {
238249
return store.findByIdAndLease(deprovisionedResource.getFlowId())
239-
.flatMap(StatusResult::from)
250+
.flatMap(ServiceResult::from)
240251
.onSuccess(flow -> {
241252
flow.resourceDeprovisioned(List.of(deprovisionedResource));
242253
update(flow);
@@ -376,17 +387,20 @@ private boolean processProvisioning(DataFlow dataFlow) {
376387

377388
if (provisionedResources.size() != results.size()) {
378389
var failureDetail = results.stream().filter(StatusResult::failed).map(StatusResult::getFailureDetail).collect(joining(","));
390+
monitor.warning("Failed to provision flow " + flow.getId() + ": " + failureDetail);
379391
flow.setErrorDetail(failureDetail);
380392
flow.transitionToProvisioning();
381393
}
382394

383395
update(flow);
384396
})
385397
.onFailure((flow, t) -> {
398+
monitor.warning("Failed to provision flow " + flow.getId(), t);
386399
flow.transitionToProvisioning();
387400
update(flow);
388401
})
389402
.onFinalFailure((flow, e) -> {
403+
monitor.severe("Cannot provision flow " + flow.getId(), e);
390404
flow.transitToFailed("Cannot provision: " + e.getMessage());
391405
update(flow);
392406
})

core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/provision/ResourceDefinitionGeneratorManagerImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,17 @@ public List<ProvisionResource> generateConsumerResourceDefinition(DataFlow dataF
5353
@Override
5454
public List<ProvisionResource> generateProviderResourceDefinition(DataFlow dataFlow) {
5555
return providerGenerators.stream()
56-
.filter(g -> g.supportedType().equals(dataFlow.getDestination().getType()))
56+
.filter(g -> g.supportedType().equals(dataFlow.getSource().getType()))
5757
.map(g -> g.generate(dataFlow))
5858
.filter(Objects::nonNull)
5959
.toList();
6060
}
6161

62+
@Override
63+
public Set<String> sourceTypes() {
64+
return providerGenerators.stream().map(ResourceDefinitionGenerator::supportedType).collect(toSet());
65+
}
66+
6267
@Override
6368
public Set<String> destinationTypes() {
6469
return consumerGenerators.stream().map(ResourceDefinitionGenerator::supportedType).collect(toSet());

0 commit comments

Comments
 (0)