Skip to content

Commit 645c24c

Browse files
committed
All the review comments have been addressed, and the required changes have been committed. The build has been successfully tested locally.
1 parent 06505be commit 645c24c

6 files changed

Lines changed: 56 additions & 60 deletions

File tree

src/e2e-test/features/bigquery/source/BigQueryToBigQuery.feature

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,9 @@ Feature: BigQuery source - Verification of BigQuery to BigQuery successful data
358358
@BQ_SOURCE_TEST @BQ_SINK_TEST
359359
Scenario:Validate that pipeline run gets failed when incorrect filter values and verify the log error message
360360
Given Open Datafusion Project to configure pipeline
361-
When Source is BigQuery
362-
When Sink is BigQuery
363-
Then Open BigQuery source properties
361+
When Expand Plugin group in the LHS plugins list: "Source"
362+
When Select plugin: "BigQuery" from the plugins list as: "Source"
363+
Then Navigate to the properties page of plugin: "BigQuery"
364364
Then Enter BigQuery property reference name
365365
Then Enter BigQuery property projectId "projectId"
366366
Then Enter BigQuery property datasetProjectId "projectId"
@@ -371,10 +371,12 @@ Feature: BigQuery source - Verification of BigQuery to BigQuery successful data
371371
Then Validate output schema with expectedSchema "bqSourceSchema"
372372
Then Validate "BigQuery" plugin properties
373373
Then Close the BigQuery properties
374-
Then Open BigQuery sink properties
374+
When Expand Plugin group in the LHS plugins list: "Sink"
375+
When Select plugin: "BigQuery" from the plugins list as: "Sink"
376+
Then Navigate to the properties page of plugin: "BigQuery2"
375377
Then Override Service account details if set in environment variables
376378
Then Enter the BigQuery sink mandatory properties
377-
Then Validate "BigQuery" plugin properties
379+
Then Validate "BigQuery2" plugin properties
378380
Then Close the BigQuery properties
379381
Then Connect source as "BigQuery" and sink as "BigQuery" to establish connection
380382
Then Save the pipeline

src/e2e-test/features/bigquery/source/BigQueryToGCS_WithMacro.feature

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ Feature: BigQuery source - Verification of BigQuery to GCS successful data trans
7373
@CMEK @BQ_SOURCE_TEST @GCS_SINK_TEST
7474
Scenario:Validate successful records transfer from BigQuery to GCS with macro arguments for partition start date and partition end date
7575
Given Open Datafusion Project to configure pipeline
76-
When Source is BigQuery
77-
When Sink is GCS
78-
Then Open BigQuery source properties
76+
When Expand Plugin group in the LHS plugins list: "Source"
77+
When Select plugin: "BigQuery" from the plugins list as: "Source"
78+
Then Navigate to the properties page of plugin: "BigQuery"
7979
Then Enter BigQuery property reference name
8080
Then Enter BigQuery property "projectId" as macro argument "bqProjectId"
8181
Then Enter BigQuery property "datasetProjectId" as macro argument "bqDatasetProjectId"
@@ -88,7 +88,9 @@ Feature: BigQuery source - Verification of BigQuery to GCS successful data trans
8888
Then Enter BigQuery property "table" as macro argument "bqSourceTable"
8989
Then Validate "BigQuery" plugin properties
9090
Then Close the BigQuery properties
91-
Then Open GCS sink properties
91+
When Expand Plugin group in the LHS plugins list: "Sink"
92+
When Select plugin: "GCS" from the plugins list as: "Sink"
93+
Then Navigate to the properties page of plugin: "GCS"
9294
Then Enter GCS property reference name
9395
Then Enter GCS property "projectId" as macro argument "gcsProjectId"
9496
Then Enter GCS property "serviceAccountType" as macro argument "serviceAccountType"
@@ -148,9 +150,9 @@ Feature: BigQuery source - Verification of BigQuery to GCS successful data trans
148150
@CMEK @BQ_SOURCE_TEST @GCS_SINK_TEST
149151
Scenario:Validate successful records transfer from BigQuery to GCS with macro arguments for filter and Output Schema
150152
Given Open Datafusion Project to configure pipeline
151-
When Source is BigQuery
152-
When Sink is GCS
153-
Then Open BigQuery source properties
153+
When Expand Plugin group in the LHS plugins list: "Source"
154+
When Select plugin: "BigQuery" from the plugins list as: "Source"
155+
Then Navigate to the properties page of plugin: "BigQuery"
154156
Then Enter BigQuery property reference name
155157
Then Enter BigQuery property "projectId" as macro argument "bqProjectId"
156158
Then Enter BigQuery property "datasetProjectId" as macro argument "bqDatasetProjectId"
@@ -163,7 +165,9 @@ Feature: BigQuery source - Verification of BigQuery to GCS successful data trans
163165
Then Select Macro action of output schema property: "Output Schema-macro-input" and set the value to "bqOutputSchema"
164166
Then Validate "BigQuery" plugin properties
165167
Then Close the BigQuery properties
166-
Then Open GCS sink properties
168+
When Expand Plugin group in the LHS plugins list: "Sink"
169+
When Select plugin: "GCS" from the plugins list as: "Sink"
170+
Then Navigate to the properties page of plugin: "GCS"
167171
Then Enter GCS property reference name
168172
Then Enter GCS property "projectId" as macro argument "gcsProjectId"
169173
Then Enter GCS property "serviceAccountType" as macro argument "serviceAccountType"

