
Commit cab4be8

Add EMR client to spark connector (#1790) (#1860)
* Create Spark Connector * Add spark client and engine * Remove vars * Spark connector draft * nit * Fix checkstyle errors * nit * Fix license header * Add spark storage test * Update comments * Fix checkstyle in comments * Update tests * Add emr client * Set default values for flint args * Validate emr auth type * Add default constants for flint * Update unit tests * Address PR comments * tests draft * Refactor class name * Rename classes and update tests * Update scan operator test * Address PR comment * Fix Connection pool shut down issue * Update emr unit tests * Update doc and tests * nit * Update EMR clinet impl tests * Address PR comments * Make spark & flint jars configurable * Address comments * Add spark application id in logs * nit * Delete result when not required --------- Signed-off-by: Vamsi Manohar <reddyvam@amazon.com> Signed-off-by: Rupal Mahajan <maharup@amazon.com> Co-authored-by: Vamsi Manohar <reddyvam@amazon.com>
1 parent 02da891 commit cab4be8

30 files changed

Lines changed: 1559 additions & 44 deletions

DEVELOPER_GUIDE.rst

Lines changed: 2 additions & 0 deletions

@@ -147,6 +147,7 @@ The plugin codebase is in standard layout of Gradle project::
     ├── plugin
     ├── protocol
     ├── ppl
+    ├── spark
     ├── sql
     ├── sql-cli
     ├── sql-jdbc
@@ -161,6 +162,7 @@ Here are sub-folders (Gradle modules) for plugin source code:
 - ``core``: core query engine.
 - ``opensearch``: OpenSearch storage engine.
 - ``prometheus``: Prometheus storage engine.
+- ``spark``: Spark storage engine.
 - ``protocol``: request/response protocol formatter.
 - ``common``: common util code.
 - ``integ-test``: integration and comparison test.
Lines changed: 92 additions & 0 deletions (new file)

.. highlight:: sh

====================
Spark Connector
====================

.. rubric:: Table of contents

.. contents::
   :local:
   :depth: 1


Introduction
============

This page covers the Spark connector properties for dataSource configuration
and the nuances associated with the Spark connector.


Spark Connector Properties in DataSource Configuration
========================================================
Spark Connector Properties.

* ``spark.connector`` [Required].
    * This parameter provides the Spark client information for connection.
* ``spark.sql.application`` [Optional].
    * This parameter provides the Spark SQL application jar. Default value is ``s3://spark-datasource/sql-job.jar``.
* ``emr.cluster`` [Required].
    * This parameter provides the EMR cluster id information.
* ``emr.auth.type`` [Required]
    * This parameter provides the authentication type information.
    * The Spark EMR connector currently supports the ``awssigv4`` authentication mechanism, and the following parameters are required.
        * ``emr.auth.region``, ``emr.auth.access_key`` and ``emr.auth.secret_key``
* ``spark.datasource.flint.*`` [Optional]
    * These parameters provide the OpenSearch domain host information for the flint integration.
* ``spark.datasource.flint.integration`` [Optional]
    * Default value for the integration jar is ``s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar``.
* ``spark.datasource.flint.host`` [Optional]
    * Default value for host is ``localhost``.
* ``spark.datasource.flint.port`` [Optional]
    * Default value for port is ``9200``.
* ``spark.datasource.flint.scheme`` [Optional]
    * Default value for scheme is ``http``.
* ``spark.datasource.flint.auth`` [Optional]
    * Default value for auth is ``false``.
* ``spark.datasource.flint.region`` [Optional]
    * Default value for region is ``us-west-2``.

Example spark dataSource configuration
========================================

AWSSigV4 Auth::

    [{
        "name" : "my_spark",
        "connector": "spark",
        "properties" : {
            "spark.connector": "emr",
            "emr.cluster" : "{{clusterId}}",
            "emr.auth.type" : "awssigv4",
            "emr.auth.region" : "us-east-1",
            "emr.auth.access_key" : "{{accessKey}}",
            "emr.auth.secret_key" : "{{secretKey}}",
            "spark.datasource.flint.host" : "{{opensearchHost}}",
            "spark.datasource.flint.port" : "{{opensearchPort}}",
            "spark.datasource.flint.scheme" : "{{opensearchScheme}}",
            "spark.datasource.flint.auth" : "{{opensearchAuth}}",
            "spark.datasource.flint.region" : "{{opensearchRegion}}"
        }
    }]


Spark SQL Support
==================

``sql`` Function
----------------------------
The Spark connector offers a ``sql`` function, which runs a Spark SQL query.
The function takes the Spark SQL query as input. The argument can be passed either by position:
``source=my_spark.sql('select 1')``
or by name:
``source=my_spark.sql(query='select 1')``

Example::

    > source=my_spark.sql('select 1')
    +---+
    | 1 |
    |---|
    | 1 |
    +---+
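The required/optional property rules above can be checked before submitting a datasource configuration. The sketch below is illustrative Python (not part of the plugin) that validates a payload against exactly the rules this page describes; the function name and structure are my own.

```python
# Illustrative sketch (not part of the plugin): validate a Spark datasource
# configuration against the property rules documented above.

REQUIRED = ["spark.connector", "emr.cluster", "emr.auth.type"]
SIGV4_REQUIRED = ["emr.auth.region", "emr.auth.access_key", "emr.auth.secret_key"]

def validate_spark_datasource(config):
    """Return a list of problems; an empty list means the config looks valid."""
    problems = []
    props = config.get("properties", {})
    for key in REQUIRED:
        if key not in props:
            problems.append(f"missing required property: {key}")
    # awssigv4 is currently the only supported EMR auth type.
    if props.get("emr.auth.type") == "awssigv4":
        for key in SIGV4_REQUIRED:
            if key not in props:
                problems.append(f"awssigv4 auth requires: {key}")
    elif "emr.auth.type" in props:
        problems.append("unsupported emr.auth.type: " + props["emr.auth.type"])
    return problems

config = {
    "name": "my_spark",
    "connector": "spark",
    "properties": {
        "spark.connector": "emr",
        "emr.cluster": "<clusterId>",
        "emr.auth.type": "awssigv4",
        "emr.auth.region": "us-east-1",
        "emr.auth.access_key": "<accessKey>",
        "emr.auth.secret_key": "<secretKey>",
    },
}
print(validate_spark_datasource(config))  # → []
```

A config missing ``emr.cluster``, or using an auth type other than ``awssigv4``, would come back with a non-empty problem list instead.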
spark/build.gradle

Lines changed: 4 additions & 2 deletions

@@ -19,10 +19,12 @@ dependencies {

     implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
     implementation group: 'org.json', name: 'json', version: '20230227'
+    implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.1'

     testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
-    testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
-    testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
+    testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0'
+    testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.2.0'
+    testImplementation 'junit:junit:4.13.1'
 }

 test {
Lines changed: 121 additions & 0 deletions (new file)

/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.spark.client;

import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_INDEX_NAME;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR;

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.model.ActionOnFailure;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
import com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepStatus;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import lombok.SneakyThrows;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.sql.spark.helper.FlintHelper;
import org.opensearch.sql.spark.response.SparkResponse;

public class EmrClientImpl implements SparkClient {
  private final AmazonElasticMapReduce emr;
  private final String emrCluster;
  private final FlintHelper flint;
  private final String sparkApplicationJar;
  private static final Logger logger = LogManager.getLogger(EmrClientImpl.class);
  private SparkResponse sparkResponse;

  /**
   * Constructor for EMR Client Implementation.
   *
   * @param emr EMR client
   * @param emrCluster EMR cluster id
   * @param flint OpenSearch args for the flint integration jar
   * @param sparkResponse Response object to help with retrieving results from the OpenSearch index
   * @param sparkApplicationJar Spark SQL application jar; the default is used when null
   */
  public EmrClientImpl(AmazonElasticMapReduce emr, String emrCluster, FlintHelper flint,
      SparkResponse sparkResponse, String sparkApplicationJar) {
    this.emr = emr;
    this.emrCluster = emrCluster;
    this.flint = flint;
    this.sparkResponse = sparkResponse;
    this.sparkApplicationJar =
        sparkApplicationJar == null ? SPARK_SQL_APPLICATION_JAR : sparkApplicationJar;
  }

  @Override
  public JSONObject sql(String query) throws IOException {
    runEmrApplication(query);
    return sparkResponse.getResultFromOpensearchIndex();
  }

  @VisibleForTesting
  void runEmrApplication(String query) {

    HadoopJarStepConfig stepConfig = new HadoopJarStepConfig()
        .withJar("command-runner.jar")
        .withArgs("spark-submit",
            "--class", "org.opensearch.sql.SQLJob",
            "--jars",
            flint.getFlintIntegrationJar(),
            sparkApplicationJar,
            query,
            SPARK_INDEX_NAME,
            flint.getFlintHost(),
            flint.getFlintPort(),
            flint.getFlintScheme(),
            flint.getFlintAuth(),
            flint.getFlintRegion()
        );

    StepConfig emrstep = new StepConfig()
        .withName("Spark Application")
        .withActionOnFailure(ActionOnFailure.CONTINUE)
        .withHadoopJarStep(stepConfig);

    AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
        .withJobFlowId(emrCluster)
        .withSteps(emrstep);

    AddJobFlowStepsResult result = emr.addJobFlowSteps(request);
    logger.info("EMR step ID: " + result.getStepIds());

    String stepId = result.getStepIds().get(0);
    DescribeStepRequest stepRequest = new DescribeStepRequest()
        .withClusterId(emrCluster)
        .withStepId(stepId);

    waitForStepExecution(stepRequest);
    sparkResponse.setValue(stepId);
  }

  @SneakyThrows
  private void waitForStepExecution(DescribeStepRequest stepRequest) {
    // Wait for the step to complete
    boolean completed = false;
    while (!completed) {
      // Get the step status
      StepStatus statusDetail = emr.describeStep(stepRequest).getStep().getStatus();
      // Check if the step has completed
      if (statusDetail.getState().equals("COMPLETED")) {
        completed = true;
        logger.info("EMR step completed successfully.");
      } else if (statusDetail.getState().equals("FAILED")
          || statusDetail.getState().equals("CANCELLED")) {
        logger.error("EMR step failed or cancelled.");
        throw new RuntimeException("Spark SQL application failed.");
      } else {
        // Sleep for some time before checking the status again
        Thread.sleep(2500);
      }
    }
  }
}
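The control flow of `waitForStepExecution` above (poll the step state, return on `COMPLETED`, raise on `FAILED`/`CANCELLED`, otherwise sleep and retry) can be sketched compactly. The Python below is illustrative only, with the EMR `describeStep` call replaced by an injected `describe_step` callable so the loop can be exercised without AWS:

```python
import time

# Illustrative sketch of the waitForStepExecution polling loop in EmrClientImpl:
# poll until the step reaches a terminal state, raising on failure/cancellation.
def wait_for_step(describe_step, poll_interval=2.5, sleep=time.sleep):
    while True:
        # In the real client this wraps emr.describeStep(...).getStep().getStatus().
        state = describe_step()
        if state == "COMPLETED":
            return state
        if state in ("FAILED", "CANCELLED"):
            raise RuntimeError("Spark SQL application failed.")
        sleep(poll_interval)  # back off before checking the status again

# Simulate a step that is pending, then running, then completes.
states = iter(["PENDING", "RUNNING", "COMPLETED"])
print(wait_for_step(lambda: next(states), sleep=lambda _: None))  # → COMPLETED
```

Injecting the sleep function also makes the retry loop unit-testable without real delays, which is the same difficulty the Java version sidesteps by marking `runEmrApplication` as `@VisibleForTesting`.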
Lines changed: 20 additions & 0 deletions (new file)

/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.spark.data.constants;

public class SparkConstants {
  public static final String EMR = "emr";
  public static final String STEP_ID_FIELD = "stepId.keyword";
  public static final String SPARK_SQL_APPLICATION_JAR = "s3://spark-datasource/sql-job.jar";
  public static final String SPARK_INDEX_NAME = ".query_execution_result";
  public static final String FLINT_INTEGRATION_JAR =
      "s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar";
  public static final String FLINT_DEFAULT_HOST = "localhost";
  public static final String FLINT_DEFAULT_PORT = "9200";
  public static final String FLINT_DEFAULT_SCHEME = "http";
  public static final String FLINT_DEFAULT_AUTH = "-1";
  public static final String FLINT_DEFAULT_REGION = "us-west-2";
}
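These constants act as fallbacks when a property is not supplied, in the same way `EmrClientImpl` falls back with `sparkApplicationJar == null ? SPARK_SQL_APPLICATION_JAR : sparkApplicationJar`. A minimal sketch of that resolution pattern, in illustrative Python (the `resolve` helper and `DEFAULTS` dict are my own, populated from the defaults this commit documents):

```python
# Illustrative sketch: overlay user-supplied datasource properties on the
# defaults from SparkConstants, mirroring the null-coalescing in EmrClientImpl.
DEFAULTS = {
    "spark.sql.application": "s3://spark-datasource/sql-job.jar",
    "spark.datasource.flint.host": "localhost",
    "spark.datasource.flint.port": "9200",
    "spark.datasource.flint.scheme": "http",
    "spark.datasource.flint.region": "us-west-2",
}

def resolve(props):
    """Keep any explicitly supplied (non-None) property; fall back otherwise."""
    return {**DEFAULTS, **{k: v for k, v in props.items() if v is not None}}

resolved = resolve({"spark.datasource.flint.host": "my-domain.example.com"})
print(resolved["spark.datasource.flint.host"])  # → my-domain.example.com
print(resolved["spark.datasource.flint.port"])  # → 9200
```

Keeping the defaults in one constants class, as this commit does, means the fallback values are defined once rather than scattered across call sites.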
