Skip to content

Commit 2bf7a90

Browse files
authored
Extend scheduler interface for Multitenancy (#3014) (#3034)
Signed-off-by: Louis Chu <clingzhi@amazon.com> (cherry picked from commit 37188bd)
1 parent 9915e73 commit 2bf7a90

13 files changed

Lines changed: 146 additions & 67 deletions

File tree

.github/workflows/integ-tests-with-security.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ jobs:
4747
4848
- name: Upload test reports
4949
if: ${{ always() }}
50-
uses: actions/upload-artifact@v2
50+
uses: actions/upload-artifact@v4
5151
continue-on-error: true
5252
with:
5353
name: test-reports-${{ matrix.os }}-${{ matrix.java }}
@@ -79,7 +79,7 @@ jobs:
7979

8080
- name: Upload test reports
8181
if: ${{ always() }}
82-
uses: actions/upload-artifact@v2
82+
uses: actions/upload-artifact@v4
8383
continue-on-error: true
8484
with:
8585
name: test-reports-${{ matrix.os }}-${{ matrix.java }}

.github/workflows/sql-test-and-build-workflow.yml

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,15 @@ jobs:
7676
token: ${{ secrets.CODECOV_TOKEN }}
7777

7878
- name: Upload Artifacts
79-
uses: actions/upload-artifact@v2
79+
uses: actions/upload-artifact@v4
80+
continue-on-error: true
8081
with:
8182
name: opensearch-sql-ubuntu-latest
8283
path: opensearch-sql-builds
8384

8485
- name: Upload test reports
85-
if: always()
86-
uses: actions/upload-artifact@v2
86+
if: ${{ always() }}
87+
uses: actions/upload-artifact@v4
8788
continue-on-error: true
8889
with:
8990
name: test-reports
@@ -130,7 +131,27 @@ jobs:
130131
cp -r ./plugin/build/distributions/*.zip opensearch-sql-builds/
131132
132133
- name: Upload Artifacts
133-
uses: actions/upload-artifact@v2
134+
uses: actions/upload-artifact@v4
135+
continue-on-error: true
134136
with:
135137
name: opensearch-sql-${{ matrix.entry.os }}
136138
path: opensearch-sql-builds
139+
140+
- name: Upload test reports
141+
if: ${{ always() && matrix.entry.os == 'ubuntu-latest' }}
142+
uses: actions/upload-artifact@v4
143+
continue-on-error: true
144+
with:
145+
name: test-reports-${{ matrix.entry.os }}-${{ matrix.entry.java }}
146+
path: |
147+
sql/build/reports/**
148+
ppl/build/reports/**
149+
core/build/reports/**
150+
common/build/reports/**
151+
opensearch/build/reports/**
152+
integ-test/build/reports/**
153+
protocol/build/reports/**
154+
legacy/build/reports/**
155+
plugin/build/reports/**
156+
doctest/build/testclusters/docTestCluster-0/logs/*
157+
integ-test/build/testclusters/*/logs/*

async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ void runOp(
6262
this.flintIndexMetadataService.updateIndexToManualRefresh(
6363
flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions, asyncQueryRequestContext);
6464
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
65-
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
65+
asyncQueryScheduler.unscheduleJob(
66+
flintIndexMetadata.getOpensearchIndexName(), asyncQueryRequestContext);
6667
} else {
6768
cancelStreamingJob(flintIndexStateModel);
6869
}

async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ void runOp(
5454
"Performing drop index operation for index: {}",
5555
flintIndexMetadata.getOpensearchIndexName());
5656
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
57-
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
57+
asyncQueryScheduler.unscheduleJob(
58+
flintIndexMetadata.getOpensearchIndexName(), asyncQueryRequestContext);
5859
} else {
5960
cancelStreamingJob(flintIndexStateModel);
6061
}

async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
16
package org.opensearch.sql.spark.scheduler;
27

8+
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
39
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
410

511
/** Scheduler interface for scheduling asynchronous query jobs. */
@@ -13,10 +19,13 @@ public interface AsyncQueryScheduler {
1319
* task
1420
*
1521
* @param asyncQuerySchedulerRequest The request containing job configuration details
22+
* @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
1623
* @throws IllegalArgumentException if a job with the same name already exists
1724
* @throws RuntimeException if there's an error during job creation
1825
*/
19-
void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);
26+
void scheduleJob(
27+
AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
28+
AsyncQueryRequestContext asyncQueryRequestContext);
2029

2130
/**
2231
* Updates an existing job with new parameters. This method modifies the configuration of an
@@ -26,10 +35,13 @@ public interface AsyncQueryScheduler {
2635
* scheduled job - Updating resource allocations for a job
2736
*
2837
* @param asyncQuerySchedulerRequest The request containing updated job configuration
38+
* @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
2939
* @throws IllegalArgumentException if the job to be updated doesn't exist
3040
* @throws RuntimeException if there's an error during the update process
3141
*/
32-
void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);
42+
void updateJob(
43+
AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
44+
AsyncQueryRequestContext asyncQueryRequestContext);
3345

