6868@ Description ("Reads objects from a path in a Google Cloud Storage bucket." )
6969@ Metadata (properties = {@ MetadataProperty (key = Connector .PLUGIN_TYPE , value = GCSConnector .NAME )})
7070public class GCSSource extends AbstractFileSource <GCSSource .GCSSourceConfig > {
71+
7172 public static final String NAME = "GCSFile" ;
7273 private final GCSSourceConfig config ;
7374 private Asset asset ;
@@ -121,34 +122,35 @@ public void prepareRun(BatchSourceContext context) throws Exception {
121122 }
122123
123124 Storage storage = GCPUtils .getStorage (config .connection .getProject (), credentials );
124- Bucket bucket = storage .get (bucketName );
125- if (Objects .isNull (bucket )) {
126- String errorReason = String .format ("Unable to access GCS bucket '%s'." ,
127- bucketName );
128- collector .addFailure (String .format ("%s Ensure you entered the correct bucket path." , errorReason ),
129- null );
130- collector .getOrThrowException ();
131- }
132125 String location = null ;
133126 try {
134127 // Get location of the source for lineage
128+ Bucket bucket = storage .get (bucketName );
129+ if (Objects .isNull (bucket )) {
130+ String errorReason = String .format ("Unable to access GCS bucket '%s'." ,
131+ bucketName );
132+ collector .addFailure (
133+ String .format ("%s Ensure you entered the correct bucket path." , errorReason ),
134+ null );
135+ collector .getOrThrowException ();
136+ }
135137 location = bucket .getLocation ();
136138 } catch (StorageException e ) {
137139 String errorReason = String .format ("Error code: %s, Unable to access GCS bucket '%s'. " ,
138- e .getCode (), bucketName );
140+ e .getCode (), bucketName );
139141 collector .addFailure (String .format ("%s %s" , errorReason , e .getMessage ()),
140- "Ensure you entered the correct bucket path and have permissions for it." )
141- .withStacktrace (e .getStackTrace ());
142+ "Ensure you entered the correct bucket path and have permissions for it." )
143+ .withStacktrace (e .getStackTrace ());
142144 collector .getOrThrowException ();
143145 }
144146
145147 // create asset for lineage
146148 String fqn = GCSPath .getFQN (path );
147149 String referenceName = Strings .isNullOrEmpty (config .getReferenceName ())
148- ? ReferenceNames .normalizeFqn (fqn )
149- : config .getReferenceName ();
150+ ? ReferenceNames .normalizeFqn (fqn )
151+ : config .getReferenceName ();
150152 asset = Asset .builder (referenceName )
151- .setFqn (fqn ).setLocation (location ).build ();
153+ .setFqn (fqn ).setLocation (location ).build ();
152154
153155 // set error details provider
154156 context .setErrorDetailsProvider (
@@ -160,17 +162,20 @@ public void prepareRun(BatchSourceContext context) throws Exception {
160162
161163 @ Override
162164 protected Map <String , String > getFileSystemProperties (BatchSourceContext context ) {
163- Map <String , String > properties = GCPUtils .getFileSystemProperties (config .connection , config .getPath (),
164- new HashMap <>(config .getFileSystemProperties ()));
165+ Map <String , String > properties =
166+ GCPUtils .getFileSystemProperties (config .connection , config .getPath (),
167+ new HashMap <>(config .getFileSystemProperties ()));
165168 if (config .isCopyHeader ()) {
166169 properties .put (PathTrackingInputFormat .COPY_HEADER , Boolean .TRUE .toString ());
167170 }
168171 if (config .getFileEncoding () != null
169- && !config .getFileEncoding ().equalsIgnoreCase (AbstractFileSourceConfig .DEFAULT_FILE_ENCODING )) {
172+ && !config .getFileEncoding ()
173+ .equalsIgnoreCase (AbstractFileSourceConfig .DEFAULT_FILE_ENCODING )) {
170174 properties .put (PathTrackingInputFormat .SOURCE_FILE_ENCODING , config .getFileEncoding ());
171175 }
172176 if (config .getMinSplitSize () != null ) {
173- properties .put ("mapreduce.input.fileinputformat.split.minsize" , String .valueOf (config .getMinSplitSize ()));
177+ properties .put ("mapreduce.input.fileinputformat.split.minsize" ,
178+ String .valueOf (config .getMinSplitSize ()));
174179 }
175180 if (config .isEncrypted ()) {
176181 TinkDecryptor .configure (config .getEncryptedMetadataSuffix (), properties );
@@ -189,24 +194,27 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
189194 @ Override
190195 protected void recordLineage (LineageRecorder lineageRecorder , List <String > outputFields ) {
191196 lineageRecorder .recordRead ("Read" , String .format ("Read %s from Google Cloud Storage." ,
192- config .isEncrypted () ? " and decrypt " : " " ), outputFields );
197+ config .isEncrypted () ? " and decrypt " : " " ), outputFields );
193198 }
194199
195200 @ Override
196201 protected boolean shouldGetSchema () {
197202 return !config .containsMacro (GCPConnectorConfig .NAME_PROJECT ) &&
198- !config .containsMacro (GCSSourceConfig .NAME_PATH ) && !config .containsMacro (GCSSourceConfig .NAME_FORMAT ) &&
199- !config .containsMacro (GCSSourceConfig .NAME_DELIMITER ) &&
200- !config .containsMacro (GCSSourceConfig .NAME_FILE_SYSTEM_PROPERTIES ) &&
201- !config .containsMacro (GCPConnectorConfig .NAME_SERVICE_ACCOUNT_FILE_PATH ) &&
202- !config .containsMacro (GCPConnectorConfig .NAME_SERVICE_ACCOUNT_JSON );
203+ !config .containsMacro (GCSSourceConfig .NAME_PATH ) && !config .containsMacro (
204+ GCSSourceConfig .NAME_FORMAT ) &&
205+ !config .containsMacro (GCSSourceConfig .NAME_DELIMITER ) &&
206+ !config .containsMacro (GCSSourceConfig .NAME_FILE_SYSTEM_PROPERTIES ) &&
207+ !config .containsMacro (GCPConnectorConfig .NAME_SERVICE_ACCOUNT_FILE_PATH ) &&
208+ !config .containsMacro (GCPConnectorConfig .NAME_SERVICE_ACCOUNT_JSON );
203209 }
204210
205211 /**
206212 * Config for the plugin.
207213 */
208214 @ SuppressWarnings ("ConstantConditions" )
209- public static class GCSSourceConfig extends AbstractFileSourceConfig implements FileSourceProperties {
215+ public static class GCSSourceConfig extends AbstractFileSourceConfig
216+ implements FileSourceProperties {
217+
210218 public static final String NAME_PATH = "path" ;
211219 public static final String NAME_FORMAT = "format" ;
212220 private static final String NAME_FILE_SYSTEM_PROPERTIES = "fileSystemProperties" ;
@@ -216,7 +224,8 @@ public static class GCSSourceConfig extends AbstractFileSourceConfig implements
216224 private static final String DEFAULT_ENCRYPTED_METADATA_SUFFIX = ".metadata" ;
217225
218226 private static final Gson GSON = new Gson ();
219- private static final Type MAP_STRING_STRING_TYPE = new TypeToken <Map <String , String >>() { }.getType ();
227+ private static final Type MAP_STRING_STRING_TYPE = new TypeToken <Map <String , String >>() {
228+ }.getType ();
220229
221230 @ Macro
222231 @ Description ("The path to read from. For example, gs://<bucket>/path/to/directory/" )
@@ -234,8 +243,9 @@ public static class GCSSourceConfig extends AbstractFileSourceConfig implements
234243
235244 @ Macro
236245 @ Nullable
237- @ Description ("Whether the data file is encrypted. If it is set to 'true', a associated metadata file needs to be "
238- + "provided for each data file. Please refer to the Documentation for the details of the metadata file content." )
246+ @ Description (
247+ "Whether the data file is encrypted. If it is set to 'true', a associated metadata file needs to be "
248+ + "provided for each data file. Please refer to the Documentation for the details of the metadata file content." )
239249 private Boolean encrypted ;
240250
241251 @ Macro
@@ -246,8 +256,10 @@ public static class GCSSourceConfig extends AbstractFileSourceConfig implements
246256
247257 @ Macro
248258 @ Nullable
249- @ Description ("A list of columns with the corresponding data types for whom the automatic data type detection gets" +
250- " skipped." )
259+ @ Description (
260+ "A list of columns with the corresponding data types for whom the automatic data type detection gets"
261+ +
262+ " skipped." )
251263 private String override ;
252264
253265 @ Macro
0 commit comments