@@ -156,6 +156,30 @@ def __init__(
156156 f"Invalid { upload_format = } . Must be one of { _FORMATS } "
157157 )
158158 self ._format = upload_format
159+ self ._content_type = (
160+ "application/json"
161+ if self ._format == "json"
162+ else "application/jsonl"
163+ )
164+ test_path = posixpath .join (
165+ self ._base_path ,
166+ f".one_off_test_to_see_if_upload_works.{ self ._format } " ,
167+ )
168+ try :
169+ with self ._fs .open (
170+ test_path , "w" , content_type = self ._content_type
171+ ) as file :
172+ file .write ("\n " )
173+ except Exception as exception : # pylint: disable=broad-exception-caught
174+ raise ValueError (
175+ f"Failed to write file to the following path, upload is not working: { test_path } .\n Got error: { exception } "
176+ )
177+ # Try to delete the test file. Our docs don't explicitly ask users to grant the GCS delete IAM permission,
178+ # so if the delete fails, just leave the file in place.
179+ try :
180+ self ._fs .rm_file (test_path ) # pyright: ignore[reportUnknownMemberType]
181+ except Exception : # pylint: disable=broad-exception-caught
182+ pass
159183
160184 # Use a ThreadPoolExecutor for its queueing and thread management. The semaphore
161185 # limits the number of queued tasks. If the queue is full, data will be dropped.
@@ -271,13 +295,7 @@ def _do_upload(
271295 for message_idx , line in enumerate (message_lines ):
272296 line [_MESSAGE_INDEX_KEY ] = message_idx
273297
274- content_type = (
275- "application/json"
276- if self ._format == "json"
277- else "application/jsonl"
278- )
279-
280- with self ._fs .open (path , "w" , content_type = content_type ) as file :
298+ with self ._fs .open (path , "w" , content_type = self ._content_type ) as file :
281299 for message in message_lines :
282300 gen_ai_json_dump (message , file )
283301 file .write ("\n " )
0 commit comments