3446
/**
3547
* Unschedules a job by marking it as disabled and updating its last update time. This method is
@@ -41,8 +53,11 @@ public interface AsyncQueryScheduler {
4153
* re-enabling of the job in the future
4254
*
4355
* @param jobId The unique identifier of the job to unschedule
56+
* @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
57+
* @throws IllegalArgumentException if the job to be unscheduled doesn't exist
58+
* @throws RuntimeException if there's an error during the unschedule process
4459
*/
45-
void unscheduleJob(String jobId);
60+
void unscheduleJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext);
4661

4762
/**
4863
* Removes a job completely from the scheduler. This method permanently deletes the job and all
@@ -52,6 +67,9 @@ public interface AsyncQueryScheduler {
5267
* created jobs - Freeing up resources by deleting unused job configurations
5368
*
5469
* @param jobId The unique identifier of the job to remove
70+
* @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
71+
* @throws IllegalArgumentException if the job to be removed doesn't exist
72+
* @throws RuntimeException if there's an error during the remove process
5573
*/
56-
void removeJob(String jobId);
74+
void removeJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext);
5775
}

async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/model/AsyncQuerySchedulerRequest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77

88
import java.time.Instant;
99
import lombok.AllArgsConstructor;
10+
import lombok.Builder;
1011
import lombok.Data;
1112
import lombok.NoArgsConstructor;
1213
import org.opensearch.sql.spark.rest.model.LangType;
1314

1415
/** Represents a job request for a scheduled task. */
1516
@Data
17+
@Builder
1618
@NoArgsConstructor
1719
@AllArgsConstructor
1820
public class AsyncQuerySchedulerRequest {

async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ public void createDropIndexQueryWithScheduler() {
230230
verifyCreateIndexDMLResultCalled();
231231
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH);
232232

233-
verify(asyncQueryScheduler).unscheduleJob(indexName);
233+
verify(asyncQueryScheduler).unscheduleJob(indexName, asyncQueryRequestContext);
234234
}
235235

236236
@Test
@@ -318,8 +318,7 @@ public void createAlterIndexQueryWithScheduler() {
318318
FlintIndexOptions flintIndexOptions = flintIndexOptionsArgumentCaptor.getValue();
319319
assertFalse(flintIndexOptions.autoRefresh());
320320

321-
verify(asyncQueryScheduler).unscheduleJob(indexName);
322-
321+
verify(asyncQueryScheduler).unscheduleJob(indexName, asyncQueryRequestContext);
323322
verifyCreateIndexDMLResultCalled();
324323
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH);
325324
}

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;
99

