Skip to content

Commit 71813bf

Browse files
Remove all AccessController refs (#4924)
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent e7fc5f5 commit 71813bf

24 files changed

Lines changed: 375 additions & 579 deletions

File tree

async-query-core/src/main/java/org/opensearch/sql/spark/client/EMRServerlessClientFactoryImpl.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
1111
import com.amazonaws.services.emrserverless.AWSEMRServerless;
1212
import com.amazonaws.services.emrserverless.AWSEMRServerlessClientBuilder;
13-
import java.security.AccessController;
14-
import java.security.PrivilegedAction;
1513
import lombok.RequiredArgsConstructor;
1614
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
1715
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
@@ -56,15 +54,11 @@ private void validateSparkExecutionEngineConfig(
5654

5755
private EMRServerlessClient createEMRServerlessClient(String awsRegion) {
5856
// TODO: It does not handle accountId for now. (it creates client for same account)
59-
return AccessController.doPrivileged(
60-
(PrivilegedAction<EMRServerlessClient>)
61-
() -> {
62-
AWSEMRServerless awsemrServerless =
63-
AWSEMRServerlessClientBuilder.standard()
64-
.withRegion(awsRegion)
65-
.withCredentials(new DefaultAWSCredentialsProviderChain())
66-
.build();
67-
return new EmrServerlessClientImpl(awsemrServerless, metricsService);
68-
});
57+
AWSEMRServerless awsemrServerless =
58+
AWSEMRServerlessClientBuilder.standard()
59+
.withRegion(awsRegion)
60+
.withCredentials(new DefaultAWSCredentialsProviderChain())
61+
.build();
62+
return new EmrServerlessClientImpl(awsemrServerless, metricsService);
6963
}
7064
}

async-query-core/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java

Lines changed: 35 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import com.amazonaws.services.emrserverless.model.StartJobRunRequest;
2222
import com.amazonaws.services.emrserverless.model.StartJobRunResult;
2323
import com.amazonaws.services.emrserverless.model.ValidationException;
24-
import java.security.AccessController;
25-
import java.security.PrivilegedAction;
2624
import lombok.RequiredArgsConstructor;
2725
import org.apache.commons.lang3.StringUtils;
2826
import org.apache.logging.log4j.LogManager;
@@ -61,23 +59,18 @@ public String startJobRun(StartJobRequest startJobRequest) {
6159
.withEntryPointArguments(resultIndex)
6260
.withSparkSubmitParameters(startJobRequest.getSparkSubmitParams())));
6361

64-
StartJobRunResult startJobRunResult =
65-
AccessController.doPrivileged(
66-
(PrivilegedAction<StartJobRunResult>)
67-
() -> {
68-
try {
69-
return emrServerless.startJobRun(request);
70-
} catch (Throwable t) {
71-
logger.error("Error while making start job request to emr:", t);
72-
metricsService.incrementNumericalMetric(EMR_START_JOB_REQUEST_FAILURE_COUNT);
73-
if (t instanceof ValidationException) {
74-
throw new IllegalArgumentException(
75-
"The input fails to satisfy the constraints specified by AWS EMR"
76-
+ " Serverless.");
77-
}
78-
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
79-
}
80-
});
62+
StartJobRunResult startJobRunResult;
63+
try {
64+
startJobRunResult = emrServerless.startJobRun(request);
65+
} catch (Throwable t) {
66+
logger.error("Error while making start job request to emr:", t);
67+
metricsService.incrementNumericalMetric(EMR_START_JOB_REQUEST_FAILURE_COUNT);
68+
if (t instanceof ValidationException) {
69+
throw new IllegalArgumentException(
70+
"The input fails to satisfy the constraints specified by AWS EMR" + " Serverless.");
71+
}
72+
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
73+
}
8174
logger.info("Job Run ID: " + startJobRunResult.getJobRunId());
8275
return startJobRunResult.getJobRunId();
8376
}
@@ -86,18 +79,14 @@ public String startJobRun(StartJobRequest startJobRequest) {
8679
public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
8780
GetJobRunRequest request =
8881
new GetJobRunRequest().withApplicationId(applicationId).withJobRunId(jobId);
89-
GetJobRunResult getJobRunResult =
90-
AccessController.doPrivileged(
91-
(PrivilegedAction<GetJobRunResult>)
92-
() -> {
93-
try {
94-
return emrServerless.getJobRun(request);
95-
} catch (Throwable t) {
96-
logger.error("Error while making get job run request to emr:", t);
97-
metricsService.incrementNumericalMetric(EMR_GET_JOB_RESULT_FAILURE_COUNT);
98-
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
99-
}
100-
});
82+
GetJobRunResult getJobRunResult;
83+
try {
84+
getJobRunResult = emrServerless.getJobRun(request);
85+
} catch (Throwable t) {
86+
logger.error("Error while making get job run request to emr:", t);
87+
metricsService.incrementNumericalMetric(EMR_GET_JOB_RESULT_FAILURE_COUNT);
88+
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
89+
}
10190
logger.info("Job Run state: " + getJobRunResult.getJobRun().getState());
10291
return getJobRunResult;
10392
}
@@ -107,27 +96,22 @@ public CancelJobRunResult cancelJobRun(
10796
String applicationId, String jobId, boolean allowExceptionPropagation) {
10897
CancelJobRunRequest cancelJobRunRequest =
10998
new CancelJobRunRequest().withJobRunId(jobId).withApplicationId(applicationId);
110-
CancelJobRunResult cancelJobRunResult =
111-
AccessController.doPrivileged(
112-
(PrivilegedAction<CancelJobRunResult>)
113-
() -> {
114-
try {
115-
return emrServerless.cancelJobRun(cancelJobRunRequest);
116-
} catch (Throwable t) {
117-
if (allowExceptionPropagation) {
118-
throw t;
119-
}
99+
CancelJobRunResult cancelJobRunResult;
100+
try {
101+
cancelJobRunResult = emrServerless.cancelJobRun(cancelJobRunRequest);
102+
} catch (Throwable t) {
103+
if (allowExceptionPropagation) {
104+
throw t;
105+
}
120106

121-
logger.error("Error while making cancel job request to emr: jobId=" + jobId, t);
122-
metricsService.incrementNumericalMetric(EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
123-
if (t instanceof ValidationException) {
124-
throw new IllegalArgumentException(
125-
"The input fails to satisfy the constraints specified by AWS EMR"
126-
+ " Serverless.");
127-
}
128-
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
129-
}
130-
});
107+
logger.error("Error while making cancel job request to emr: jobId=" + jobId, t);
108+
metricsService.incrementNumericalMetric(EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
109+
if (t instanceof ValidationException) {
110+
throw new IllegalArgumentException(
111+
"The input fails to satisfy the constraints specified by AWS EMR" + " Serverless.");
112+
}
113+
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
114+
}
131115
logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId()));
132116
return cancelJobRunResult;
133117
}

