Skip to content

Commit e1ee3b1

Browse files
authored
Add AsyncQueryRequestContext to update/get in StatementStorageService (#2943)
Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
1 parent 83bc3d2 commit e1ee3b1

24 files changed

Lines changed: 217 additions & 104 deletions

async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ CreateAsyncQueryResponse createAsyncQuery(
3131
* @param queryId queryId.
3232
* @return {@link AsyncQueryExecutionResponse}
3333
*/
34-
AsyncQueryExecutionResponse getAsyncQueryResults(String queryId);
34+
AsyncQueryExecutionResponse getAsyncQueryResults(
35+
String queryId, AsyncQueryRequestContext asyncQueryRequestContext);
3536

3637
/**
3738
* Cancels running async query and returns the cancelled queryId.

async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,14 @@ public CreateAsyncQueryResponse createAsyncQuery(
7474
}
7575

7676
@Override
77-
public AsyncQueryExecutionResponse getAsyncQueryResults(String queryId) {
77+
public AsyncQueryExecutionResponse getAsyncQueryResults(
78+
String queryId, AsyncQueryRequestContext asyncQueryRequestContext) {
7879
Optional<AsyncQueryJobMetadata> jobMetadata =
7980
asyncQueryJobMetadataStorageService.getJobMetadata(queryId);
8081
if (jobMetadata.isPresent()) {
8182
String sessionId = jobMetadata.get().getSessionId();
82-
JSONObject jsonObject = sparkQueryDispatcher.getQueryResponse(jobMetadata.get());
83+
JSONObject jsonObject =
84+
sparkQueryDispatcher.getQueryResponse(jobMetadata.get(), asyncQueryRequestContext);
8385
if (JobRunState.SUCCESS.toString().equals(jsonObject.getString(STATUS_FIELD))) {
8486
DefaultSparkSqlFunctionResponseHandle sparkSqlFunctionResponseHandle =
8587
new DefaultSparkSqlFunctionResponseHandle(jsonObject);

async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
/** Process async query request. */
2222
public abstract class AsyncQueryHandler {
2323

24-
public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) {
25-
JSONObject result = getResponseFromResultIndex(asyncQueryJobMetadata);
24+
public JSONObject getQueryResponse(
25+
AsyncQueryJobMetadata asyncQueryJobMetadata,
26+
AsyncQueryRequestContext asyncQueryRequestContext) {
27+
JSONObject result = getResponseFromResultIndex(asyncQueryJobMetadata, asyncQueryRequestContext);
2628
if (result.has(DATA_FIELD)) {
2729
JSONObject items = result.getJSONObject(DATA_FIELD);
2830

@@ -35,7 +37,8 @@ public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata)
3537
result.put(ERROR_FIELD, error);
3638
return result;
3739
} else {
38-
JSONObject statement = getResponseFromExecutor(asyncQueryJobMetadata);
40+
JSONObject statement =
41+
getResponseFromExecutor(asyncQueryJobMetadata, asyncQueryRequestContext);
3942

4043
// Consider statement still running if state is success but query result unavailable
4144
if (isSuccessState(statement)) {
@@ -50,10 +53,12 @@ private boolean isSuccessState(JSONObject statement) {
5053
}
5154

5255
protected abstract JSONObject getResponseFromResultIndex(
53-
AsyncQueryJobMetadata asyncQueryJobMetadata);
56+
AsyncQueryJobMetadata asyncQueryJobMetadata,
57+
AsyncQueryRequestContext asyncQueryRequestContext);
5458

5559
protected abstract JSONObject getResponseFromExecutor(
56-
AsyncQueryJobMetadata asyncQueryJobMetadata);
60+
AsyncQueryJobMetadata asyncQueryJobMetadata,
61+
AsyncQueryRequestContext asyncQueryRequestContext);
5762

5863
public abstract String cancelJob(
5964
AsyncQueryJobMetadata asyncQueryJobMetadata,

async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,19 @@ public class BatchQueryHandler extends AsyncQueryHandler {
4141
protected final SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;
4242

4343
@Override
44-
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
44+
protected JSONObject getResponseFromResultIndex(
45+
AsyncQueryJobMetadata asyncQueryJobMetadata,
46+
AsyncQueryRequestContext asyncQueryRequestContext) {
4547
// either empty json when the result is not available or data with status
4648
// Fetch from Result Index
4749
return jobExecutionResponseReader.getResultWithJobId(
4850
asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex());
4951
}
5052

5153
@Override
52-
protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) {
54+
protected JSONObject getResponseFromExecutor(
55+
AsyncQueryJobMetadata asyncQueryJobMetadata,
56+
AsyncQueryRequestContext asyncQueryRequestContext) {
5357
JSONObject result = new JSONObject();
5458
// make call to EMR Serverless when related result index documents are not available
5559
GetJobRunResult getJobRunResult =

async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,14 +162,18 @@ private FlintIndexMetadata getFlintIndexMetadata(
162162
}
163163

164164
@Override
165-
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
165+
protected JSONObject getResponseFromResultIndex(
166+
AsyncQueryJobMetadata asyncQueryJobMetadata,
167+
AsyncQueryRequestContext asyncQueryRequestContext) {
166168
String queryId = asyncQueryJobMetadata.getQueryId();
167169
return jobExecutionResponseReader.getResultWithQueryId(
168170
queryId, asyncQueryJobMetadata.getResultIndex());
169171
}
170172

171173
@Override
172-
protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) {
174+
protected JSONObject getResponseFromExecutor(
175+
AsyncQueryJobMetadata asyncQueryJobMetadata,
176+
AsyncQueryRequestContext asyncQueryRequestContext) {
173177
// Consider statement still running if result doc created in submit() is not available yet
174178
JSONObject result = new JSONObject();
175179
result.put(STATUS_FIELD, StatementState.RUNNING.getState());

async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,21 +50,26 @@ public class InteractiveQueryHandler extends AsyncQueryHandler {
5050
protected final SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;
5151

5252
@Override
53-
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
53+
protected JSONObject getResponseFromResultIndex(
54+
AsyncQueryJobMetadata asyncQueryJobMetadata,
55+
AsyncQueryRequestContext asyncQueryRequestContext) {
5456
String queryId = asyncQueryJobMetadata.getQueryId();
5557
return jobExecutionResponseReader.getResultWithQueryId(
5658
queryId, asyncQueryJobMetadata.getResultIndex());
5759
}
5860

5961
@Override
60-
protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) {
62+
protected JSONObject getResponseFromExecutor(
63+
AsyncQueryJobMetadata asyncQueryJobMetadata,
64+
AsyncQueryRequestContext asyncQueryRequestContext) {
6165
JSONObject result = new JSONObject();
6266
String queryId = asyncQueryJobMetadata.getQueryId();
6367
Statement statement =
6468
getStatementByQueryId(
6569
asyncQueryJobMetadata.getSessionId(),
6670
queryId,
67-
asyncQueryJobMetadata.getDatasourceName());
71+
asyncQueryJobMetadata.getDatasourceName(),
72+
asyncQueryRequestContext);
6873
StatementState statementState = statement.getStatementState();
6974
result.put(STATUS_FIELD, statementState.getState());
7075
result.put(ERROR_FIELD, Optional.of(statement.getStatementModel().getError()).orElse(""));
@@ -79,7 +84,8 @@ public String cancelJob(
7984
getStatementByQueryId(
8085
asyncQueryJobMetadata.getSessionId(),
8186
queryId,
82-
asyncQueryJobMetadata.getDatasourceName())
87+
asyncQueryJobMetadata.getDatasourceName(),
88+
asyncQueryRequestContext)
8389
.cancel();
8490
return queryId;
8591
}
@@ -148,12 +154,16 @@ public DispatchQueryResponse submit(
148154
.build();
149155
}
150156

151-
private Statement getStatementByQueryId(String sessionId, String queryId, String datasourceName) {
157+
private Statement getStatementByQueryId(
158+
String sessionId,
159+
String queryId,
160+
String datasourceName,
161+
AsyncQueryRequestContext asyncQueryRequestContext) {
152162
Optional<Session> session = sessionManager.getSession(sessionId, datasourceName);
153163
if (session.isPresent()) {
154164
// todo, statementId == jobId if statement running in session.
155165
StatementId statementId = new StatementId(queryId);
156-
Optional<Statement> statement = session.get().get(statementId);
166+
Optional<Statement> statement = session.get().get(statementId, asyncQueryRequestContext);
157167
if (statement.isPresent()) {
158168
return statement.get();
159169
} else {

async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,11 @@ private boolean isEligibleForIndexDMLHandling(IndexQueryDetails indexQueryDetail
157157
&& !indexQueryDetails.getFlintIndexOptions().autoRefresh()));
158158
}
159159

160-
public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) {
160+
public JSONObject getQueryResponse(
161+
AsyncQueryJobMetadata asyncQueryJobMetadata,
162+
AsyncQueryRequestContext asyncQueryRequestContext) {
161163
return getAsyncQueryHandlerForExistingQuery(asyncQueryJobMetadata)
162-
.getQueryResponse(asyncQueryJobMetadata);
164+
.getQueryResponse(asyncQueryJobMetadata, asyncQueryRequestContext);
163165
}
164166

165167
public String cancelJob(

async-query-core/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,10 @@ public StatementId submit(
121121
}
122122

123123
@Override
124-
public Optional<Statement> get(StatementId stID) {
124+
public Optional<Statement> get(
125+
StatementId stID, AsyncQueryRequestContext asyncQueryRequestContext) {
125126
return statementStorageService
126-
.getStatement(stID.getId(), sessionModel.getDatasourceName())
127+
.getStatement(stID.getId(), sessionModel.getDatasourceName(), asyncQueryRequestContext)
127128
.map(
128129
model ->
129130
Statement.builder()
@@ -137,6 +138,7 @@ public Optional<Statement> get(StatementId stID) {
137138
.queryId(model.getQueryId())
138139
.statementStorageService(statementStorageService)
139140
.statementModel(model)
141+
.asyncQueryRequestContext(asyncQueryRequestContext)
140142
.build());
141143
}
142144

async-query-core/src/main/java/org/opensearch/sql/spark/execution/session/Session.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ void open(
3535
* @param stID {@link StatementId}
3636
* @return {@link Statement}
3737
*/
38-
Optional<Statement> get(StatementId stID);
38+
Optional<Statement> get(StatementId stID, AsyncQueryRequestContext asyncQueryRequestContext);
3939

4040
SessionModel getSessionModel();
4141

async-query-core/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ public void cancel() {
7070
throw new IllegalStateException(errorMsg);
7171
}
7272
this.statementModel =
73-
statementStorageService.updateStatementState(statementModel, StatementState.CANCELLED);
73+
statementStorageService.updateStatementState(
74+
statementModel, StatementState.CANCELLED, asyncQueryRequestContext);
7475
}
7576

7677
public StatementState getStatementState() {

0 commit comments

Comments
 (0)