@@ -71,6 +71,30 @@ Writer::IngestSamplesV4Job::IngestSamplesV4Job(
7171 worker->init (dataset, params, samples);
7272}
7373
74+ bool Writer::IngestSamplesV4Job::is_finalizing () {
75+ return (
76+ finalize &&
77+ ((records_task.valid () && records_task.wait_for (std::chrono::seconds (
78+ 0 )) != std::future_status::ready) ||
79+ (anchors_task.valid () && anchors_task.wait_for (std::chrono::seconds (
80+ 0 )) != std::future_status::ready) ||
81+ (stats_task.valid () && stats_task.wait_for (std::chrono::seconds (0 )) !=
82+ std::future_status::ready)));
83+ }
84+
85+ void Writer::IngestSamplesV4Job::wait_for_flush_tasks () {
86+ // Use .get() instead of .wait() to propagate errors
87+ if (records_task.valid ()) {
88+ records_task.get ();
89+ }
90+ if (anchors_task.valid ()) {
91+ anchors_task.get ();
92+ }
93+ if (stats_task.valid ()) {
94+ stats_task.get ();
95+ }
96+ }
97+
7498void Writer::IngestSamplesV4Job::reset_queries () {
7599 query.reset (new Query (*ctx, *array));
76100 query->set_layout (TILEDB_GLOBAL_ORDER);
@@ -352,8 +376,9 @@ void Writer::update_params(IngestionParams& params) {
352376 LOG_INFO (" Output buffer flush = {} MiB" , params.max_tiledb_buffer_size_mb );
353377
354378 } else {
379+ // Divide by 2 because we're double buffering
355380 params.max_tiledb_buffer_size_mb =
356- params.ratio_output_flush * params.output_memory_budget_mb ;
381+ params.ratio_output_flush * params.output_memory_budget_mb / 2 ;
357382 LOG_INFO (
358383 " Output buffers = {} threads * {} MiB (flush = {} MiB)" ,
359384 params.num_threads ,
@@ -1004,30 +1029,49 @@ std::pair<uint64_t, uint64_t> Writer::ingest_samples(
// Launch an asynchronous flush of the job's current buffer.
//
// The function first waits for the job's previous flush tasks, then starts
// three std::async tasks stored on the job: records_task
// (write_record_buffers), anchors_task (write_anchor_buffers) and stats_task
// (flush_ingestion_tasks). It returns without waiting for the new tasks.
//
// job:      the ingestion job whose buffers are flushed; each task lambda
//           captures this shared_ptr by value.
// finalize: when true, pre_finalize() is called, the job's queries are moved
//           into the tasks and the job's queries are reset afterwards; when
//           false, the tasks use the queries still held by the job.
//
// In this diff view, lines marked '-' show the previous implementation that
// used a single flush_task; lines marked '+' split it into the three tasks.
10041029void Writer::ingest_samples_v4_flush (
10051030 std::shared_ptr<IngestSamplesV4Job>& job, bool finalize) {
10061031 // Wait for the previous flush to finish
1007- if (job->flush_task .valid ()) {
1008- job->flush_task .get ();
1009- }
1032+ job->wait_for_flush_tasks ();
// Take a copy of the buffer index: the tasks capture it by value, and a
// caller advances job->current_buffer after this function returns.
10101033 const size_t write_buffer = job->current_buffer ;
10111034 if (finalize) {
10121035 job->worker ->pre_finalize (write_buffer);
10131036 // Move the queries before resetting them
10141037 TRY_CATCH_THROW (
1015- job->flush_task = std::async (
1038+ job->records_task = std::async (
1039+ std::launch::async,
1040+ [job, query = std::move (job->query ), write_buffer]() mutable {
1041+ job->worker ->write_record_buffers (query, true , write_buffer);
1042+ }));
1043+ TRY_CATCH_THROW (
1044+ job->anchors_task = std::async (
10161045 std::launch::async,
10171046 [job,
1018- query = std::move (job->query ),
10191047 anchor_query = std::move (job->anchor_query ),
10201048 write_buffer]() mutable {
1021- job->worker ->write_buffers (
1022- query, anchor_query, true , write_buffer);
1049+ job->worker ->write_anchor_buffers (
1050+ anchor_query, true , write_buffer);
1051+ }));
1052+ TRY_CATCH_THROW (
1053+ job->stats_task =
1054+ std::async (std::launch::async, [job, write_buffer]() mutable {
1055+ job->worker ->flush_ingestion_tasks (true , write_buffer);
10231056 }));
10241057 // Start new queries since the next contig will be on a new fragment
10251058 job->reset_queries ();
10261059 } else {
// Not finalizing: the tasks reach the queries through the captured job
// (job->query, job->anchor_query) instead of taking ownership of them.
10271060 TRY_CATCH_THROW (
1028- job->flush_task = std::async (std::launch::async, [job, write_buffer]() {
1029- job->worker ->write_buffers (
1030- job->query , job->anchor_query , false , write_buffer);
1061+ job->records_task =
1062+ std::async (std::launch::async, [job, write_buffer]() {
1063+ job->worker ->write_record_buffers (
1064+ job->query , false , write_buffer);
1065+ }));
1066+ TRY_CATCH_THROW (
1067+ job->anchors_task =
1068+ std::async (std::launch::async, [job, write_buffer]() {
1069+ job->worker ->write_anchor_buffers (
1070+ job->anchor_query , false , write_buffer);
1071+ }));
1072+ TRY_CATCH_THROW (
1073+ job->stats_task = std::async (std::launch::async, [job, write_buffer]() {
1074+ job->worker ->flush_ingestion_tasks (false , write_buffer);
10311075 }));
10321076 }
10331077}
@@ -1183,9 +1227,7 @@ std::pair<uint64_t, uint64_t> Writer::ingest_samples_v4(
11831227 // Create a new job if the job's flush task is still running and the job is
11841228 // finalized, otherwise leave the job at the front of the pool in case it's
11851229 // part of a merged contig
1186- if (job->flush_task .valid () && job->finalize &&
1187- job->flush_task .wait_for (std::chrono::seconds (0 )) !=
1188- std::future_status::ready) {
1230+ if (job->is_finalizing ()) {
11891231 // Add a new job to the front of the pool
11901232 ingestion_job_pool.emplace_front (new IngestSamplesV4Job (
11911233 ctx_,
@@ -1197,9 +1239,8 @@ std::pair<uint64_t, uint64_t> Writer::ingest_samples_v4(
11971239 params,
11981240 samples));
11991241 job = ingestion_job_pool.front ();
1200- } else if (job->flush_task .valid ()) {
1201- job->flush_task .wait ();
12021242 }
1243+ job->wait_for_flush_tasks ();
12031244
12041245 // Repeatedly parse variants into a buffer and write the buffer when it's
12051246 // full
@@ -1216,6 +1257,7 @@ std::pair<uint64_t, uint64_t> Writer::ingest_samples_v4(
12161257 }
12171258
12181259 // Concurrently flush the current buffers
1260+ LOG_DEBUG (" allow duplicates: {}" , creation_params_.allow_duplicates );
12191261 ingest_samples_v4_flush (job);
12201262 ++job->current_buffer %= num_buffers;
12211263
@@ -1260,9 +1302,7 @@ std::pair<uint64_t, uint64_t> Writer::ingest_samples_v4(
12601302 // Wait for all flush tasks before finishing
12611303 while (!ingestion_job_pool.empty ()) {
12621304 std::shared_ptr<IngestSamplesV4Job> job = ingestion_job_pool.front ();
1263- if (job->flush_task .valid ()) {
1264- job->flush_task .get ();
1265- }
1305+ job->wait_for_flush_tasks ();
12661306 ingestion_job_pool.pop_front ();
12671307 }
12681308
0 commit comments