async-query/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigClusterSettingLoader.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77

88
import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG;
99

10-
import java.security.AccessController;
11-
import java.security.PrivilegedAction;
1210
import java.util.Optional;
1311
import lombok.RequiredArgsConstructor;
1412
import org.apache.commons.lang3.StringUtils;
@@ -24,11 +22,8 @@ public Optional<SparkExecutionEngineConfigClusterSetting> load() {
2422
this.settings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG);
2523
if (!StringUtils.isBlank(sparkExecutionEngineConfigSettingString)) {
2624
return Optional.of(
27-
AccessController.doPrivileged(
28-
(PrivilegedAction<SparkExecutionEngineConfigClusterSetting>)
29-
() ->
30-
SparkExecutionEngineConfigClusterSetting.toSparkExecutionEngineConfig(
31-
sparkExecutionEngineConfigSettingString)));
25+
SparkExecutionEngineConfigClusterSetting.toSparkExecutionEngineConfig(
26+
sparkExecutionEngineConfigSettingString));
3227
} else {
3328
return Optional.empty();
3429
}

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 20 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
package org.opensearch.sql.executor;
77

8-
import java.security.AccessController;
9-
import java.security.PrivilegedAction;
108
import java.util.List;
119
import java.util.Optional;
1210
import javax.annotation.Nullable;
@@ -100,19 +98,14 @@ public void executeWithCalcite(
10098
CalcitePlanContext.run(
10199
() -> {
102100
try {
103-
AccessController.doPrivileged(
104-
(PrivilegedAction<Void>)
105-
() -> {
106-
CalcitePlanContext context =
107-
CalcitePlanContext.create(
108-
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
109-
RelNode relNode = analyze(plan, context);
110-
relNode = mergeAdjacentFilters(relNode);
111-
RelNode optimized = optimize(relNode, context);
112-
RelNode calcitePlan = convertToCalcitePlan(optimized);
113-
executionEngine.execute(calcitePlan, context, listener);
114-
return null;
115-
});
101+
CalcitePlanContext context =
102+
CalcitePlanContext.create(
103+
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
104+
RelNode relNode = analyze(plan, context);
105+
relNode = mergeAdjacentFilters(relNode);
106+
RelNode optimized = optimize(relNode, context);
107+
RelNode calcitePlan = convertToCalcitePlan(optimized);
108+
executionEngine.execute(calcitePlan, context, listener);
116109
} catch (Throwable t) {
117110
if (isCalciteFallbackAllowed(t) && !(t instanceof NonFallbackCalciteException)) {
118111
log.warn("Fallback to V2 query engine since got exception", t);
@@ -144,23 +137,18 @@ public void explainWithCalcite(
144137
CalcitePlanContext.run(
145138
() -> {
146139
try {
147-
AccessController.doPrivileged(
148-
(PrivilegedAction<Void>)
149-
() -> {
150-
CalcitePlanContext context =
151-
CalcitePlanContext.create(
152-
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
153-
context.run(
154-
() -> {
155-
RelNode relNode = analyze(plan, context);
156-
relNode = mergeAdjacentFilters(relNode);
157-
RelNode optimized = optimize(relNode, context);
158-
RelNode calcitePlan = convertToCalcitePlan(optimized);
159-
executionEngine.explain(calcitePlan, format, context, listener);
160-
},
161-
settings);
162-
return null;
163-
});
140+
CalcitePlanContext context =
141+
CalcitePlanContext.create(
142+
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
143+
context.run(
144+
() -> {
145+
RelNode relNode = analyze(plan, context);
146+
relNode = mergeAdjacentFilters(relNode);
147+
RelNode optimized = optimize(relNode, context);
148+
RelNode calcitePlan = convertToCalcitePlan(optimized);
149+
executionEngine.explain(calcitePlan, format, context, listener);
150+
},
151+
settings);
164152
} catch (Throwable t) {
165153
if (isCalciteFallbackAllowed(t)) {
166154
log.warn("Fallback to V2 query engine since got exception", t);

0 commit comments

Comments
 (0)