1010
import com.google.common.annotations.VisibleForTesting;
11+
import com.google.common.base.Strings;
1112
import java.io.InputStream;
1213
import java.nio.charset.StandardCharsets;
1314
import java.time.Instant;
@@ -35,6 +36,7 @@
3536
import org.opensearch.index.engine.DocumentMissingException;
3637
import org.opensearch.index.engine.VersionConflictEngineException;
3738
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
39+
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
3840
import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJobRunner;
3941
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
4042
import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest;
@@ -55,7 +57,9 @@ public class OpenSearchAsyncQueryScheduler implements AsyncQueryScheduler {
5557

5658
@Override
5759
/** Schedules a new job by indexing it into the job index. */
58-
public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
60+
public void scheduleJob(
61+
AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
62+
AsyncQueryRequestContext asyncQueryRequestContext) {
5963
ScheduledAsyncQueryJobRequest request =
6064
ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(asyncQuerySchedulerRequest);
6165
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
@@ -87,15 +91,18 @@ public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
8791

8892
/** Unschedules a job by marking it as disabled and updating its last update time. */
8993
@Override
90-
public void unscheduleJob(String jobId) {
91-
ScheduledAsyncQueryJobRequest request =
92-
ScheduledAsyncQueryJobRequest.builder()
93-
.jobId(jobId)
94-
.enabled(false)
95-
.lastUpdateTime(Instant.now())
96-
.build();
94+
public void unscheduleJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext) {
95+
if (Strings.isNullOrEmpty(jobId)) {
96+
throw new IllegalArgumentException("JobId cannot be null or empty");
97+
}
9798
try {
98-
updateJob(request);
99+
AsyncQuerySchedulerRequest request =
100+
ScheduledAsyncQueryJobRequest.builder()
101+
.jobId(jobId)
102+
.enabled(false)
103+
.lastUpdateTime(Instant.now())
104+
.build();
105+
updateJob(request, asyncQueryRequestContext);
99106
LOG.info("Unscheduled job for jobId: {}", jobId);
100107
} catch (IllegalStateException | DocumentMissingException e) {
101108
LOG.error("Failed to unschedule job: {}", jobId, e);
@@ -105,7 +112,9 @@ public void unscheduleJob(String jobId) {
105112
/** Updates an existing job with new parameters. */
106113
@Override
107114
@SneakyThrows
108-
public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
115+
public void updateJob(
116+
AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
117+
AsyncQueryRequestContext asyncQueryRequestContext) {
109118
ScheduledAsyncQueryJobRequest request =
110119
ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(asyncQuerySchedulerRequest);
111120
assertIndexExists();
@@ -134,8 +143,11 @@ public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
134143

135144
/** Removes a job by deleting its document from the index. */
136145
@Override
137-
public void removeJob(String jobId) {
146+
public void removeJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext) {
138147
assertIndexExists();
148+
if (Strings.isNullOrEmpty(jobId)) {
149+
throw new IllegalArgumentException("JobId cannot be null or empty");
150+
}
139151
DeleteRequest deleteRequest = new DeleteRequest(SCHEDULER_INDEX_NAME, jobId);
140152
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
141153
ActionFuture<DeleteResponse> deleteResponseActionFuture = client.delete(deleteRequest);

async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class ScheduledAsyncQueryJobRequest extends AsyncQuerySchedulerRequest
3838
public static final String ENABLED_FIELD = "enabled";
3939
private final Schedule schedule;
4040

41-
@Builder
41+
@Builder(builderMethodName = "scheduledAsyncQueryJobRequestBuilder")
4242
public ScheduledAsyncQueryJobRequest(
4343
String accountId,
4444
String jobId,
@@ -139,7 +139,7 @@ public static ScheduledAsyncQueryJobRequest fromAsyncQuerySchedulerRequest(
139139
AsyncQuerySchedulerRequest request) {
140140
Instant updateTime =
141141
request.getLastUpdateTime() != null ? request.getLastUpdateTime() : Instant.now();
142-
return ScheduledAsyncQueryJobRequest.builder()
142+
return ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
143143
.accountId(request.getAccountId())
144144
.jobId(request.getJobId())
145145
.dataSource(request.getDataSource())

async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/OpenSearchScheduleQueryJobRequestParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ private static Instant parseInstantValue(XContentParser parser) throws IOExcepti
3030
public static ScheduledJobParser getJobParser() {
3131
return (parser, id, jobDocVersion) -> {
3232
ScheduledAsyncQueryJobRequest.ScheduledAsyncQueryJobRequestBuilder builder =
33-
ScheduledAsyncQueryJobRequest.builder();
33+
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder();
3434
XContentParserUtils.ensureExpectedToken(
3535
XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
3636

0 commit comments

Comments
 (0)