Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/ee/apps/scheduler-app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
implementation(project(":server:ee:libs:config:observability-config"))
implementation(project(":server:ee:libs:core:discovery:discovery-redis"))
implementation(project(":server:ee:libs:platform:platform-component:platform-component-remote-client"))
implementation(project(":server:ee:libs:platform:platform-connection:platform-connection-remote-client"))
implementation(project(":server:ee:libs:platform:platform-scheduler:platform-scheduler-impl"))
implementation(project(":server:ee:libs:platform:platform-scheduler:platform-scheduler-remote-rest"))
implementation(project(":server:ee:libs:platform:platform-workflow:platform-workflow-execution:platform-workflow-execution-remote-client"))
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public Optional<String> executeBaseUri(
}

@Override
public Context createConnectionRefreshContext(String componentName, ComponentConnection componentConnection) {
public ComponentConnection executeConnectionRefresh(ComponentConnection componentConnection) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public void delete(Long id) {
throw new UnsupportedOperationException();
}

@Override
public Long executeConnectionRefresh(Long connectionId) {
throw new UnsupportedOperationException();
}

@Override
public ConnectionDTO getConnection(Long id) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ dependencies {
implementation("io.awspring.cloud:spring-cloud-aws-sqs")
implementation(project(":server:libs:config:app-config"))
implementation(project(":server:libs:core:commons:commons-util"))
implementation(project(":server:libs:core:tenant:tenant-api"))
implementation(project(":server:libs:platform:platform-scheduler:platform-scheduler-api"))
implementation(project(":server:libs:platform:platform-workflow:platform-workflow-coordinator:platform-workflow-coordinator-api"))
implementation(project(":server:libs:platform:platform-api"))
implementation(project(":server:libs:platform:platform-connection:platform-connection-api"))

implementation(project(":server:ee:libs:core:cloud:cloud-aws"))
implementation(project(":server:ee:libs:core:message:message-broker:message-broker-aws"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 2025 ByteChef
*
* Licensed under the ByteChef Enterprise license (the "Enterprise License");
* you may not use this file except in compliance with the Enterprise License.
*/

package com.bytechef.ee.platform.scheduler.aws;

import static com.bytechef.ee.platform.scheduler.aws.constant.AwsConnectionRefreshSchedulerConstants.CONNECTION_REFRESH;
import static com.bytechef.ee.platform.scheduler.aws.constant.AwsConnectionRefreshSchedulerConstants.SCHEDULER_CONNECTION_REFRESH_QUEUE;

import com.bytechef.config.ApplicationProperties;
import com.bytechef.ee.platform.scheduler.aws.constant.AwsTriggerSchedulerConstants;
import com.bytechef.platform.scheduler.ConnectionRefreshScheduler;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.time.Duration;
import java.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.scheduler.SchedulerClient;
import software.amazon.awssdk.services.scheduler.model.ConflictException;
import software.amazon.awssdk.services.scheduler.model.FlexibleTimeWindowMode;
import software.amazon.awssdk.services.scheduler.model.Target;

/**
* @version ee
*
* @author Nikolina Spehar
*/
public class AwsConnectionRefreshScheduler implements ConnectionRefreshScheduler {

private static final Logger log = LoggerFactory.getLogger(AwsConnectionRefreshScheduler.class);

private final SchedulerClient schedulerClient;
private final String sqsArn;
private final String roleArn;

@SuppressFBWarnings("EI")
public AwsConnectionRefreshScheduler(ApplicationProperties.Cloud.Aws aws, SchedulerClient schedulerClient) {
this.schedulerClient = schedulerClient;

String accountId = aws.getAccountId();

this.sqsArn = "arn:aws:sqs:" + aws.getRegion() + ":" + accountId;
this.roleArn = "arn:aws:iam::" + accountId + ":role/schedule-role";
}

@Override
public void cancelConnectionRefresh(Long connectionId, String tenantId) {
try {
schedulerClient.deleteSchedule(request -> request.clientToken(tenantId + connectionId)
.groupName(CONNECTION_REFRESH)
.name(CONNECTION_REFRESH + tenantId + connectionId));
} catch (RuntimeException e) {
if (log.isDebugEnabled()) {
log.debug("Dynamic Webhook Trigger Refresh not defined");
}
}
}

@Override
public void scheduleConnectionRefresh(Long connectionId, Instant expiry, String tenantId) {
String scheduleName = CONNECTION_REFRESH + tenantId + connectionId;
String clientToken = tenantId + connectionId;

Instant now = Instant.now();

Duration between = Duration.between(now, expiry);

long secondsUntilExpiry = between.getSeconds();

long minutesUntilExpiry = secondsUntilExpiry / 60;

long refreshMinutes = Math.max(1, minutesUntilExpiry - 5);

String scheduleExpression = "rate(" + refreshMinutes + " minutes)";

log.info(
"Scheduling connection refresh for connection: {}, tenant: {}, expiry time: {}, refresh time: {}, " +
"schedule expression: {}, SQS ARN: {}, role ARN: {}",
connectionId, tenantId, expiry, refreshMinutes, scheduleExpression,
sqsArn + ":" + SCHEDULER_CONNECTION_REFRESH_QUEUE, roleArn);

Target sqsTarget = Target.builder()
.roleArn(roleArn)
.arn(sqsArn + ":" + SCHEDULER_CONNECTION_REFRESH_QUEUE)
.input(tenantId + AwsTriggerSchedulerConstants.SPLITTER + connectionId)
.build();

try {
schedulerClient.createSchedule(request -> request.clientToken(clientToken)
.groupName(CONNECTION_REFRESH)
.name(scheduleName)
.scheduleExpression(scheduleExpression)
.target(sqsTarget)
.flexibleTimeWindow(mode -> mode.mode(FlexibleTimeWindowMode.OFF))
.startDate(now.plus(Duration.ofMinutes(1))));

log.info("Schedule created successfully.");
} catch (ConflictException e) {
log.info("Schedule already exists, updating...");

schedulerClient.updateSchedule(request -> request.clientToken(clientToken)
.groupName(CONNECTION_REFRESH)
.name(scheduleName)
.scheduleExpression(scheduleExpression)
.target(sqsTarget)
.flexibleTimeWindow(mode -> mode.mode(FlexibleTimeWindowMode.OFF))
.startDate(now.plus(Duration.ofMinutes(1))));

log.info("Schedule updated successfully.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2025 ByteChef
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytechef.ee.platform.scheduler.aws.config;

import com.bytechef.config.ApplicationProperties;
import com.bytechef.ee.platform.scheduler.aws.AwsConnectionRefreshScheduler;
import com.bytechef.ee.platform.scheduler.aws.listener.ConnectionRefreshListener;
import com.bytechef.platform.annotation.ConditionalOnEEVersion;
import com.bytechef.platform.connection.facade.ConnectionFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.providers.AwsRegionProvider;
import software.amazon.awssdk.services.scheduler.SchedulerClient;

/**
* @author Nikolina Spehar
*/
@Configuration
@ConditionalOnProperty(
prefix = "bytechef", name = "coordinator.connection.scheduler.provider", havingValue = "aws")
@ConditionalOnEEVersion
public class AwsConnectionRefreshSchedulerConfiguration {

private static final Logger log = LoggerFactory.getLogger(AwsConnectionRefreshSchedulerConfiguration.class);

private final ApplicationProperties applicationProperties;

AwsConnectionRefreshSchedulerConfiguration(ApplicationProperties applicationProperties) {
if (log.isInfoEnabled()) {
log.info("Connection refresh scheduler provider type enabled: aws");
}

this.applicationProperties = applicationProperties;
}

@Bean
AwsConnectionRefreshScheduler awsConnectionRefreshScheduler(
AwsCredentialsProvider awsCredentialsProvider, AwsRegionProvider awsRegionProvider) {

ApplicationProperties.Cloud.Aws aws = applicationProperties.getCloud()
.getAws();

SchedulerClient schedulerClient = SchedulerClient.builder()
.credentialsProvider(awsCredentialsProvider)
.region(awsRegionProvider.getRegion())
.build();

return new AwsConnectionRefreshScheduler(aws, schedulerClient);
}

@Bean
ConnectionRefreshListener connectionRefreshListener(ConnectionFacade connectionFacade) {
return new ConnectionRefreshListener(connectionFacade);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2025 ByteChef
*
* Licensed under the ByteChef Enterprise license (the "Enterprise License");
* you may not use this file except in compliance with the Enterprise License.
*/

package com.bytechef.ee.platform.scheduler.aws.constant;

/**
* @version ee
*
* @author Nikolina Spehar
*/
public class AwsConnectionRefreshSchedulerConstants {

public static final String CONNECTION_REFRESH = "ConnectionRefresh";
public static final String SCHEDULER_CONNECTION_REFRESH_QUEUE = "scheduler_connection_refresh_queue";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2025 ByteChef
*
* Licensed under the ByteChef Enterprise license (the "Enterprise License");
* you may not use this file except in compliance with the Enterprise License.
*/

package com.bytechef.ee.platform.scheduler.aws.listener;

import static com.bytechef.ee.platform.scheduler.aws.constant.AwsConnectionRefreshSchedulerConstants.SCHEDULER_CONNECTION_REFRESH_QUEUE;
import static com.bytechef.ee.platform.scheduler.aws.constant.AwsTriggerSchedulerConstants.SPLITTER;

import com.bytechef.platform.connection.facade.ConnectionFacade;
import com.bytechef.tenant.TenantContext;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.awspring.cloud.sqs.annotation.SqsListener;

/**
* @version ee
*
* @author Nikolina Spehar
*/
public class ConnectionRefreshListener {

private final ConnectionFacade connectionFacade;

@SuppressFBWarnings("EI")
public ConnectionRefreshListener(ConnectionFacade connectionFacade) {
this.connectionFacade = connectionFacade;
}

@SqsListener(SCHEDULER_CONNECTION_REFRESH_QUEUE)
public void onSchedule(String message) {
String[] split = message.split(SPLITTER);
String tenantId = split[0];
Long connectionId = Long.valueOf(split[1]);

TenantContext.runWithTenantId(tenantId, () -> connectionFacade.executeConnectionRefresh(connectionId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,25 @@ public RemoteConnectionRefreshSchedulerClient(LoadBalancedRestClient loadBalance
}

@Override
public void cancelConnectionRefresh(Long connectionId) {
public void cancelConnectionRefresh(Long connectionId, String tenantId) {
loadBalancedRestClient.post(
uriBuilder -> uriBuilder
.host(SCHEDULER_APP)
.path(CONNECTION_REFRESH_SCHEDULER + "/cancel-connection-refresh")
.build(),
connectionId);
new ScheduleConnectionRefreshRequest(connectionId, Instant.now(), tenantId));
}

@Override
public void scheduleConnectionRefresh(Long connectionId, Instant expiry) {
public void scheduleConnectionRefresh(Long connectionId, Instant expiry, String tenantId) {
loadBalancedRestClient.post(
uriBuilder -> uriBuilder
.host(SCHEDULER_APP)
.path(CONNECTION_REFRESH_SCHEDULER + "/schedule-connection-refresh")
.build(),
new ScheduleConnectionRefreshRequest(connectionId, expiry));
new ScheduleConnectionRefreshRequest(connectionId, expiry, tenantId));
}

private record ScheduleConnectionRefreshRequest(Long connectionId, Instant expiry) {
private record ScheduleConnectionRefreshRequest(Long connectionId, Instant expiry, String tenantId) {
}
}
Loading
Loading