2020import com .fasterxml .jackson .databind .ObjectMapper ;
2121import com .google .common .annotations .VisibleForTesting ;
2222import com .google .common .base .Strings ;
23- import com .google .common .io .ByteStreams ;
2423import java .io .IOException ;
25- import java .io .InputStream ;
2624import java .net .MalformedURLException ;
2725import java .net .URL ;
26+ import java .net .URLConnection ;
2827import java .util .Date ;
2928import java .util .Map ;
3029import java .util .UUID ;
3534import okhttp3 .Request ;
3635import okhttp3 .RequestBody ;
3736import okhttp3 .Response ;
37+ import okio .BufferedSink ;
38+ import okio .Okio ;
39+ import okio .Source ;
3840import org .datatransferproject .api .launcher .Monitor ;
3941import org .datatransferproject .datatransfer .synology .models .C2Api ;
4042import org .datatransferproject .datatransfer .synology .models .ServiceConfig ;
4143import org .datatransferproject .datatransfer .synology .utils .ServiceConfigParser ;
4244import org .datatransferproject .spi .cloud .storage .JobStore ;
45+ import org .datatransferproject .spi .cloud .storage .TemporaryPerJobDataStore .InputStreamWrapper ;
4346import org .datatransferproject .spi .transfer .types .CopyExceptionWithFailureReason ;
4447import org .datatransferproject .spi .transfer .types .DestinationMemoryFullException ;
4548import org .datatransferproject .spi .transfer .types .NoNasInAccountException ;
@@ -61,6 +64,11 @@ public class SynologyDTPService {
6164 C2Api c2Api ;
6265 ServiceConfig .Retry retryConfig ;
6366
67+ @ FunctionalInterface
68+ protected interface RequestBodyGenerator {
69+ RequestBody get () throws CopyExceptionWithFailureReason ;
70+ }
71+
6472 /**
6573 * Constructs a new {@code SynologyDTPService} instance.
6674 *
@@ -89,17 +97,6 @@ public SynologyDTPService(
8997 this .client = client ;
9098 }
9199
92- /**
93- * getInputStream
94- *
95- * @param url
96- * @return an InputStream Instance
97- * @throws IOException
98- */
99- protected InputStream getInputStream (String url ) throws IOException {
100- return new URL (url ).openStream ();
101- }
102-
103100 /**
104101 * Creates album.
105102 *
@@ -115,62 +112,124 @@ public Map<String, Object> createAlbum(MediaAlbum album, UUID jobId)
115112 builder .add ("service" , exportingService );
116113 monitor .info (() -> "[SynologyImporter] Creating album" , album .getName (), jobId );
117114
115+ RequestBody requestBody = builder .build ();
118116 return (Map <String , Object >)
119- sendPostRequest (c2Api .getCreateAlbum (), builder . build () , jobId ).get ("data" );
117+ sendPostRequest (c2Api .getCreateAlbum (), () -> requestBody , jobId ).get ("data" );
120118 }
121119
122120 /**
123- * Creates photo.
121+ * get InputStreamWrapper for media file, it can be from temp store or from fetchable url
124122 *
125- * @param photo the photo
126123 * @param jobId the job ID
127- * @return a map of shape {"data": {"item_id": <item_id>}}
124+ * @param fetchableUrl the url to fetch media file, can be null if the file is in temp store
125+ * @return an InputStreamWrapper instance
126+ * @throws CopyExceptionWithFailureReason
128127 */
129- public Map <String , Object > createPhoto (PhotoModel photo , UUID jobId )
128+ @ VisibleForTesting
129+ protected InputStreamWrapper getMediaInputStreamWrapper (
130+ UUID jobId , String fetchableUrl , boolean isInTempStore )
130131 throws CopyExceptionWithFailureReason {
131- byte [] imageBytes ;
132132 try {
133- InputStream inputStream = null ;
134- if (photo .isInTempStore ()) {
135- inputStream = jobStore .getStream (jobId , photo .getFetchableUrl ()).getStream ();
136- } else if (photo .getFetchableUrl () != null ) {
137- inputStream = getInputStream (photo .getFetchableUrl ());
138- } else {
139- monitor .severe (() -> "[SynologyImporter] Can't get inputStream for a photo" );
140- return null ;
133+ if (isInTempStore ) {
134+ return jobStore .getStream (jobId , fetchableUrl );
135+ } else if (fetchableUrl != null ) {
136+ URL url = new URL (fetchableUrl );
137+ URLConnection connection = url .openConnection ();
138+ return new InputStreamWrapper (
139+ connection .getInputStream (), connection .getContentLengthLong ());
141140 }
142- imageBytes = ByteStreams .toByteArray (inputStream );
143141 } catch (MalformedURLException e ) {
144- throw new UploadErrorException ("Failed to create url for photo" , e );
142+ throw new UploadErrorException (
143+ String .format ("Failed to create url for fetchableUrl [%s]" , fetchableUrl ), e );
145144 } catch (IOException e ) {
146- throw new UploadErrorException ("Failed to create input stream for photo" , e );
145+ throw new UploadErrorException (
146+ String .format ("Failed to create input stream for fetchableUrl [%s]" , fetchableUrl ), e );
147147 }
148+ throw new UploadErrorException (
149+ "No valid input stream source for media" ,
150+ new IOException ("fetchableUrl is null and isInTempStore is false" ));
151+ }
148152
149- RequestBody fileBody = RequestBody .create (MediaType .parse (photo .getMimeType ()), imageBytes );
150-
151- MultipartBody .Builder builder =
152- new MultipartBody .Builder ()
153- .setType (MultipartBody .FORM )
154- .addFormDataPart ("file" , photo .getTitle (), fileBody )
155- .addFormDataPart ("item_id" , photo .getDataId ())
156- .addFormDataPart ("title" , photo .getTitle ())
157- .addFormDataPart ("job_id" , jobId .toString ())
158- .addFormDataPart ("service" , exportingService );
153+ /**
154+ * Creates photo.
155+ *
156+ * @param photo the photo
157+ * @param jobId the job ID
158+ * @return a map of shape {"data": {"item_id": <item_id>}}
159+ */
160+ public Map <String , Object > createPhoto (PhotoModel photo , UUID jobId )
161+ throws CopyExceptionWithFailureReason {
162+ monitor .info (
163+ () ->
164+ String .format (
165+ "[SynologyImporter] starts creating photo, dataId: [%s], name: [%s]" ,
166+ photo .getDataId (), photo .getTitle ()),
167+ jobId );
168+
169+ RequestBodyGenerator bodyGenerator =
170+ () -> {
171+ // Due to InputStream may not repeatable, we need to open it inside the generator function
172+ // to make sure it can be read when retrying.
173+ InputStreamWrapper inputStreamWrapper =
174+ getMediaInputStreamWrapper (jobId , photo .getFetchableUrl (), photo .isInTempStore ());
175+
176+ RequestBody fileBody =
177+ new RequestBody () {
178+ private boolean isConsumed = false ;
179+
180+ @ Override
181+ public MediaType contentType () {
182+ return MediaType .parse (photo .getMimeType ());
183+ }
184+
185+ @ Override
186+ public long contentLength () {
187+ return inputStreamWrapper .getBytes ();
188+ }
189+
190+ @ Override
191+ public void writeTo (BufferedSink sink ) throws IOException {
192+ if (isConsumed ) {
193+ throw new IOException ("InputStream has already been consumed" );
194+ }
195+ isConsumed = true ;
196+ try (Source source = Okio .source (inputStreamWrapper .getStream ())) {
197+ sink .writeAll (source );
198+ }
199+ }
200+ };
201+
202+ MultipartBody .Builder builder =
203+ new MultipartBody .Builder ()
204+ .setType (MultipartBody .FORM )
205+ .addFormDataPart ("file" , photo .getTitle (), fileBody )
206+ .addFormDataPart ("item_id" , photo .getDataId ())
207+ .addFormDataPart ("title" , photo .getTitle ())
208+ .addFormDataPart ("job_id" , jobId .toString ())
209+ .addFormDataPart ("service" , exportingService );
210+
211+ String imageDescription = photo .getDescription ();
212+ if (!Strings .isNullOrEmpty (imageDescription )) {
213+ builder .addFormDataPart ("description" , imageDescription );
214+ }
215+ Date imageUploadedTime = photo .getUploadedTime ();
216+ if (imageUploadedTime != null ) {
217+ long timestampInSeconds = imageUploadedTime .getTime () / 1000 ;
218+ builder .addFormDataPart ("uploaded_time" , String .valueOf (timestampInSeconds ));
219+ }
159220
160- String imageDescription = photo .getDescription ();
161- if (!Strings .isNullOrEmpty (imageDescription )) {
162- builder .addFormDataPart ("description" , imageDescription );
163- }
164- Date imageUploadedTime = photo .getUploadedTime ();
165- if (imageUploadedTime != null ) {
166- long timestampInSeconds = imageUploadedTime .getTime () / 1000 ;
167- builder .addFormDataPart ("uploaded_time" , String .valueOf (timestampInSeconds ));
168- }
221+ return builder .build ();
222+ };
169223
170224 @ SuppressWarnings ("unchecked" )
171225 Map <String , Object > responseData =
172226 (Map <String , Object >)
173- sendPostRequest (c2Api .getUploadItem (), builder .build (), jobId ).get ("data" );
227+ sendPostRequest (c2Api .getUploadItem (), bodyGenerator , jobId ).get ("data" );
228+ monitor .info (
229+ () ->
230+ String .format (
231+ "[SynologyImporter] photo created successfully, name: [%s]" , photo .getTitle ()),
232+ jobId );
174233 return responseData ;
175234 }
176235
@@ -183,49 +242,72 @@ public Map<String, Object> createPhoto(PhotoModel photo, UUID jobId)
183242 */
184243 public Map <String , Object > createVideo (VideoModel video , UUID jobId )
185244 throws CopyExceptionWithFailureReason {
186- byte [] videoBytes ;
187- try {
188- InputStream inputStream = null ;
189- if (video .isInTempStore ()) {
190- inputStream = jobStore .getStream (jobId , video .getFetchableUrl ()).getStream ();
191- } else if (video .getFetchableUrl () != null ) {
192- inputStream = getInputStream (video .getFetchableUrl ());
193- } else {
194- monitor .severe (() -> "[SynologyImporter] Can't get inputStream for a video" );
195- return null ;
196- }
197-
198- videoBytes = ByteStreams .toByteArray (inputStream );
199- } catch (MalformedURLException e ) {
200- throw new UploadErrorException ("Failed to create url for video" , e );
201- } catch (IOException e ) {
202- throw new UploadErrorException ("Failed to create input stream for video" , e );
203- }
245+ monitor .info (
246+ () ->
247+ String .format (
248+ "[SynologyImporter] starts creating video, dataId: [%s], name: [%s]" ,
249+ video .getDataId (), video .getName ()),
250+ jobId );
251+
252+ RequestBodyGenerator bodyGenerator =
253+ () -> {
254+ // Due to InputStream may not repeatable, we need to open it inside the generator function
255+ // to make sure it can be read when retrying.
256+ InputStreamWrapper inputStreamWrapper =
257+ getMediaInputStreamWrapper (jobId , video .getFetchableUrl (), video .isInTempStore ());
258+
259+ RequestBody fileBody =
260+ new RequestBody () {
261+ private boolean isConsumed = false ;
262+
263+ @ Override
264+ public MediaType contentType () {
265+ return MediaType .parse (video .getMimeType ());
266+ }
267+
268+ @ Override
269+ public long contentLength () {
270+ return inputStreamWrapper .getBytes ();
271+ }
272+
273+ @ Override
274+ public void writeTo (BufferedSink sink ) throws IOException {
275+ if (isConsumed ) {
276+ throw new IOException ("InputStream has already been consumed" );
277+ }
278+ isConsumed = true ;
279+ try (Source source = Okio .source (inputStreamWrapper .getStream ())) {
280+ sink .writeAll (source );
281+ }
282+ }
283+ };
284+
285+ MultipartBody .Builder builder =
286+ new MultipartBody .Builder ()
287+ .setType (MultipartBody .FORM )
288+ .addFormDataPart ("file" , video .getName (), fileBody )
289+ .addFormDataPart ("item_id" , video .getDataId ())
290+ .addFormDataPart ("title" , video .getName ())
291+ .addFormDataPart ("job_id" , jobId .toString ())
292+ .addFormDataPart ("service" , exportingService );
293+
294+ String imageDescription = video .getDescription ();
295+ if (!Strings .isNullOrEmpty (imageDescription )) {
296+ builder .addFormDataPart ("description" , imageDescription );
297+ }
298+ Date videoUploadedTime = video .getUploadedTime ();
299+ if (videoUploadedTime != null ) {
300+ long timestampInSeconds = videoUploadedTime .getTime () / 1000 ;
301+ builder .addFormDataPart ("uploaded_time" , String .valueOf (timestampInSeconds ));
302+ }
204303
205- RequestBody fileBody = RequestBody .create (MediaType .parse (video .getMimeType ()), videoBytes );
206- MultipartBody .Builder builder =
207- new MultipartBody .Builder ()
208- .setType (MultipartBody .FORM )
209- .addFormDataPart ("file" , video .getName (), fileBody )
210- .addFormDataPart ("item_id" , video .getDataId ())
211- .addFormDataPart ("title" , video .getName ())
212- .addFormDataPart ("job_id" , jobId .toString ())
213- .addFormDataPart ("service" , exportingService );
214-
215- String imageDescription = video .getDescription ();
216- if (!Strings .isNullOrEmpty (imageDescription )) {
217- builder .addFormDataPart ("description" , imageDescription );
218- }
219- Date videoUploadedTime = video .getUploadedTime ();
220- if (videoUploadedTime != null ) {
221- long timestampInSeconds = videoUploadedTime .getTime () / 1000 ;
222- builder .addFormDataPart ("uploaded_time" , String .valueOf (timestampInSeconds ));
223- }
304+ return builder .build ();
305+ };
224306
225307 @ SuppressWarnings ("unchecked" )
226308 Map <String , Object > responseData =
227309 (Map <String , Object >)
228- sendPostRequest (c2Api .getUploadItem (), builder . build () , jobId ).get ("data" );
310+ sendPostRequest (c2Api .getUploadItem (), bodyGenerator , jobId , 300 ).get ("data" );
229311 return responseData ;
230312 }
231313
@@ -244,7 +326,8 @@ public Map<String, Object> addItemToAlbum(String albumId, String itemId, UUID jo
244326 .add ("service" , exportingService )
245327 .add ("album_id" , albumId )
246328 .add ("item_id" , itemId );
247- return sendPostRequest (c2Api .getAddItemToAlbum (), builder .build (), jobId );
329+ RequestBody requestBody = builder .build ();
330+ return sendPostRequest (c2Api .getAddItemToAlbum (), () -> requestBody , jobId );
248331 }
249332
250333 /**
@@ -265,7 +348,8 @@ public Map<String, Object> sendJobSignal(JobLifeCycle jobStatus, UUID jobId)
265348 builder .add ("end_reason" , jobStatus .endReason ().name ());
266349 }
267350
268- return sendPostRequest (c2Api .getSignalJob (), builder .build (), jobId );
351+ RequestBody requestBody = builder .build ();
352+ return sendPostRequest (c2Api .getSignalJob (), () -> requestBody , jobId );
269353 }
270354
271355 @ VisibleForTesting
@@ -304,17 +388,43 @@ protected void throwExceptionIfNoQuota(Response response) throws CopyExceptionWi
304388 }
305389
306390 @ VisibleForTesting
307- protected Map <String , Object > sendPostRequest (String url , RequestBody body , UUID jobId )
391+ protected Map <String , Object > sendPostRequest (
392+ String url , RequestBodyGenerator bodyGenerator , UUID jobId )
393+ throws CopyExceptionWithFailureReason {
394+ return sendPostRequest (url , bodyGenerator , jobId , -1 );
395+ }
396+
397+ /*
398+ * @param url the URL to send the POST request to
399+ * @param bodyGenerator a generator function that produces the request body, it will be called for each retry attempt
400+ * @param jobId the job ID
401+ * @param timeoutInSeconds the timeout for the request in seconds, -1 means do not modify the default timeout of OkHttpClient
402+ */
403+ @ VisibleForTesting
404+ protected Map <String , Object > sendPostRequest (
405+ String url , RequestBodyGenerator bodyGenerator , UUID jobId , int timeoutInSeconds )
308406 throws CopyExceptionWithFailureReason {
309407 boolean triedRefreshToken = false ;
310- Request .Builder requestBuilder = new Request .Builder ().url (url ).post (body );
311408
312409 Exception lastException = null ;
313410 for (int retry = retryConfig .getMaxAttempts (); retry > 0 ; retry --) {
314411 Response response = null ;
315412 try {
413+ RequestBody body = bodyGenerator .get ();
414+ Request .Builder requestBuilder = new Request .Builder ().url (url ).post (body );
316415 requestBuilder .header ("Authorization" , "Bearer " + tokenManager .getAccessToken (jobId ));
317- response = client .newCall (requestBuilder .build ()).execute ();
416+ if (timeoutInSeconds < 0 ) {
417+ // Use default timeout of OkHttpClient
418+ response = client .newCall (requestBuilder .build ()).execute ();
419+ } else {
420+ response =
421+ client
422+ .newBuilder ()
423+ .readTimeout (timeoutInSeconds , java .util .concurrent .TimeUnit .SECONDS )
424+ .build ()
425+ .newCall (requestBuilder .build ())
426+ .execute ();
427+ }
318428 if (!response .isSuccessful ()) {
319429 int code = response .code ();
320430 if (code == 401 && !triedRefreshToken ) {
0 commit comments