src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQueryBase.java

Lines changed: 28 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -90,18 +90,18 @@ public void getCountOfNoOfRecordsTransferredToTargetBigQueryTable() throws IOExc
9090
int countRecords = BigQueryClient.countBqQuery(TestSetupHooks.bqTargetTable);
9191
BeforeActions.scenario.write("**********No of Records Transferred******************:" + countRecords);
9292
Assert.assertEquals("Number of records transferred should be equal to records out ",
93-
countRecords, recordOut());
93+
countRecords, recordOut());
9494
}
9595

9696
@Then("Validate records transferred to target table is equal to number of records from source table " +
97-
"with filter {string}")
97+
"with filter {string}")
9898
public void validateRecordsTransferredToTargetTableIsEqualToNumberOfRecordsFromSourceTableWithFilter(String filter)
99-
throws IOException, InterruptedException {
99+
throws IOException, InterruptedException {
100100
String projectId = (PluginPropertyUtils.pluginProp("projectId"));
101101
String datasetName = (PluginPropertyUtils.pluginProp("dataset"));
102102
int countRecordsTarget = BigQueryClient.countBqQuery(TestSetupHooks.bqTargetTable);
103103
String selectQuery = "SELECT count(*) FROM `" + projectId + "." + datasetName + "." +
104-
TestSetupHooks.bqTargetTable + "` WHERE " + PluginPropertyUtils.pluginProp(filter);
104+
TestSetupHooks.bqTargetTable + "` WHERE " + PluginPropertyUtils.pluginProp(filter);
105105
Optional<String> result = BigQueryClient.getSoleQueryResult(selectQuery);
106106
int count = result.map(Integer::parseInt).orElse(0);
107107
BeforeActions.scenario.write("Number of records transferred with respect to filter:" + count);
@@ -110,13 +110,13 @@ public void validateRecordsTransferredToTargetTableIsEqualToNumberOfRecordsFromS
110110

111111
@Then("Validate partition date in output partitioned table")
112112
public void validatePartitionDateInOutputPartitionedTable()
113-
throws IOException, InterruptedException {
113+
throws IOException, InterruptedException {
114114
Optional<String> result = BigQueryClient
115-
.getSoleQueryResult("SELECT distinct _PARTITIONDATE as pt FROM `" +
116-
(PluginPropertyUtils.pluginProp("projectId")) + "." +
117-
(PluginPropertyUtils.pluginProp("dataset")) + "." +
118-
TestSetupHooks.bqTargetTable +
119-
"` WHERE _PARTITION_LOAD_TIME IS Not NULL ORDER BY _PARTITIONDATE DESC ");
115+
.getSoleQueryResult("SELECT distinct _PARTITIONDATE as pt FROM `" +
116+
(PluginPropertyUtils.pluginProp("projectId")) + "." +
117+
(PluginPropertyUtils.pluginProp("dataset")) + "." +
118+
TestSetupHooks.bqTargetTable +
119+
"` WHERE _PARTITION_LOAD_TIME IS Not NULL ORDER BY _PARTITIONDATE DESC ");
120120
String outputDate = StringUtils.EMPTY;
121121
if (result.isPresent()) {
122122
outputDate = result.get();
@@ -136,10 +136,10 @@ public void validateTheRecordsAreNotCreatedInOutputTable() throws IOException, I
136136
public void validatePartitioningIsNotDoneOnTheOutputTable() {
137137
try {
138138
BigQueryClient.getSoleQueryResult("SELECT distinct _PARTITIONDATE as pt FROM `" +
139-
(PluginPropertyUtils.pluginProp("projectId"))
140-
+ "." + (PluginPropertyUtils.pluginProp("dataset")) + "." +
141-
TestSetupHooks.bqTargetTable
142-
+ "` WHERE _PARTITION_LOAD_TIME IS Not NULL ");
139+
(PluginPropertyUtils.pluginProp("projectId"))
140+
+ "." + (PluginPropertyUtils.pluginProp("dataset")) + "." +
141+
TestSetupHooks.bqTargetTable
142+
+ "` WHERE _PARTITION_LOAD_TIME IS Not NULL ");
143143
} catch (Exception e) {
144144
String partitionException = e.toString();
145145
Assert.assertTrue(partitionException.contains("Unrecognized name: _PARTITION_LOAD_TIME"));
@@ -168,8 +168,8 @@ public void validateTheCmekKeyOfTargetBigQueryTableIfCmekIsEnabled(String cmek)
168168
String cmekBQ = PluginPropertyUtils.pluginProp(cmek);
169169
if (cmekBQ != null) {
170170
Assert.assertTrue("Cmek key of target BigQuery table should be equal to " +
171-
"cmek key provided in config file",
172-
BigQueryClient.verifyCmekKey(TestSetupHooks.bqTargetTable, cmekBQ));
171+
"cmek key provided in config file",
172+
BigQueryClient.verifyCmekKey(TestSetupHooks.bqTargetTable, cmekBQ));
173173
return;
174174
}
175175
BeforeActions.scenario.write("CMEK not enabled");
@@ -204,13 +204,13 @@ public void enterRuntimeArgumentValueForBigQueryCmekPropertyKeyIfBQCmekIsEnabled
204204

205205
@Then("Verify the partition table is created with partitioned on field {string}")
206206
public void verifyThePartitionTableIsCreatedWithPartitionedOnField(String partitioningField) throws IOException,
207-
InterruptedException {
207+
InterruptedException {
208208
Optional<String> result = BigQueryClient
209-
.getSoleQueryResult("SELECT IS_PARTITIONING_COLUMN FROM `" +
210-
(PluginPropertyUtils.pluginProp("projectId")) + "."
211-
+ (PluginPropertyUtils.pluginProp("dataset")) + ".INFORMATION_SCHEMA.COLUMNS` " +
212-
"WHERE table_name = '" + TestSetupHooks.bqTargetTable
213-
+ "' and column_name = '" + PluginPropertyUtils.pluginProp(partitioningField) + "' ");
209+
.getSoleQueryResult("SELECT IS_PARTITIONING_COLUMN FROM `" +
210+
(PluginPropertyUtils.pluginProp("projectId")) + "."
211+
+ (PluginPropertyUtils.pluginProp("dataset")) + ".INFORMATION_SCHEMA.COLUMNS` " +
212+
"WHERE table_name = '" + TestSetupHooks.bqTargetTable
213+
+ "' and column_name = '" + PluginPropertyUtils.pluginProp(partitioningField) + "' ");
214214
String isPartitioningDoneOnField = StringUtils.EMPTY;
215215
if (result.isPresent()) {
216216
isPartitioningDoneOnField = result.get();
@@ -230,7 +230,7 @@ public void verifyTheBigQueryValidationErrorMessageForInvalidProperty(String pro
230230
String expectedErrorMessage;
231231
if (property.equalsIgnoreCase("gcsChunkSize")) {
232232
expectedErrorMessage = PluginPropertyUtils
233-
.errorProp(E2ETestConstants.ERROR_MSG_BQ_INCORRECT_CHUNKSIZE);
233+
.errorProp(E2ETestConstants.ERROR_MSG_BQ_INCORRECT_CHUNKSIZE);
234234
} else if (property.equalsIgnoreCase("bucket")) {
235235
expectedErrorMessage = PluginPropertyUtils
236236
.errorProp(E2ETestConstants.ERROR_MSG_BQ_INCORRECT_TEMPORARY_BUCKET);
@@ -239,7 +239,7 @@ public void verifyTheBigQueryValidationErrorMessageForInvalidProperty(String pro
239239
.errorProp(E2ETestConstants.ERROR_MSG_INCORRECT_TABLE_NAME);
240240
} else {
241241
expectedErrorMessage = PluginPropertyUtils.errorProp(E2ETestConstants.ERROR_MSG_BQ_INCORRECT_PROPERTY).
242-
replaceAll("PROPERTY", property.substring(0, 1).toUpperCase() + property.substring(1));
242+
replaceAll("PROPERTY", property.substring(0, 1).toUpperCase() + property.substring(1));
243243
}
244244
String actualErrorMessage = PluginPropertyUtils.findPropertyErrorElement(property).getText();
245245
Assert.assertEquals(expectedErrorMessage, actualErrorMessage);
@@ -250,20 +250,15 @@ public void verifyTheBigQueryValidationErrorMessageForInvalidProperty(String pro
250250

251251
@Then("Validate records transferred to target table is equal to number of records from source table")
252252
public void validateRecordsTransferredToTargetTableIsEqualToNumberOfRecordsFromSourceTable()
253-
throws IOException, InterruptedException {
253+
throws IOException, InterruptedException {
254254
int countRecordsTarget = BigQueryClient.countBqQuery(TestSetupHooks.bqTargetTable);
255255
Optional<String> result = BigQueryClient.getSoleQueryResult("SELECT count(*) FROM `" +
256-
(PluginPropertyUtils.pluginProp("projectId"))
257-
+ "." + (PluginPropertyUtils.pluginProp
258-
("dataset")) + "." + TestSetupHooks.bqTargetTable + "` ");
256+
(PluginPropertyUtils.pluginProp("projectId"))
257+
+ "." + (PluginPropertyUtils.pluginProp
258+
("dataset")) + "." + TestSetupHooks.bqTargetTable + "` ");
259259
int count = result.map(Integer::parseInt).orElse(0);
260260
BeforeActions.scenario.write("Number of records transferred from source table to target table:" + count);
261261
Assert.assertEquals(count, countRecordsTarget);
262262
}
263263

264-
@Then("Enter BigQuery source properties filter")
265-
public void enterBigQuerysourcePropertiesfilter() throws IOException {
266-
CdfBigQueryPropertiesActions.enterFilter("%%%%");
267-
}
268-
269264
}

src/e2e-test/java/io/cdap/plugin/utils/E2ETestConstants.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,4 @@ public class E2ETestConstants {
1717
public static final String ERROR_MSG_BQ_INCORRECT_CHUNKSIZE = "errorMessageIncorrectBQChunkSize";
1818
public static final String ERROR_MSG_BQ_INCORRECT_TEMPORARY_BUCKET = "errorMessageIncorrectBQBucketName";
1919
public static final String ERROR_MSG_BQ_INCORRECT_PROPERTY = "errorMessageIncorrectBQProperty";
20-
public static final String ERROR_MSG_INCORRECT_PARTITIONSTARTDATE = "errorMessageIncorrectPartitionStartDate";
21-
public static final String ERROR_MSG_INCORRECT_PARTITIONENDDATE = "errorMessageIncorrectPartitionEndDate";
22-
public static final String ERROR_MSG_INCORRECT_REFERENCENAME = "errorMessageIncorrectReferenceName";
23-
public static final String ERROR_MSG_INCORRECT_FILTER = "errorMessageIncorrectRegexPathFilter";
2420
}

src/e2e-test/resources/errorMessage.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ errorMessageMultipleFileWithoutClearDefaultSchema=Found a row with 4 fields when
3333
errorMessageInvalidSourcePath=Invalid bucket name in path 'abc@'. Bucket name should
3434
errorMessageInvalidDestPath=Invalid bucket name in path 'abc@'. Bucket name should
3535
errorMessageInvalidEncryptionKey=CryptoKeyName.parse: formattedString not in valid format: Parameter "abc@" must be
36-
errorMessageIncorrectPartitionStartDate=02-01-2025 is not in a valid format. Enter valid date in format: yyyy-MM-dd
37-
errorMessageIncorrectPartitionEndDate=03-01-2025 is not in a valid format. Enter valid date in format: yyyy-MM-dd
36+
errorMessageIncorrectPartitionStartDate=10-01-2025 is not in a valid format. Enter valid date in format: yyyy-MM-dd
37+
errorMessageIncorrectPartitionEndDate=11-01-2025 is not in a valid format. Enter valid date in format: yyyy-MM-dd
3838
errorMessageIncorrectReferenceName=Invalid reference name 'invalidRef&^*&&*'. Supported characters are: letters, numbers, and '_', '-', '.', or '$'.
3939
errorLogsMessageInvalidFilter=Spark Program 'phase-1' failed.

src/e2e-test/resources/pluginParameters.properties

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
projectId=cdf-athena
2-
datasetprojectId=testbq_bqmt
3-
//cdf-athena
2+
datasetprojectId=cdf-athena
43
dataset=testbq_bqmt
54
wrongSourcePath=gs://00000000-e2e-0014a44f-81be-4501-8360-0ddca192492
65
serviceAccountType=filePath
@@ -371,17 +370,17 @@ bqTargetTable=dummy
371370
bqTargetTable2=dummy
372371
bqmtTargetTable=tabA
373372
bqmtTargetTable2=tabB
374-
bqStartDate=2025-01-02
375-
bqEndDate=2025-01-03
376-
partitionFrom=2025-01-02
377-
partitionTo=2025-01-03
373+
bqStartDate=2025-01-10
374+
bqEndDate=2025-01-11
375+
partitionFrom=2025-01-10
376+
partitionTo=2025-01-11
378377
filter=Id=20
379378
bqInvalidReferenceName=invalidRef&^*&&*
380379
OutputSchema={ "type": "record", "name": "text", "fields": [{ "name": "Id", "type": "long" }, { "name": "Value", "type": "long" }, \
381380
{ "name": "UID", "type": "string" } ] }
382381
incorrectFilter=%%%%
383-
bqIncorrectFormatStartDate=02-01-2025
384-
bqIncorrectFormatEndDate=03-01-2025
382+
bqIncorrectFormatStartDate=10-01-2025
383+
bqIncorrectFormatEndDate=11-01-2025
385384
## BQMT-PLUGIN-PROPERTIES-END
386385

387386
##CLOUDBIGTABLE-PLUGIN-PROPERTIES-START

0 commit comments

Comments
 (0)