1717import aito .client .responses as aito_responses
1818from aito .client import AitoClient , RequestError
1919from aito .schema import AitoDatabaseSchema , AitoTableSchema , AitoColumnTypeSchema
20- from aito .utils ._file_utils import gzip_file , check_file_is_gzipped
20+ from aito .utils ._file_utils import gzip_file , check_file_is_gzipped , read_ndjson_gz_file
2121from aito .utils .data_frame_handler import DataFrameHandler
2222
2323LOG = logging .getLogger ('AitoAPI' )
@@ -470,15 +470,39 @@ def poll_file_processing_status(client: AitoClient, table_name: str, session_id:
470470 time .sleep (polling_time )
471471
472472
473+ def _stream_entries_from_gzip (binary_file : BinaryIO ) -> Iterable [Dict ]:
474+ """Stream entries from a gzipped ndjson file.
475+
476+ This is a generator that yields entries one by one from a gzipped ndjson file,
477+ which is memory-efficient for large files.
478+
479+ :param binary_file: binary file object of a gzipped ndjson file
480+ :type binary_file: BinaryIO
481+ :yield: entries from the file
482+ :rtype: Iterable[Dict]
483+ """
484+ import gzip
485+ import json
486+ with gzip .open (binary_file , 'rt' , encoding = 'utf-8' ) as f :
487+ for line in f :
488+ line = line .strip ()
489+ if line :
490+ yield json .loads (line )
491+
492+
473493def upload_binary_file (
474494 client : AitoClient ,
475495 table_name : str ,
476496 binary_file : BinaryIO ,
477497 polling_time : int = 10 ,
478- optimize_on_finished : bool = True
498+ optimize_on_finished : bool = True ,
499+ batch_size : int = 1000
479500):
480501 """`upload a binary file object to a table <https://aito.ai/docs/api/#post-api-v1-data-table-file>`__
481502
503+ For multitenant instances, this function uses streaming batch uploads instead of S3 file upload,
504+ as S3 file upload is not supported in multitenant environments.
505+
482506 .. note::
483507
484508 requires the client to be setup with the READ-WRITE API key
@@ -489,34 +513,54 @@ def upload_binary_file(
489513 :type table_name: str
490514 :param binary_file: binary file object
491515 :type binary_file: BinaryIO
492- :param polling_time: polling wait time
516+ :param polling_time: polling wait time (only used for non-multitenant S3 upload)
493517 :type polling_time: int
494518 :param optimize_on_finished: :func:`optimize_table` when finished uploading, defaults to True
495519 :type optimize_on_finished: bool
520+ :param batch_size: batch size for streaming upload (only used for multitenant), defaults to 1000
521+ :type batch_size: int
496522 """
497523 LOG .debug (f'uploading file object to table `{ table_name } `...' )
498- init_upload_resp = initiate_upload_file (client = client , table_name = table_name )
499- upload_binary_file_to_s3 (initiate_upload_file_response = init_upload_resp , binary_file = binary_file )
500- upload_session_id = init_upload_resp ['id' ]
501- trigger_file_processing (client = client , table_name = table_name , session_id = upload_session_id )
502- poll_file_processing_status (
503- client = client , table_name = table_name , session_id = upload_session_id , polling_time = polling_time
504- )
505524
506- LOG .info (f'uploaded file object to table `{ table_name } `' )
507- if optimize_on_finished :
508- optimize_table (client , table_name )
525+ if client .is_multitenant :
526+ # For multitenant instances, use streaming batch upload
527+ LOG .info (f'using streaming upload for multitenant instance' )
528+ entries = _stream_entries_from_gzip (binary_file )
529+ upload_entries (
530+ client = client ,
531+ table_name = table_name ,
532+ entries = entries ,
533+ batch_size = batch_size ,
534+ optimize_on_finished = optimize_on_finished
535+ )
536+ else :
537+ # For non-multitenant instances, use S3 file upload
538+ init_upload_resp = initiate_upload_file (client = client , table_name = table_name )
539+ upload_binary_file_to_s3 (initiate_upload_file_response = init_upload_resp , binary_file = binary_file )
540+ upload_session_id = init_upload_resp ['id' ]
541+ trigger_file_processing (client = client , table_name = table_name , session_id = upload_session_id )
542+ poll_file_processing_status (
543+ client = client , table_name = table_name , session_id = upload_session_id , polling_time = polling_time
544+ )
545+
546+ LOG .info (f'uploaded file object to table `{ table_name } `' )
547+ if optimize_on_finished :
548+ optimize_table (client , table_name )
509549
510550
511551def upload_file (
512552 client : AitoClient ,
513553 table_name : str ,
514554 file_path : PathLike ,
515555 polling_time : int = 10 ,
516- optimize_on_finished : bool = True
556+ optimize_on_finished : bool = True ,
557+ batch_size : int = 1000
517558):
518559 """`upload a file <https://aito.ai/docs/api/#post-api-v1-data-table-file>`__ to the specfied table
519560
561+ For multitenant instances, this function uses streaming batch uploads instead of S3 file upload,
562+ as S3 file upload is not supported in multitenant environments.
563+
520564 .. note::
521565
522566 requires the client to be setup with the READ-WRITE API key
@@ -527,10 +571,12 @@ def upload_file(
527571 :type table_name: str
528572 :param file_path: path to the file to be uploaded
529573 :type file_path: PathLike
530- :param polling_time: polling wait time
574+ :param polling_time: polling wait time (only used for non-multitenant S3 upload)
531575 :type polling_time: int
532576 :param optimize_on_finished: :func:`optimize_table` when finished uploading, defaults to True
533577 :type optimize_on_finished: bool
578+ :param batch_size: batch size for streaming upload (only used for multitenant), defaults to 1000
579+ :type batch_size: int
534580 :raises ValueError: incorrect file extension, should be .ndjson.gz
535581 """
536582 if not check_file_is_gzipped (file_path ):
@@ -541,15 +587,23 @@ def upload_file(
541587 table_name = table_name ,
542588 binary_file = f ,
543589 polling_time = polling_time ,
544- optimize_on_finished = optimize_on_finished
590+ optimize_on_finished = optimize_on_finished ,
591+ batch_size = batch_size
545592 )
546593
547594
548595def quick_add_table (
549- client : AitoClient , input_file : Union [Path , PathLike ], table_name : str = None , input_format : str = None
596+ client : AitoClient ,
597+ input_file : Union [Path , PathLike ],
598+ table_name : str = None ,
599+ input_format : str = None ,
600+ batch_size : int = 1000
550601):
551602 """Create a table and upload a file to the table, using the default inferred schema
552603
604+ For multitenant instances, this function uses streaming batch uploads instead of S3 file upload,
605+ as S3 file upload is not supported in multitenant environments.
606+
553607 :param client: the AitoClient instance
554608 :type client: AitoClient
555609 :param input_file: path to the input file to be uploaded
@@ -558,6 +612,8 @@ def quick_add_table(
558612 :type table_name: Optional[str]
559613 :param input_format: specify the format of the input file, defaults to the input file extension
560614 :type input_format: Optional[str]
615+ :param batch_size: batch size for streaming upload (only used for multitenant), defaults to 1000
616+ :type batch_size: int
561617 """
562618 df_handler = DataFrameHandler ()
563619
@@ -586,7 +642,7 @@ def quick_add_table(
586642 create_table (client , table_name , inferred_schema )
587643
588644 with open (converted_tmp_file .name , 'rb' ) as in_f :
589- upload_binary_file (client = client , table_name = table_name , binary_file = in_f )
645+ upload_binary_file (client = client , table_name = table_name , binary_file = in_f , batch_size = batch_size )
590646 converted_tmp_file .close ()
591647 unlink (converted_tmp_file .name )
592648
0 commit comments