diff --git a/server/ee/apps/scheduler-app/build.gradle.kts b/server/ee/apps/scheduler-app/build.gradle.kts index 7c60bb69936..bbfc4d0a1e7 100644 --- a/server/ee/apps/scheduler-app/build.gradle.kts +++ b/server/ee/apps/scheduler-app/build.gradle.kts @@ -31,6 +31,7 @@ dependencies { implementation(project(":server:ee:libs:config:observability-config")) implementation(project(":server:ee:libs:core:discovery:discovery-redis")) implementation(project(":server:ee:libs:platform:platform-component:platform-component-remote-client")) + implementation(project(":server:ee:libs:platform:platform-connection:platform-connection-remote-client")) implementation(project(":server:ee:libs:platform:platform-scheduler:platform-scheduler-impl")) implementation(project(":server:ee:libs:platform:platform-scheduler:platform-scheduler-remote-rest")) implementation(project(":server:ee:libs:platform:platform-workflow:platform-workflow-execution:platform-workflow-execution-remote-client")) diff --git a/server/ee/libs/platform/platform-component/platform-component-remote-client/src/main/java/com/bytechef/ee/platform/component/remote/client/facade/RemoteConnectionDefinitionFacadeClient.java b/server/ee/libs/platform/platform-component/platform-component-remote-client/src/main/java/com/bytechef/ee/platform/component/remote/client/facade/RemoteConnectionDefinitionFacadeClient.java deleted file mode 100644 index df8ff8681cb..00000000000 --- a/server/ee/libs/platform/platform-component/platform-component-remote-client/src/main/java/com/bytechef/ee/platform/component/remote/client/facade/RemoteConnectionDefinitionFacadeClient.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2025 ByteChef - * - * Licensed under the ByteChef Enterprise license (the "Enterprise License"); - * you may not use this file except in compliance with the Enterprise License. - */ - -package com.bytechef.ee.platform.component.remote.client.facade; - -import com.bytechef.platform.component.ComponentConnection; -import com.bytechef.platform.component.facade.ConnectionDefinitionFacade; -import org.springframework.stereotype.Component; - -/** - * @version ee - * - * @author Ivica Cardic - */ -@Component -public class RemoteConnectionDefinitionFacadeClient implements ConnectionDefinitionFacade { - - @Override - public ComponentConnection executeConnectionRefresh(Long connectionId) { - throw new UnsupportedOperationException(); - } -} diff --git a/server/ee/libs/platform/platform-component/platform-component-remote-client/src/main/java/com/bytechef/ee/platform/component/remote/client/service/RemoteConnectionDefinitionServiceClient.java b/server/ee/libs/platform/platform-component/platform-component-remote-client/src/main/java/com/bytechef/ee/platform/component/remote/client/service/RemoteConnectionDefinitionServiceClient.java index 4972bc17562..bff526f75a2 100644 --- a/server/ee/libs/platform/platform-component/platform-component-remote-client/src/main/java/com/bytechef/ee/platform/component/remote/client/service/RemoteConnectionDefinitionServiceClient.java +++ b/server/ee/libs/platform/platform-component/platform-component-remote-client/src/main/java/com/bytechef/ee/platform/component/remote/client/service/RemoteConnectionDefinitionServiceClient.java @@ -78,7 +78,7 @@ public Optional executeBaseUri( } @Override - public Context createConnectionRefreshContext(String componentName, ComponentConnection componentConnection) { + public ComponentConnection executeConnectionRefresh(ComponentConnection componentConnection) { throw new UnsupportedOperationException(); } diff --git a/server/ee/libs/platform/platform-connection/platform-connection-remote-client/src/main/java/com/bytechef/ee/platform/connection/remote/client/fasade/RemoteConnectionFacadeClient.java b/server/ee/libs/platform/platform-connection/platform-connection-remote-client/src/main/java/com/bytechef/ee/platform/connection/remote/client/fasade/RemoteConnectionFacadeClient.java index b3816d596c3..0b73ce86a2c 100644 --- a/server/ee/libs/platform/platform-connection/platform-connection-remote-client/src/main/java/com/bytechef/ee/platform/connection/remote/client/fasade/RemoteConnectionFacadeClient.java +++ b/server/ee/libs/platform/platform-connection/platform-connection-remote-client/src/main/java/com/bytechef/ee/platform/connection/remote/client/fasade/RemoteConnectionFacadeClient.java @@ -32,6 +32,11 @@ public void delete(Long id) { throw new UnsupportedOperationException(); } + @Override + public Long executeConnectionRefresh(Long connectionId) { + throw new UnsupportedOperationException(); + } + @Override public ConnectionDTO getConnection(Long id) { throw new UnsupportedOperationException(); diff --git a/server/ee/libs/platform/platform-scheduler/platform-scheduler-aws/build.gradle.kts b/server/ee/libs/platform/platform-scheduler/platform-scheduler-aws/build.gradle.kts index 2ad7f67730a..feea5290f8f 100644 --- a/server/ee/libs/platform/platform-scheduler/platform-scheduler-aws/build.gradle.kts +++ b/server/ee/libs/platform/platform-scheduler/platform-scheduler-aws/build.gradle.kts @@ -5,9 +5,11 @@ dependencies { implementation("io.awspring.cloud:spring-cloud-aws-sqs") implementation(project(":server:libs:config:app-config")) implementation(project(":server:libs:core:commons:commons-util")) + implementation(project(":server:libs:core:tenant:tenant-api")) implementation(project(":server:libs:platform:platform-scheduler:platform-scheduler-api")) implementation(project(":server:libs:platform:platform-workflow:platform-workflow-coordinator:platform-workflow-coordinator-api")) implementation(project(":server:libs:platform:platform-api")) + implementation(project(":server:libs:platform:platform-connection:platform-connection-api")) implementation(project(":server:ee:libs:core:cloud:cloud-aws")) implementation(project(":server:ee:libs:core:message:message-broker:message-broker-aws")) diff --git a/server/ee/libs/platform/platform-scheduler/platform-scheduler-aws/src/main/java/com/bytechef/ee/platform/scheduler/aws/AwsConnectionRefreshScheduler.java b/server/ee/libs/platform/platform-scheduler/platform-scheduler-aws/src/main/java/com/bytechef/ee/platform/scheduler/aws/AwsConnectionRefreshScheduler.java new file mode 100644 index 00000000000..fa8780e8ec4 --- /dev/null +++ b/server/ee/libs/platform/platform-scheduler/platform-scheduler-aws/src/main/java/com/bytechef/ee/platform/scheduler/aws/AwsConnectionRefreshScheduler.java @@ -0,0 +1,115 @@ +/* + * Copyright 2025 ByteChef + * + * Licensed under the ByteChef Enterprise license (the "Enterprise License"); + * you may not use this file except in compliance with the Enterprise License. + */ + +package com.bytechef.ee.platform.scheduler.aws; + +import static com.bytechef.ee.platform.scheduler.aws.constant.AwsConnectionRefreshSchedulerConstants.CONNECTION_REFRESH; +import static com.bytechef.ee.platform.scheduler.aws.constant.AwsConnectionRefreshSchedulerConstants.SCHEDULER_CONNECTION_REFRESH_QUEUE; + +import com.bytechef.config.ApplicationProperties; +import com.bytechef.ee.platform.scheduler.aws.constant.AwsTriggerSchedulerConstants; +import com.bytechef.platform.scheduler.ConnectionRefreshScheduler; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.time.Duration; +import java.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.scheduler.SchedulerClient; +import software.amazon.awssdk.services.scheduler.model.ConflictException; +import software.amazon.awssdk.services.scheduler.model.FlexibleTimeWindowMode; +import software.amazon.awssdk.services.scheduler.model.Target; + +/** + * @version ee + * + * @author Nikolina Spehar + */ +public class AwsConnectionRefreshScheduler implements ConnectionRefreshScheduler { + + private static final Logger log = LoggerFactory.getLogger(AwsConnectionRefreshScheduler.class); + + private final SchedulerClient schedulerClient; + private final String sqsArn; + private final String roleArn; + + @SuppressFBWarnings("EI") + public AwsConnectionRefreshScheduler(ApplicationProperties.Cloud.Aws aws, SchedulerClient schedulerClient) { + this.schedulerClient = schedulerClient; + + String accountId = aws.getAccountId(); + + this.sqsArn = "arn:aws:sqs:" + aws.getRegion() + ":" + accountId; + this.roleArn = "arn:aws:iam::" + accountId + ":role/schedule-role"; + } + + @Override + public void cancelConnectionRefresh(Long connectionId, String tenantId) { + try { + schedulerClient.deleteSchedule(request -> request.clientToken(tenantId + connectionId) + .groupName(CONNECTION_REFRESH) + .name(CONNECTION_REFRESH + tenantId + connectionId)); + } catch (RuntimeException e) { + if (log.isDebugEnabled()) { + log.debug("Dynamic Webhook Trigger Refresh not defined"); + } + } + } + + @Override + public void scheduleConnectionRefresh(Long connectionId, Instant expiry, String tenantId) { + String scheduleName = CONNECTION_REFRESH + tenantId + connectionId; + String clientToken = tenantId + connectionId; + + Instant now = Instant.now(); + + Duration between = Duration.between(now, expiry); + + long secondsUntilExpiry = between.getSeconds(); + + long minutesUntilExpiry = secondsUntilExpiry / 60; + + long refreshMinutes = Math.max(1, minutesUntilExpiry - 5); + + String scheduleExpression = "rate(" + refreshMinutes + " minutes)"; + + log.info( + "Scheduling connection refresh for connection: {}, tenant: {}, expiry time: {}, refresh time: {}, " + + "schedule expression: {}, SQS ARN: {}, role ARN: {}", + connectionId, tenantId, expiry, refreshMinutes, scheduleExpression, + sqsArn + ":" + SCHEDULER_CONNECTION_REFRESH_QUEUE, roleArn); + + Target sqsTarget = Target.builder() + .roleArn(roleArn) + .arn(sqsArn + ":" + SCHEDULER_CONNECTION_REFRESH_QUEUE) + .input(tenantId + AwsTriggerSchedulerConstants.SPLITTER + connectionId) + .build(); + + try { + schedulerClient.createSchedule(request -> request.clientToken(clientToken) + .groupName(CONNECTION_REFRESH) + .name(scheduleName) + .scheduleExpression(scheduleExpression) + .target(sqsTarget) + .flexibleTimeWindow(mode -> mode.mode(FlexibleTimeWindowMode.OFF)) + .startDate(now.plus(Duration.ofMinutes(1)))); + + log.info("Schedule created successfully."); + } catch (ConflictException e) { + log.info("Schedule already exists, updating..."); + + schedulerClient.updateSchedule(request -> request.clientToken(clientToken) + .groupName(CONNECTION_REFRESH) + .name(scheduleName) + .scheduleExpression(scheduleExpression) + .target(sqsTarget) + .flexibleTimeWindow(mode -> mode.mode(FlexibleTimeWindowMode.OFF)) + .startDate(now.plus(Duration.ofMinutes(1)))); + + log.info("Schedule updated successfully."); + } + } +} diff --git a/server/ee/libs/platform/platform-scheduler/platform-scheduler-aws/src/main/java/com/bytechef/ee/platform/scheduler/aws/config/AwsConnectionRefreshSchedulerConfiguration.java b/server/ee/libs/platform/platform-scheduler/platform-scheduler-aws/src/main/java/com/bytechef/ee/platform/scheduler/aws/config/AwsConnectionRefreshSchedulerConfiguration.java new file mode 100644 index 00000000000..92e1f7b5ff6 --- /dev/null +++ b/server/ee/libs/platform/platform-scheduler/platform-scheduler-aws/src/main/java/com/bytechef/ee/platform/scheduler/aws/config/AwsConnectionRefreshSchedulerConfiguration.java @@ -0,0 +1,73 @@ +/* + * Copyright 2025 ByteChef + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytechef.ee.platform.scheduler.aws.config; + +import com.bytechef.config.ApplicationProperties; +import com.bytechef.ee.platform.scheduler.aws.AwsConnectionRefreshScheduler; +import com.bytechef.ee.platform.scheduler.aws.listener.ConnectionRefreshListener; +import com.bytechef.platform.annotation.ConditionalOnEEVersion; +import com.bytechef.platform.connection.facade.ConnectionFacade; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.regions.providers.AwsRegionProvider; +import software.amazon.awssdk.services.scheduler.SchedulerClient; + +/** + * @author Nikolina Spehar + */ +@Configuration +@ConditionalOnProperty( + prefix = "bytechef", name = "coordinator.connection.scheduler.provider", havingValue = "aws") +@ConditionalOnEEVersion +public class AwsConnectionRefreshSchedulerConfiguration { + + private static final Logger log = LoggerFactory.getLogger(AwsConnectionRefreshSchedulerConfiguration.class); + + private final ApplicationProperties applicationProperties; + + AwsConnectionRefreshSchedulerConfiguration(ApplicationProperties applicationProperties) { + if (log.isInfoEnabled()) { + log.info("Connection refresh scheduler provider type enabled: aws"); + } + + this.applicationProperties = applicationProperties; + } + + @Bean + AwsConnectionRefreshScheduler awsConnectionRefreshScheduler( + AwsCredentialsProvider awsCredentialsProvider, AwsRegionProvider awsRegionProvider) { + + ApplicationProperties.Cloud.Aws aws = applicationProperties.getCloud() + .getAws(); + + SchedulerClient schedulerClient = SchedulerClient.builder() + .credentialsProvider(awsCredentialsProvider) + .region(awsRegionProvider.getRegion()) + .build(); + + return new AwsConnectionRefreshScheduler(aws, schedulerClient); + } + + @Bean + ConnectionRefreshListener connectionRefreshListener(ConnectionFacade connectionFacade) { + return new ConnectionRefreshListener(connectionFacade); + } +} diff --git a/server/ee/libs/platform/platform-scheduler/platform-scheduler-aws/src/main/java/com/bytechef/ee/platform/scheduler/aws/constant/AwsConnectionRefreshSchedulerConstants.java b/server/ee/libs/platform/platform-scheduler/platform-scheduler-aws/src/main/java/com/bytechef/ee/platform/scheduler/aws/constant/AwsConnectionRefreshSchedulerConstants.java new file mode 100644 index 00000000000..ebefa86b959 --- /dev/null +++ b/server/ee/libs/platform/platform-scheduler/platform-scheduler-aws/src/main/java/com/bytechef/ee/platform/scheduler/aws/constant/AwsConnectionRefreshSchedulerConstants.java @@ -0,0 +1,19 @@ +/* + * Copyright 2025 ByteChef + * + * Licensed under the ByteChef Enterprise license (the "Enterprise License"); + * you may not use this file except in compliance with the Enterprise License. + */ + +package com.bytechef.ee.platform.scheduler.aws.constant; + +/** + * @version ee + * + * @author Nikolina Spehar + */ +public class AwsConnectionRefreshSchedulerConstants { + + public static final String CONNECTION_REFRESH = "ConnectionRefresh"; + public static final String SCHEDULER_CONNECTION_REFRESH_QUEUE = "scheduler_connection_refresh_queue"; +} diff --git a/server/ee/libs/platform/platform-scheduler/platform-scheduler-aws/src/main/java/com/bytechef/ee/platform/scheduler/aws/listener/ConnectionRefreshListener.java b/server/ee/libs/platform/platform-scheduler/platform-scheduler-aws/src/main/java/com/bytechef/ee/platform/scheduler/aws/listener/ConnectionRefreshListener.java new file mode 100644 index 00000000000..066ea0e9eb5 --- /dev/null +++ b/server/ee/libs/platform/platform-scheduler/platform-scheduler-aws/src/main/java/com/bytechef/ee/platform/scheduler/aws/listener/ConnectionRefreshListener.java @@ -0,0 +1,40 @@ +/* + * Copyright 2025 ByteChef + * + * Licensed under the ByteChef Enterprise license (the "Enterprise License"); + * you may not use this file except in compliance with the Enterprise License. + */ + +package com.bytechef.ee.platform.scheduler.aws.listener; + +import static com.bytechef.ee.platform.scheduler.aws.constant.AwsConnectionRefreshSchedulerConstants.SCHEDULER_CONNECTION_REFRESH_QUEUE; +import static com.bytechef.ee.platform.scheduler.aws.constant.AwsTriggerSchedulerConstants.SPLITTER; + +import com.bytechef.platform.connection.facade.ConnectionFacade; +import com.bytechef.tenant.TenantContext; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.awspring.cloud.sqs.annotation.SqsListener; + +/** + * @version ee + * + * @author Nikolina Spehar + */ +public class ConnectionRefreshListener { + + private final ConnectionFacade connectionFacade; + + @SuppressFBWarnings("EI") + public ConnectionRefreshListener(ConnectionFacade connectionFacade) { + this.connectionFacade = connectionFacade; + } + + @SqsListener(SCHEDULER_CONNECTION_REFRESH_QUEUE) + public void onSchedule(String message) { + String[] split = message.split(SPLITTER); + String tenantId = split[0]; + Long connectionId = Long.valueOf(split[1]); + + TenantContext.runWithTenantId(tenantId, () -> connectionFacade.executeConnectionRefresh(connectionId)); + } +} diff --git a/server/ee/libs/platform/platform-scheduler/platform-scheduler-remote-client/src/main/java/com/bytechef/ee/platform/scheduler/remote/client/RemoteConnectionRefreshSchedulerClient.java b/server/ee/libs/platform/platform-scheduler/platform-scheduler-remote-client/src/main/java/com/bytechef/ee/platform/scheduler/remote/client/RemoteConnectionRefreshSchedulerClient.java index 07785652df7..ee8dbb87468 100644 --- a/server/ee/libs/platform/platform-scheduler/platform-scheduler-remote-client/src/main/java/com/bytechef/ee/platform/scheduler/remote/client/RemoteConnectionRefreshSchedulerClient.java +++ b/server/ee/libs/platform/platform-scheduler/platform-scheduler-remote-client/src/main/java/com/bytechef/ee/platform/scheduler/remote/client/RemoteConnectionRefreshSchedulerClient.java @@ -32,25 +32,25 @@ public RemoteConnectionRefreshSchedulerClient(LoadBalancedRestClient loadBalance } @Override - public void cancelConnectionRefresh(Long connectionId) { + public void cancelConnectionRefresh(Long connectionId, String tenantId) { loadBalancedRestClient.post( uriBuilder -> uriBuilder .host(SCHEDULER_APP) .path(CONNECTION_REFRESH_SCHEDULER + "/cancel-connection-refresh") .build(), - connectionId); + new ScheduleConnectionRefreshRequest(connectionId, Instant.now(), tenantId)); } @Override - public void scheduleConnectionRefresh(Long connectionId, Instant expiry) { + public void scheduleConnectionRefresh(Long connectionId, Instant expiry, String tenantId) { loadBalancedRestClient.post( uriBuilder -> uriBuilder .host(SCHEDULER_APP) .path(CONNECTION_REFRESH_SCHEDULER + "/schedule-connection-refresh") .build(), - new ScheduleConnectionRefreshRequest(connectionId, expiry)); + new ScheduleConnectionRefreshRequest(connectionId, expiry, tenantId)); } - private record ScheduleConnectionRefreshRequest(Long connectionId, Instant expiry) { + private record ScheduleConnectionRefreshRequest(Long connectionId, Instant expiry, String tenantId) { } } diff --git a/server/ee/libs/platform/platform-scheduler/platform-scheduler-remote-rest/src/main/java/com/bytechef/ee/platform/scheduler/remote/web/rest/RemoteConnectionRefreshSchedulerController.java b/server/ee/libs/platform/platform-scheduler/platform-scheduler-remote-rest/src/main/java/com/bytechef/ee/platform/scheduler/remote/web/rest/RemoteConnectionRefreshSchedulerController.java new file mode 100644 index 00000000000..1f070bdc1fb --- /dev/null +++ b/server/ee/libs/platform/platform-scheduler/platform-scheduler-remote-rest/src/main/java/com/bytechef/ee/platform/scheduler/remote/web/rest/RemoteConnectionRefreshSchedulerController.java @@ -0,0 +1,71 @@ +/* + * Copyright 2025 ByteChef + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytechef.ee.platform.scheduler.remote.web.rest; + +import com.bytechef.platform.scheduler.ConnectionRefreshScheduler; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import jakarta.validation.Valid; +import java.time.Instant; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author Nikolina Spehar + */ +@RestController +@RequestMapping("/remote/connection-refresh-scheduler") +public class RemoteConnectionRefreshSchedulerController { + + private final ConnectionRefreshScheduler connectionRefreshScheduler; + + public RemoteConnectionRefreshSchedulerController(ConnectionRefreshScheduler connectionRefreshScheduler) { + this.connectionRefreshScheduler = connectionRefreshScheduler; + } + + @RequestMapping( + method = RequestMethod.POST, + value = "/cancel-connection-refresh", + consumes = { + "application/json" + }) + void cancelConnectionRefresh( + @Valid @RequestBody ScheduleConnectionRefreshRequest scheduleConnectionRefreshRequest) { + connectionRefreshScheduler.cancelConnectionRefresh( + scheduleConnectionRefreshRequest.connectionId, scheduleConnectionRefreshRequest.tenantId); + } + + @RequestMapping( + method = RequestMethod.POST, + value = "/schedule-connection-refresh", + consumes = { + "application/json" + }) + void scheduleConnectionRefresh( + @Valid @RequestBody ScheduleConnectionRefreshRequest scheduleConnectionRefreshRequest) { + connectionRefreshScheduler.scheduleConnectionRefresh( + scheduleConnectionRefreshRequest.connectionId, + scheduleConnectionRefreshRequest.expiry, + scheduleConnectionRefreshRequest.tenantId); + } + + @SuppressFBWarnings("EI") + private record ScheduleConnectionRefreshRequest(Long connectionId, Instant expiry, String tenantId) { + } + +} diff --git a/server/ee/libs/platform/platform-workflow/platform-workflow-execution/platform-workflow-execution-remote-client/src/main/java/com/bytechef/ee/platform/workflow/execution/remote/client/facade/RemoteConnectionLifecycleFacadeClient.java b/server/ee/libs/platform/platform-workflow/platform-workflow-execution/platform-workflow-execution-remote-client/src/main/java/com/bytechef/ee/platform/workflow/execution/remote/client/facade/RemoteConnectionLifecycleFacadeClient.java index ef478d30fcf..8c964970e32 100644 --- a/server/ee/libs/platform/platform-workflow/platform-workflow-execution/platform-workflow-execution-remote-client/src/main/java/com/bytechef/ee/platform/workflow/execution/remote/client/facade/RemoteConnectionLifecycleFacadeClient.java +++ b/server/ee/libs/platform/platform-workflow/platform-workflow-execution/platform-workflow-execution-remote-client/src/main/java/com/bytechef/ee/platform/workflow/execution/remote/client/facade/RemoteConnectionLifecycleFacadeClient.java @@ -34,31 +34,34 @@ public RemoteConnectionLifecycleFacadeClient(LoadBalancedRestClient loadBalanced @Override public void scheduleConnectionRefresh( - Long connectionId, Map parameters, AuthorizationType authorizationType) { + Long connectionId, Map parameters, AuthorizationType authorizationType, String tenantId) { loadBalancedRestClient.post( uriBuilder -> uriBuilder .host(EXECUTION_APP) .path(CONNECTION_LIFECYCLE_FACADE + "/schedule-connection-refresh") .build(), - new ScheduleConnectionRefreshRequest(connectionId, parameters, authorizationType)); + new ScheduleConnectionRefreshRequest(connectionId, parameters, authorizationType, tenantId)); } @Override - public void deleteScheduledConnectionRefresh(Long connectionId, AuthorizationType authorizationType) { + public void deleteScheduledConnectionRefresh( + Long connectionId, AuthorizationType authorizationType, String tenantId) { + loadBalancedRestClient.post( uriBuilder -> uriBuilder .host(EXECUTION_APP) .path(CONNECTION_LIFECYCLE_FACADE + "/delete-scheduled-connection-refresh") .build(), - new DeleteScheduledConnectionRefreshRequest(connectionId, authorizationType)); + new DeleteScheduledConnectionRefreshRequest(connectionId, authorizationType, tenantId)); } @SuppressFBWarnings("EI") private record ScheduleConnectionRefreshRequest( - Long connectionId, Map parameters, AuthorizationType authorizationType) { + Long connectionId, Map parameters, AuthorizationType authorizationType, String tenantId) { } - private record DeleteScheduledConnectionRefreshRequest(Long connectionId, AuthorizationType authorizationType) { + private record DeleteScheduledConnectionRefreshRequest( + Long connectionId, AuthorizationType authorizationType, String tenantId) { } } diff --git a/server/libs/platform/platform-component/platform-component-api/src/main/java/com/bytechef/platform/component/facade/ConnectionDefinitionFacade.java b/server/ee/libs/platform/platform-workflow/platform-workflow-execution/platform-workflow-execution-remote-rest/src/main/java/com/bytechef/ee/platform/workflow/execution/remote/web/rest/facade/RemoteConnectionLifecycleFacadeController.java similarity index 70% rename from server/libs/platform/platform-component/platform-component-api/src/main/java/com/bytechef/platform/component/facade/ConnectionDefinitionFacade.java rename to server/ee/libs/platform/platform-workflow/platform-workflow-execution/platform-workflow-execution-remote-rest/src/main/java/com/bytechef/ee/platform/workflow/execution/remote/web/rest/facade/RemoteConnectionLifecycleFacadeController.java index 2eccad69b3a..8062673531a 100644 --- a/server/libs/platform/platform-component/platform-component-api/src/main/java/com/bytechef/platform/component/facade/ConnectionDefinitionFacade.java +++ b/server/ee/libs/platform/platform-workflow/platform-workflow-execution/platform-workflow-execution-remote-rest/src/main/java/com/bytechef/ee/platform/workflow/execution/remote/web/rest/facade/RemoteConnectionLifecycleFacadeController.java @@ -14,15 +14,10 @@ * limitations under the License. */ -package com.bytechef.platform.component.facade; - -import com.bytechef.platform.component.ComponentConnection; -import javax.annotation.Nullable; +package com.bytechef.ee.platform.workflow.execution.remote.web.rest.facade; /** * @author Nikolina Spehar */ -public interface ConnectionDefinitionFacade { - - ComponentConnection executeConnectionRefresh(@Nullable Long connectionId); +public class RemoteConnectionLifecycleFacadeController { } diff --git a/server/libs/automation/automation-configuration/automation-configuration-service/src/main/java/com/bytechef/automation/configuration/facade/WorkspaceConnectionFacadeImpl.java b/server/libs/automation/automation-configuration/automation-configuration-service/src/main/java/com/bytechef/automation/configuration/facade/WorkspaceConnectionFacadeImpl.java index 21641b7b6a5..bb4b5bc5d33 100644 --- a/server/libs/automation/automation-configuration/automation-configuration-service/src/main/java/com/bytechef/automation/configuration/facade/WorkspaceConnectionFacadeImpl.java +++ b/server/libs/automation/automation-configuration/automation-configuration-service/src/main/java/com/bytechef/automation/configuration/facade/WorkspaceConnectionFacadeImpl.java @@ -25,6 +25,7 @@ import com.bytechef.platform.connection.facade.ConnectionFacade; import com.bytechef.platform.constant.PlatformType; import com.bytechef.platform.workflow.execution.facade.ConnectionLifecycleFacade; +import com.bytechef.tenant.TenantContext; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; import org.springframework.stereotype.Service; @@ -45,8 +46,7 @@ public class WorkspaceConnectionFacadeImpl implements WorkspaceConnectionFacade @SuppressFBWarnings("EI") public WorkspaceConnectionFacadeImpl( - ConnectionFacade connectionFacade, - ConnectionLifecycleFacade connectionLifecycleFacade, + ConnectionFacade connectionFacade, ConnectionLifecycleFacade connectionLifecycleFacade, ProjectDeploymentWorkflowService projectDeploymentWorkflowService, WorkflowTestConfigurationService workflowTestConfigurationService, WorkspaceConnectionService workspaceConnectionService) { @@ -61,13 +61,14 @@ public WorkspaceConnectionFacadeImpl( @Override public long create(long workspaceId, ConnectionDTO connectionDTO) { long connectionId = connectionFacade.create(connectionDTO, PlatformType.AUTOMATION); + String tenantId = TenantContext.getCurrentTenantId(); workspaceConnectionService.create(connectionId, workspaceId); ConnectionDTO connection = connectionFacade.getConnection(connectionId); connectionLifecycleFacade.scheduleConnectionRefresh( - connectionId, connection.parameters(), connection.authorizationType()); + connectionId, connection.parameters(), connection.authorizationType(), tenantId); return connectionId; } @@ -75,9 +76,11 @@ public long create(long workspaceId, ConnectionDTO connectionDTO) { @Override public void delete(long connectionId) { ConnectionDTO connection = connectionFacade.getConnection(connectionId); + String tenantId = TenantContext.getCurrentTenantId(); try { - connectionLifecycleFacade.deleteScheduledConnectionRefresh(connectionId, connection.authorizationType()); + connectionLifecycleFacade.deleteScheduledConnectionRefresh( + connectionId, connection.authorizationType(), tenantId); } catch (RuntimeException e) { throw new RuntimeException(e); diff --git a/server/libs/config/app-config/src/main/java/com/bytechef/config/ApplicationProperties.java b/server/libs/config/app-config/src/main/java/com/bytechef/config/ApplicationProperties.java index 008663bad4e..12a13961235 100644 --- a/server/libs/config/app-config/src/main/java/com/bytechef/config/ApplicationProperties.java +++ b/server/libs/config/app-config/src/main/java/com/bytechef/config/ApplicationProperties.java @@ -2075,6 +2075,9 @@ public static class Coordinator { /** Trigger coordination configuration */ private Trigger trigger = new Trigger(); + /** Connection cordinator configuration */ + private Connection connection = new Connection(); + public boolean isEnabled() { return enabled; } @@ -2087,6 +2090,10 @@ public Trigger getTrigger() { return trigger; } + public Connection getConnection() { + return connection; + } + public void setEnabled(boolean enabled) { this.enabled = enabled; } @@ -2099,6 +2106,10 @@ public void setTrigger(Trigger trigger) { this.trigger = trigger; } + public void setConnection(Connection connection) { + this.connection = connection; + } + /** * Task coordination configuration for workflow tasks. */ @@ -2188,6 +2199,50 @@ public void setTaskExecutionErrorEvents(int taskExecutionErrorEvents) { } } + /** + * Connection coordination configuration for connection refresh. + */ + public static class Connection { + + /** Scheduler configuration */ + private Scheduler scheduler = new Scheduler(); + + public Scheduler getScheduler() { + return scheduler; + } + + public void setScheduler(Scheduler scheduler) { + this.scheduler = scheduler; + } + + /** + * Scheduler configuration for connection refresh. + */ + public static class Scheduler { + + /** + * Available scheduler providers. + */ + public enum Provider { + /** AWS EventBridge Scheduler */ + AWS, + /** Quartz Scheduler */ + QUARTZ + } + + /** Scheduler provider */ + private Trigger.Scheduler.Provider provider = Trigger.Scheduler.Provider.QUARTZ; + + public Trigger.Scheduler.Provider getProvider() { + return provider; + } + + public void setProvider(Trigger.Scheduler.Provider provider) { + this.provider = provider; + } + } + } + /** * Trigger coordination configuration for workflow triggers. */ diff --git a/server/libs/platform/platform-component/platform-component-api/src/main/java/com/bytechef/platform/component/service/ConnectionDefinitionService.java b/server/libs/platform/platform-component/platform-component-api/src/main/java/com/bytechef/platform/component/service/ConnectionDefinitionService.java index 0e3b728e861..786ed74617f 100644 --- a/server/libs/platform/platform-component/platform-component-api/src/main/java/com/bytechef/platform/component/service/ConnectionDefinitionService.java +++ b/server/libs/platform/platform-component/platform-component-api/src/main/java/com/bytechef/platform/component/service/ConnectionDefinitionService.java @@ -53,7 +53,7 @@ ApplyResponse executeAuthorizationApply( Optional executeBaseUri(String componentName, ComponentConnection componentConnection, Context context); - Context createConnectionRefreshContext(String componentName, ComponentConnection componentConnection); + ComponentConnection executeConnectionRefresh(ComponentConnection componentConnection); RefreshTokenResponse executeRefresh( String componentName, int connectionVersion, AuthorizationType authorizationType, diff --git a/server/libs/platform/platform-component/platform-component-service/src/main/java/com/bytechef/platform/component/aspect/TokenRefreshHandler.java b/server/libs/platform/platform-component/platform-component-service/src/main/java/com/bytechef/platform/component/aspect/TokenRefreshHandler.java index 41f91b9c429..d7a4f656f7d 100644 --- a/server/libs/platform/platform-component/platform-component-service/src/main/java/com/bytechef/platform/component/aspect/TokenRefreshHandler.java +++ b/server/libs/platform/platform-component/platform-component-service/src/main/java/com/bytechef/platform/component/aspect/TokenRefreshHandler.java @@ -185,7 +185,7 @@ public ComponentConnection refreshCredentials(ComponentConnection componentConne * * @param connectionId the ID of the connection to mark as invalid */ - public void markCredentialsInvalid(long connectionId) { + private void markCredentialsInvalid(long connectionId) { connectionService.updateConnectionCredentialStatus(connectionId, Connection.CredentialStatus.INVALID); } } diff --git a/server/libs/platform/platform-component/platform-component-service/src/main/java/com/bytechef/platform/component/facade/ConnectionDefinitionFacadeImpl.java b/server/libs/platform/platform-component/platform-component-service/src/main/java/com/bytechef/platform/component/facade/ConnectionDefinitionFacadeImpl.java deleted file mode 100644 index 03ebdfac415..00000000000 --- a/server/libs/platform/platform-component/platform-component-service/src/main/java/com/bytechef/platform/component/facade/ConnectionDefinitionFacadeImpl.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2025 ByteChef - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.bytechef.platform.component.facade; - -import com.bytechef.component.definition.Context; -import com.bytechef.platform.component.ComponentConnection; -import com.bytechef.platform.component.aspect.TokenRefreshHandler; -import com.bytechef.platform.component.service.ConnectionDefinitionService; -import com.bytechef.platform.connection.domain.Connection; -import com.bytechef.platform.connection.service.ConnectionService; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import org.jspecify.annotations.Nullable; -import org.springframework.stereotype.Service; - -/** - * @author Nikolina Spehar - */ -@Service("connectionDefinitionFacade") -public class ConnectionDefinitionFacadeImpl implements ConnectionDefinitionFacade { - - private final ConnectionService connectionService; - private final ConnectionDefinitionService connectionDefinitionService; - private final TokenRefreshHandler tokenRefreshHandler; - - @SuppressFBWarnings("EI2") - public ConnectionDefinitionFacadeImpl( - ConnectionService connectionService, ConnectionDefinitionService connectionDefinitionService, - TokenRefreshHandler tokenRefreshHandler) { - - this.connectionService = connectionService; - this.connectionDefinitionService = connectionDefinitionService; - this.tokenRefreshHandler = tokenRefreshHandler; - } - - @Override - public ComponentConnection executeConnectionRefresh(@Nullable Long connectionId) { - - ComponentConnection componentConnection = getComponentConnection(connectionId); - - Context context = connectionDefinitionService.createConnectionRefreshContext( - componentConnection.getComponentName(), componentConnection); - - return tokenRefreshHandler.refreshCredentials(componentConnection, context); - } - - private ComponentConnection getComponentConnection(Long connectionId) { - ComponentConnection componentConnection = null; - - if (connectionId != null) { - Connection connection = connectionService.getConnection(connectionId); - - componentConnection = new ComponentConnection( - connection.getComponentName(), connection.getConnectionVersion(), connectionId, - connection.getParameters(), connection.getAuthorizationType()); - } - - return componentConnection; - } -} diff --git a/server/libs/platform/platform-component/platform-component-service/src/main/java/com/bytechef/platform/component/service/ConnectionDefinitionServiceImpl.java b/server/libs/platform/platform-component/platform-component-service/src/main/java/com/bytechef/platform/component/service/ConnectionDefinitionServiceImpl.java index ab8d02841f2..f0dc9cd483a 100644 --- a/server/libs/platform/platform-component/platform-component-service/src/main/java/com/bytechef/platform/component/service/ConnectionDefinitionServiceImpl.java +++ b/server/libs/platform/platform-component/platform-component-service/src/main/java/com/bytechef/platform/component/service/ConnectionDefinitionServiceImpl.java @@ -53,6 +53,7 @@ import com.bytechef.platform.component.annotation.WithTokenRefresh; import com.bytechef.platform.component.annotation.WithTokenRefresh.ComponentNameParam; import com.bytechef.platform.component.annotation.WithTokenRefresh.ConnectionParam; +import com.bytechef.platform.component.aspect.TokenRefreshHandler; import com.bytechef.platform.component.context.ContextFactory; import com.bytechef.platform.component.definition.ParametersFactory; import com.bytechef.platform.component.definition.ScriptComponentDefinition; @@ -92,12 +93,15 @@ public class ConnectionDefinitionServiceImpl implements ConnectionDefinitionServ private final ComponentDefinitionRegistry componentDefinitionRegistry; private final ContextFactory contextFactory; + private final TokenRefreshHandler tokenRefreshHandler; public ConnectionDefinitionServiceImpl( - @Lazy ComponentDefinitionRegistry componentDefinitionRegistry, ContextFactory contextFactory) { + @Lazy ComponentDefinitionRegistry componentDefinitionRegistry, ContextFactory contextFactory, + @Lazy TokenRefreshHandler tokenRefreshHandler) { this.componentDefinitionRegistry = componentDefinitionRegistry; this.contextFactory = contextFactory; + this.tokenRefreshHandler = tokenRefreshHandler; } @Override @@ -163,19 +167,6 @@ public Optional executeBaseUri( return executeBaseUriInternal(componentName, componentConnection, context); } - private Optional executeBaseUriInternal( - String componentName, ComponentConnection componentConnection, Context context) { - - com.bytechef.component.definition.ConnectionDefinition connectionDefinition = - componentDefinitionRegistry.getConnectionDefinition(componentName, componentConnection.getVersion()); - - BaseUriFunction baseUriFunction = connectionDefinition.getBaseUri() - .orElse((connectionParameters, context1) -> getDefaultBaseUri(connectionParameters)); - - return Optional.ofNullable( - baseUriFunction.apply(ParametersFactory.create(componentConnection.parameters()), context)); - } - @Override public ProviderException executeProcessErrorResponse( String componentName, int componentVersion, int connectionVersion, @Nullable String componentOperationName, @@ -197,10 +188,11 @@ public ProviderException executeProcessErrorResponse( } @Override - public Context createConnectionRefreshContext( - String componentName, ComponentConnection componentConnection) { + public ComponentConnection executeConnectionRefresh(ComponentConnection componentConnection) { + Context context = contextFactory.createContext( + componentConnection.getComponentName(), componentConnection); - return contextFactory.createContext(componentName, componentConnection); + return tokenRefreshHandler.refreshCredentials(componentConnection, context); } @Override @@ -397,6 +389,19 @@ private AuthorizationCallbackResponse executeAuthorizationCallback( } } + private Optional executeBaseUriInternal( + String componentName, ComponentConnection componentConnection, Context context) { + + com.bytechef.component.definition.ConnectionDefinition connectionDefinition = + componentDefinitionRegistry.getConnectionDefinition(componentName, componentConnection.getVersion()); + + BaseUriFunction baseUriFunction = connectionDefinition.getBaseUri() + .orElse((connectionParameters, context1) -> getDefaultBaseUri(connectionParameters)); + + return Optional.ofNullable( + baseUriFunction.apply(ParametersFactory.create(componentConnection.parameters()), context)); + } + private List getConnectableComponentDefinitions( String componentName, Integer componentVersion) { diff --git a/server/libs/platform/platform-connection/platform-connection-api/src/main/java/com/bytechef/platform/connection/facade/ConnectionFacade.java b/server/libs/platform/platform-connection/platform-connection-api/src/main/java/com/bytechef/platform/connection/facade/ConnectionFacade.java index 0a338c2d85a..3751d3c8933 100644 --- a/server/libs/platform/platform-connection/platform-connection-api/src/main/java/com/bytechef/platform/connection/facade/ConnectionFacade.java +++ b/server/libs/platform/platform-connection/platform-connection-api/src/main/java/com/bytechef/platform/connection/facade/ConnectionFacade.java @@ -30,6 +30,8 @@ public interface ConnectionFacade { void delete(Long id); + Long executeConnectionRefresh(Long connectionId); + ConnectionDTO getConnection(Long id); List getConnections(List connectionIds, PlatformType type); @@ -43,4 +45,5 @@ List getConnections( void update(long id, List tags); void update(long id, String name, List tags, int version); + } diff --git a/server/libs/platform/platform-connection/platform-connection-service/src/main/java/com/bytechef/platform/connection/facade/ConnectionFacadeImpl.java b/server/libs/platform/platform-connection/platform-connection-service/src/main/java/com/bytechef/platform/connection/facade/ConnectionFacadeImpl.java index 3c4729ee31c..6c472fbdb5d 100644 --- a/server/libs/platform/platform-connection/platform-connection-service/src/main/java/com/bytechef/platform/connection/facade/ConnectionFacadeImpl.java +++ b/server/libs/platform/platform-connection/platform-connection-service/src/main/java/com/bytechef/platform/connection/facade/ConnectionFacadeImpl.java @@ -147,6 +147,23 @@ public void delete(Long id) { // .forEach(tagService::delete); } + @Override + public Long executeConnectionRefresh(Long connectionId) { + Connection connection = connectionService.getConnection(connectionId); + + ComponentConnection componentConnection = new ComponentConnection( + connection.getComponentName(), connection.getConnectionVersion(), connection.getId(), + connection.getParameters(), connection.getAuthorizationType()); + + connectionDefinitionService.executeConnectionRefresh(componentConnection); + + connection = connectionService.getConnection(connectionId); + + Map parameters = connection.getParameters(); + + return (Long) parameters.get("expires_in"); + } + @Override @Transactional(readOnly = true) public ConnectionDTO getConnection(Long id) { @@ -316,8 +333,7 @@ private String getBaseUri( componentName, connectionVersion, connection.getId(), parameters, connection.getAuthorizationType()); - uri = connectionDefinitionService - .executeBaseUri(componentName, componentConnection) + uri = connectionDefinitionService.executeBaseUri(componentName, componentConnection) .orElse(null); } catch (IllegalStateException e) { if (logger.isDebugEnabled()) { diff --git a/server/libs/platform/platform-scheduler/platform-scheduler-api/src/main/java/com/bytechef/platform/scheduler/ConnectionRefreshScheduler.java b/server/libs/platform/platform-scheduler/platform-scheduler-api/src/main/java/com/bytechef/platform/scheduler/ConnectionRefreshScheduler.java index 81df2cdea27..794c35697eb 100644 --- a/server/libs/platform/platform-scheduler/platform-scheduler-api/src/main/java/com/bytechef/platform/scheduler/ConnectionRefreshScheduler.java +++ b/server/libs/platform/platform-scheduler/platform-scheduler-api/src/main/java/com/bytechef/platform/scheduler/ConnectionRefreshScheduler.java @@ -23,7 +23,7 @@ */ public interface ConnectionRefreshScheduler { - void cancelConnectionRefresh(Long connectionId); + void cancelConnectionRefresh(Long connectionId, String tenantId); - void scheduleConnectionRefresh(Long connectionId, Instant expiry); + void scheduleConnectionRefresh(Long connectionId, Instant expiry, String tenantId); } diff --git a/server/libs/platform/platform-scheduler/platform-scheduler-impl/build.gradle.kts b/server/libs/platform/platform-scheduler/platform-scheduler-impl/build.gradle.kts index bb5d3fd7adc..a40a162bdd2 100644 --- a/server/libs/platform/platform-scheduler/platform-scheduler-impl/build.gradle.kts +++ b/server/libs/platform/platform-scheduler/platform-scheduler-impl/build.gradle.kts @@ -10,6 +10,7 @@ dependencies { implementation(project(":server:libs:config:app-config")) implementation(project(":server:libs:core:commons:commons-util")) implementation(project(":server:libs:core:tenant:tenant-api")) + implementation(project(":server:libs:platform:platform-connection:platform-connection-api")) implementation(project(":server:libs:platform:platform-workflow:platform-workflow-coordinator:platform-workflow-coordinator-api")) diff --git a/server/libs/platform/platform-scheduler/platform-scheduler-impl/src/main/java/com/bytechef/platform/scheduler/QuartzConnectionRefreshScheduler.java b/server/libs/platform/platform-scheduler/platform-scheduler-impl/src/main/java/com/bytechef/platform/scheduler/QuartzConnectionRefreshScheduler.java index b3409251133..b084d150582 100644 --- a/server/libs/platform/platform-scheduler/platform-scheduler-impl/src/main/java/com/bytechef/platform/scheduler/QuartzConnectionRefreshScheduler.java +++ b/server/libs/platform/platform-scheduler/platform-scheduler-impl/src/main/java/com/bytechef/platform/scheduler/QuartzConnectionRefreshScheduler.java @@ -38,6 +38,8 @@ */ public class QuartzConnectionRefreshScheduler implements ConnectionRefreshScheduler { + private static final String CONNECTION_OAUTH_2_TOKEN_REFRESH = "ConnectionOauth2TokenRefresh"; + private final Scheduler scheduler; private static final Logger log = LoggerFactory.getLogger(QuartzConnectionRefreshScheduler.class); @@ -48,19 +50,20 @@ public QuartzConnectionRefreshScheduler(Scheduler scheduler) { } @Override - public void cancelConnectionRefresh(Long connectionId) { - deleteJob(connectionId, "ConnectionOauth2TokenRefresh"); + public void cancelConnectionRefresh(Long connectionId, String tenantId) { + deleteJob(connectionId, tenantId); } @Override - public void scheduleConnectionRefresh(Long connectionId, Instant tokenExpirationTime) { + public void scheduleConnectionRefresh(Long connectionId, Instant tokenExpirationTime, String tenantId) { JobDetail jobDetail = JobBuilder.newJob(ConnectionOAuth2TokenRefreshJob.class) - .withIdentity(JobKey.jobKey(connectionId.toString(), "ConnectionOauth2TokenRefresh")) + .withIdentity(JobKey.jobKey(tenantId + connectionId, CONNECTION_OAUTH_2_TOKEN_REFRESH)) .usingJobData("connectionId", connectionId) + .usingJobData("tenantId", tenantId) .build(); Trigger trigger = TriggerBuilder.newTrigger() - .withIdentity(TriggerKey.triggerKey(connectionId.toString(), "ConnectionOauth2TokenRefresh")) + .withIdentity(TriggerKey.triggerKey(tenantId + connectionId, CONNECTION_OAUTH_2_TOKEN_REFRESH)) .withDescription("Connection OAuth2 token refresh for " + connectionId) .startAt(Date.from(tokenExpirationTime.minus(Duration.ofMinutes(5)))) .build(); @@ -68,20 +71,25 @@ public void scheduleConnectionRefresh(Long connectionId, Instant tokenExpiration schedule(jobDetail, trigger); } - private void deleteJob(Long connectionId, String triggerKey) { + private void deleteJob(Long connectionId, String tenantId) { try { - JobKey jobKey = JobKey.jobKey(connectionId.toString(), triggerKey); + JobKey jobKey = JobKey.jobKey(tenantId + connectionId, CONNECTION_OAUTH_2_TOKEN_REFRESH); if (scheduler.checkExists(jobKey) && scheduler.deleteJob(jobKey)) { - log.trace("Refresh token job removed for connectionId: {}, triggerKey: {}", connectionId, triggerKey); + log.trace( + "Refresh token job removed for connectionId: {}, tenantId: {}, triggerKey: {}", connectionId, + tenantId, CONNECTION_OAUTH_2_TOKEN_REFRESH); return; } - log.error("Refresh token job not found for connectionId: {}, triggerKey: {}", connectionId, triggerKey); + log.error( + "Refresh token job not found for connectionId: {}, tenantId: {}, triggerKey: {}", connectionId, + tenantId, CONNECTION_OAUTH_2_TOKEN_REFRESH); } catch (SchedulerException e) { log.error( - "Unable to delete refresh token job for connectionId: {}, triggerKey: {}", connectionId, triggerKey); + "Unable to delete refresh token job for connectionId: {}, tenantId: {}, triggerKey: {}", + connectionId, tenantId, CONNECTION_OAUTH_2_TOKEN_REFRESH); } } diff --git a/server/libs/platform/platform-scheduler/platform-scheduler-impl/src/main/java/com/bytechef/platform/scheduler/config/QuartzConnectionRefreshSchedulerConfiguration.java b/server/libs/platform/platform-scheduler/platform-scheduler-impl/src/main/java/com/bytechef/platform/scheduler/config/QuartzConnectionRefreshSchedulerConfiguration.java index 14daf40ce23..95124ca7a4f 100644 --- a/server/libs/platform/platform-scheduler/platform-scheduler-impl/src/main/java/com/bytechef/platform/scheduler/config/QuartzConnectionRefreshSchedulerConfiguration.java +++ b/server/libs/platform/platform-scheduler/platform-scheduler-impl/src/main/java/com/bytechef/platform/scheduler/config/QuartzConnectionRefreshSchedulerConfiguration.java @@ -16,7 +16,6 @@ package com.bytechef.platform.scheduler.config; -import com.bytechef.config.ApplicationProperties; import com.bytechef.platform.scheduler.ConnectionRefreshScheduler; import com.bytechef.platform.scheduler.QuartzConnectionRefreshScheduler; import org.quartz.Scheduler; @@ -57,9 +56,7 @@ JobFactory jobFactoryConnection() { } @Bean - ConnectionRefreshScheduler quartzConnectionRefreshScheduler( - ApplicationProperties applicationProperties, @Lazy Scheduler scheduler) { - + ConnectionRefreshScheduler quartzConnectionRefreshScheduler(@Lazy Scheduler scheduler) { return new QuartzConnectionRefreshScheduler(scheduler); } diff --git a/server/libs/platform/platform-scheduler/platform-scheduler-impl/src/main/java/com/bytechef/platform/scheduler/job/ConnectionOAuth2TokenRefreshJob.java b/server/libs/platform/platform-scheduler/platform-scheduler-impl/src/main/java/com/bytechef/platform/scheduler/job/ConnectionOAuth2TokenRefreshJob.java index 6351a6112a6..3f9078a24a8 100644 --- a/server/libs/platform/platform-scheduler/platform-scheduler-impl/src/main/java/com/bytechef/platform/scheduler/job/ConnectionOAuth2TokenRefreshJob.java +++ b/server/libs/platform/platform-scheduler/platform-scheduler-impl/src/main/java/com/bytechef/platform/scheduler/job/ConnectionOAuth2TokenRefreshJob.java @@ -16,20 +16,20 @@ package com.bytechef.platform.scheduler.job; -import com.bytechef.platform.component.ComponentConnection; -import com.bytechef.platform.component.facade.ConnectionDefinitionFacade; +import com.bytechef.platform.connection.facade.ConnectionFacade; +import com.bytechef.tenant.TenantContext; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Duration; import java.time.Instant; import java.util.Date; -import java.util.Map; import org.quartz.Job; import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.Scheduler; import org.quartz.SchedulerException; +import org.quartz.Trigger; import org.quartz.TriggerBuilder; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** @@ -38,35 +38,35 @@ @Component public class ConnectionOAuth2TokenRefreshJob implements Job { - @Autowired - private ConnectionDefinitionFacade connectionDefinitionFacade; + private final ConnectionFacade connectionFacade; - public ConnectionOAuth2TokenRefreshJob() { + @SuppressFBWarnings("EI") + public ConnectionOAuth2TokenRefreshJob(ConnectionFacade connectionFacade) { + this.connectionFacade = connectionFacade; } @Override public void execute(JobExecutionContext context) throws JobExecutionException { JobDataMap jobDataMap = context.getMergedJobDataMap(); Long connectionId = jobDataMap.getLong("connectionId"); + String tenantId = jobDataMap.getString("tenantId"); - ComponentConnection componentConnection = connectionDefinitionFacade.executeConnectionRefresh(connectionId); + Long expiresIn = TenantContext.callWithTenantId( + tenantId, () -> connectionFacade.executeConnectionRefresh(connectionId)); Scheduler scheduler = context.getScheduler(); try { - Map parameters = componentConnection.getParameters(); + Instant now = Instant.now(); - Long expiresIn = (Long) parameters.get("expires_in"); + Instant nextTriggerTime = now.plusSeconds(expiresIn); - Instant nextTriggerTime = Instant.now() - .plusSeconds(expiresIn); + Trigger trigger = context.getTrigger(); scheduler.rescheduleJob( - context.getTrigger() - .getKey(), + trigger.getKey(), TriggerBuilder.newTrigger() - .withIdentity(context.getTrigger() - .getKey()) + .withIdentity(trigger.getKey()) .withDescription("Connection OAuth2 token refresh for " + connectionId) .startAt(Date.from(nextTriggerTime.minus(Duration.ofMinutes(5)))) .build()); diff --git a/server/libs/platform/platform-workflow/platform-workflow-execution/platform-workflow-execution-api/src/main/java/com/bytechef/platform/workflow/execution/facade/ConnectionLifecycleFacade.java b/server/libs/platform/platform-workflow/platform-workflow-execution/platform-workflow-execution-api/src/main/java/com/bytechef/platform/workflow/execution/facade/ConnectionLifecycleFacade.java index 7a1800c9818..4469ca7099a 100644 --- a/server/libs/platform/platform-workflow/platform-workflow-execution/platform-workflow-execution-api/src/main/java/com/bytechef/platform/workflow/execution/facade/ConnectionLifecycleFacade.java +++ b/server/libs/platform/platform-workflow/platform-workflow-execution/platform-workflow-execution-api/src/main/java/com/bytechef/platform/workflow/execution/facade/ConnectionLifecycleFacade.java @@ -24,7 +24,8 @@ */ public interface ConnectionLifecycleFacade { - void scheduleConnectionRefresh(Long connectionId, Map parameters, AuthorizationType authorizationType); + void scheduleConnectionRefresh( + Long connectionId, Map parameters, AuthorizationType authorizationType, String tenantId); - void deleteScheduledConnectionRefresh(Long connectionId, AuthorizationType authorizationType); + void deleteScheduledConnectionRefresh(Long connectionId, AuthorizationType authorizationType, String tenantId); } diff --git a/server/libs/platform/platform-workflow/platform-workflow-execution/platform-workflow-execution-service/src/main/java/com/bytechef/platform/workflow/execution/facade/ConnectionLifecycleFacadeImpl.java b/server/libs/platform/platform-workflow/platform-workflow-execution/platform-workflow-execution-service/src/main/java/com/bytechef/platform/workflow/execution/facade/ConnectionLifecycleFacadeImpl.java index 1fec491d597..41411c7b211 100644 --- a/server/libs/platform/platform-workflow/platform-workflow-execution/platform-workflow-execution-service/src/main/java/com/bytechef/platform/workflow/execution/facade/ConnectionLifecycleFacadeImpl.java +++ b/server/libs/platform/platform-workflow/platform-workflow-execution/platform-workflow-execution-service/src/main/java/com/bytechef/platform/workflow/execution/facade/ConnectionLifecycleFacadeImpl.java @@ -42,7 +42,7 @@ public ConnectionLifecycleFacadeImpl(ConnectionRefreshScheduler connectionRefres @Override public void scheduleConnectionRefresh( - Long connectionId, Map parameters, AuthorizationType authorizationType) { + Long connectionId, Map parameters, AuthorizationType authorizationType, String tenantId) { try { if (authorizationType == AuthorizationType.OAUTH2_AUTHORIZATION_CODE || @@ -54,7 +54,7 @@ public void scheduleConnectionRefresh( .plusSeconds(expiresIn); if (expiry != null) { - connectionRefreshScheduler.scheduleConnectionRefresh(connectionId, expiry); + connectionRefreshScheduler.scheduleConnectionRefresh(connectionId, expiry, tenantId); } } } catch (Exception e) { @@ -64,11 +64,13 @@ public void scheduleConnectionRefresh( } @Override - public void deleteScheduledConnectionRefresh(Long connectionId, AuthorizationType authorizationType) { + public void deleteScheduledConnectionRefresh( + Long connectionId, AuthorizationType authorizationType, String tenantId) { + if (authorizationType == AuthorizationType.OAUTH2_AUTHORIZATION_CODE || authorizationType == AuthorizationType.OAUTH2_AUTHORIZATION_CODE_PKCE) { - connectionRefreshScheduler.cancelConnectionRefresh(connectionId); + connectionRefreshScheduler.cancelConnectionRefresh(connectionId, tenantId); } } }