Skip to content

Commit 9aae22e

Browse files
Add optional parameter dynamic_read_timeout for functions that upload files to azure storage account
1 parent 74d3a4d commit 9aae22e

2 files changed

Lines changed: 30 additions & 8 deletions

File tree

datareservoirio/client.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def ping(self):
104104
return response.json()
105105

106106
@log_decorator("exception")
107-
def create(self, series=None, wait_on_verification=True):
107+
def create(self, series=None, wait_on_verification=True, dynamic_read_timeout=False):
108108
"""
109109
Create a new series in DataReservoir.io from a pandas.Series. If no
110110
data is provided, an empty series is created.
@@ -124,6 +124,12 @@ def create(self, series=None, wait_on_verification=True):
124124
validation is successful. The latter is significantly faster, but
125125
is recommended when the data is "validated" in advance.
126126
Default is True.
127+
dynamic_read_timeout : bool (optional)
128+
While uploading file there is no timeout for read operations which can cause
129+
problems when there is no response from the server. If this flag is set to true,
130+
application will calculate and apply timeout for read operations and retry upload
131+
if necessary.
132+
Default is False.
127133
128134
Returns
129135
-------
@@ -157,7 +163,7 @@ def create(self, series=None, wait_on_verification=True):
157163
environment.api_base_url + "files/commit",
158164
{"json": {"FileId": file_id}, "timeout": _TIMEOUT_DEAULT},
159165
)
160-
self._storage.put(df, target_url, commit_request)
166+
self._storage.put(df, target_url, commit_request, dynamic_read_timeout)
161167

162168
if wait_on_verification:
163169
status = self._wait_until_file_ready(file_id)
@@ -173,7 +179,7 @@ def create(self, series=None, wait_on_verification=True):
173179
return response.json()
174180

175181
@log_decorator("exception")
176-
def append(self, series, series_id, wait_on_verification=True):
182+
def append(self, series, series_id, wait_on_verification=True, dynamic_read_timeout=False):
177183
"""
178184
Append data to an already existing series.
179185
@@ -193,6 +199,12 @@ def append(self, series, series_id, wait_on_verification=True):
193199
validation is successful. The latter is significantly faster, but
194200
is recommended when the data is "validated" in advance.
195201
Default is True.
202+
dynamic_read_timeout : bool (optional)
203+
While uploading file there is no timeout for read operations which can cause
204+
problems when there is no response from the server. If this flag is set to true,
205+
application will calculate and apply timeout for read operations and retry upload
206+
if necessary.
207+
Default is False.
196208
197209
Returns
198210
-------
@@ -217,7 +229,7 @@ def append(self, series, series_id, wait_on_verification=True):
217229
{"json": {"FileId": file_id}, "timeout": _TIMEOUT_DEAULT},
218230
)
219231

220-
self._storage.put(df, target_url, commit_request)
232+
self._storage.put(df, target_url, commit_request, dynamic_read_timeout)
221233

222234
if wait_on_verification:
223235
status = self._wait_until_file_ready(file_id)

datareservoirio/storage/storage.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def __init__(self, session, cache=True, cache_opt=None):
5959

6060
self._session = session
6161

62-
def put(self, df, target_url, commit_request):
62+
def put(self, df, target_url, commit_request, dynamic_read_timeout=False):
6363
"""
6464
Put a Pandas DataFrame into storage.
6565
@@ -72,9 +72,12 @@ def put(self, df, target_url, commit_request):
7272
commit_request : tuple
7373
Parameteres for "commit" request. Given as `(METHOD, URL, kwargs)`.
7474
The tuple is passed forward to `session.request(method=METHOD, url=URL, **kwargs)`
75+
dynamic_read_timeout : bool
76+
Flag that enables timeout calculation for read operation while
77+
uploading file
7578
7679
"""
77-
_df_to_blob(df, target_url)
80+
_df_to_blob(df, target_url, _BLOBSTORAGE_SESSION, dynamic_read_timeout)
7881

7982
method, url, kwargs = commit_request
8083
response = self._session.request(method=method, url=url, **kwargs)
@@ -322,7 +325,7 @@ def _blob_to_df(blob_url, session=_BLOBSTORAGE_SESSION):
322325
return df
323326

324327

325-
def _df_to_blob(df, blob_url, session=_BLOBSTORAGE_SESSION):
328+
def _df_to_blob(df, blob_url, session=_BLOBSTORAGE_SESSION, dynamic_read_timeout=False):
326329
"""
327330
Upload a Pandas Dataframe as blob to a remote storage.
328331
@@ -354,6 +357,13 @@ def _df_to_blob(df, blob_url, session=_BLOBSTORAGE_SESSION):
354357
url=blob_url,
355358
headers={"x-ms-blob-type": "BlockBlob"},
356359
data=fp,
357-
timeout=(30, None),
360+
timeout=(30, _calculate_timeout(fp.getbuffer().nbytes) if dynamic_read_timeout else None),
358361
).raise_for_status()
359362
return
363+
364+
365+
def _calculate_timeout(file_size_bytes):
366+
bytes_per_second = 1 * 1024 * 1024 # 1MB/s
367+
min_timeout = 30
368+
timeout = max(min_timeout, (file_size_bytes / bytes_per_second) * 1.5)
369+
return timeout

0 commit comments

Comments
 (0)