Skip to content

Commit 5c95081

Browse files
committed
Added a sharing mode to the VCFV4 class
This determines how the class reuses records, i.e. either manually or automatically. In manual mode, records must be explicitly returned to the class for reuse, otherwise the record's shared_ptr will destroy it when the use counter reaches 0. In automatic mode, the VCFV4 class keeps a copy of the record's shared_ptr and automatically reuses it when the use counter reaches 1.
1 parent 7248542 commit 5c95081

2 files changed

Lines changed: 87 additions & 17 deletions

File tree

libtiledbvcf/src/vcf/vcf_v4.cc

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@
2525
*/
2626

2727
#include "vcf/vcf_v4.h"
28-
#include "utils/logger_public.h"
2928

3029
namespace tiledb {
3130
namespace vcf {
3231

33-
VCFV4::VCFV4()
34-
: open_(false)
32+
VCFV4::VCFV4(SharingMode mode)
33+
: mode_(mode)
34+
, open_(false)
3535
, inited_(false)
3636
, max_record_buffer_size_(10000)
3737
, hdr_(nullptr)
@@ -156,7 +156,13 @@ void VCFV4::pop_record() {
156156
}
157157

158158
/**
 * Hands a record back to the instance so that it can be recycled.
 *
 * Only supported in MANUAL sharing mode, where ownership of `record` is moved
 * into `record_queue_pool_`. In any other mode an error is logged and the
 * record is not added to the pool.
 *
 * @param record The record to return; moved-from on success
 */
void VCFV4::return_record(SafeSharedBCFRec& record) {
  // Guard: manual returns are rejected unless the instance is in MANUAL mode.
  if (mode_ != SharingMode::MANUAL) {
    LOG_ERROR(
        "VCFV4::return_record records cannot be manually returned for mode {}",
        modeToString(mode_));
    return;
  }

  record_queue_pool_.emplace(std::move(record));
}
161167

162168
std::string VCFV4::contig_name(bcf1_t* const r) const {
@@ -285,6 +291,21 @@ bool VCFV4::seek(const std::string& contig_name, uint32_t pos) {
285291
return !record_queue_.empty();
286292
}
287293

294+
/**
 * Duplicates `tmp_r` into a newly allocated record, unpacks it fully and
 * appends it to `record_queue_`.
 *
 * @param tmp_r The record to duplicate
 */
void VCFV4::create_record(SafeBCFRec& tmp_r) {
  // The shared handle takes ownership of the duplicate immediately, with
  // `bcf_destroy` registered as its deleter.
  SafeSharedBCFRec fresh(bcf_dup(tmp_r.get()), bcf_destroy);

  auto* const raw = fresh.get();
  bcf_unpack(raw, BCF_UN_ALL);

  record_queue_.push(std::move(fresh));
}
299+
300+
/**
 * Recycles the record at the front of `record_queue_pool_`: the pooled record
 * is removed from the pool, overwritten with the contents of `tmp_r`, fully
 * unpacked and appended to `record_queue_`.
 *
 * The pool is not checked for emptiness here.
 *
 * @param tmp_r The record whose contents are copied into the recycled record
 */
void VCFV4::reuse_record(SafeBCFRec& tmp_r) {
  // Detach the oldest pooled record from the pool.
  SafeSharedBCFRec recycled = std::move(record_queue_pool_.front());
  record_queue_pool_.pop();

  // `bcf_copy` replaces the stale contents of the destination, freeing the
  // old data so that it does not leak.
  auto* const dst = recycled.get();
  bcf_copy(dst, tmp_r.get());
  bcf_unpack(dst, BCF_UN_ALL);

  record_queue_.push(std::move(recycled));
}
308+
288309
void VCFV4::read_records() {
289310
if (!record_queue_.empty())
290311
std::queue<SafeSharedBCFRec>().swap(record_queue_);
@@ -301,19 +322,21 @@ void VCFV4::read_records() {
301322
break;
302323
}
303324

304-
if (!record_queue_pool_.empty()) {
305-
// Pop a stale record for re-use. Note that `bcf_copy`
306-
// destroys (frees) the stale data to prevent a memory
307-
// leak.
308-
SafeSharedBCFRec r = record_queue_pool_.front();
309-
record_queue_pool_.pop();
310-
bcf_copy(r.get(), tmp_r.get());
311-
bcf_unpack(r.get(), BCF_UN_ALL);
312-
record_queue_.emplace(std::move(r));
325+
if (!record_queue_pool_.empty() && mode_ == SharingMode::MANUAL) {
326+
reuse_record(tmp_r);
327+
} else if (!record_queue_pool_.empty() && mode_ == SharingMode::AUTOMATIC) {
328+
// Check if the record at the front of the pool is stale, i.e. the pool
329+
// has the only copy
330+
SafeSharedBCFRec& r = record_queue_pool_.front();
331+
if (r.use_count() == 1) {
332+
reuse_record(tmp_r);
333+
} else {
334+
// The record at the front of the pool is still in use, so create a new record
335+
create_record(tmp_r);
336+
}
337+
record_queue_pool_.push(record_queue_.front());
313338
} else {
314-
SafeSharedBCFRec r(bcf_dup(tmp_r.get()), bcf_destroy);
315-
bcf_unpack(r.get(), BCF_UN_ALL);
316-
record_queue_.emplace(std::move(r));
339+
create_record(tmp_r);
317340
}
318341
record_buffer_size += sizeof(bcf1_t) + record_queue_.front()->shared.m +
319342
record_queue_.front()->indiv.m;
@@ -328,6 +351,7 @@ void VCFV4::read_records() {
328351
}
329352

330353
void VCFV4::swap(VCFV4& other) {
354+
std::swap(mode_, other.mode_);
331355
std::swap(open_, other.open_);
332356
std::swap(path_, other.path_);
333357
std::swap(index_path_, other.index_path_);

libtiledbvcf/src/vcf/vcf_v4.h

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include <unordered_map>
4242
#include <vector>
4343

44+
#include "utils/logger_public.h"
4445
#include "utils/utils.h"
4546
#include "vcf/htslib_value.h"
4647
#include "vcf/region.h"
@@ -54,7 +55,25 @@ namespace vcf {
5455
*/
5556
class VCFV4 {
5657
public:
57-
VCFV4();
58+
enum SharingMode {
59+
MANUAL,
60+
AUTOMATIC,
61+
};
62+
63+
/**
64+
* Constructor that determines how `SafeSharedBCFRec` records are managed. In
65+
* MANUAL mode, records must be returned using `return_record()` to prevent
66+
* them from being automatically destroyed when the `SafeSharedBCFRec` counter
67+
* reaches 0. In AUTOMATIC mode, `VCFV4` keeps a copy of each
68+
* `SafeSharedBCFRec` and releases it for reuse when the counter reaches 1.
69+
*
70+
* Note that AUTOMATIC mode uses a lazy algorithm for releasing records, so
71+
* it is only appropriate when records can be released in roughly the same
72+
* order that they were popped.
73+
*
74+
* @param mode The mode to use for managing SafeSharedBCFRec pointers
75+
*/
76+
VCFV4(SharingMode mode = SharingMode::MANUAL);
5877
~VCFV4();
5978

6079
VCFV4(VCFV4&& other) = delete;
@@ -135,6 +154,15 @@ class VCFV4 {
135154
void set_max_record_buff_size(uint64_t max_record_buffer_size);
136155

137156
private:
157+
/**
 * Returns the name of a sharing mode, for use in log messages.
 *
 * @param mode The sharing mode to describe
 * @return "MANUAL" or "AUTOMATIC"; if `mode` holds a value outside the
 *     enumeration, an error is logged and "UNKNOWN" is returned
 */
inline const std::string modeToString(SharingMode mode) {
  switch (mode) {
    case MANUAL:
      return "MANUAL";
    case AUTOMATIC:
      return "AUTOMATIC";
  }

  // Reached only for a value outside the enumeration. A value must still be
  // returned: flowing off the end of a non-void function is undefined
  // behavior.
  LOG_ERROR(
      "VCFV4::modeToString {} is not a valid SharingMode",
      static_cast<int>(mode));
  return "UNKNOWN";
}
138166
/** BCF/VCF iterator wrapper. */
139167
class Iter {
140168
public:
@@ -166,6 +194,9 @@ class VCFV4 {
166194
kstring_t tmps_ = {0, 0, nullptr};
167195
};
168196

197+
/** The mode used to manage the SafeSharedBCFRec pointers. */
198+
SharingMode mode_;
199+
169200
/** True if the file is open. */
170201
bool open_;
171202

@@ -199,6 +230,21 @@ class VCFV4 {
199230
/** The HTS index handle, if the index format is HTS. */
200231
hts_idx_t* index_hts_;
201232

233+
/**
234+
* Creates a new record and adds it to the `record_queue_`.
235+
*
236+
* @param tmp_r The record to wrap
237+
*/
238+
void create_record(SafeBCFRec& tmp_r);
239+
240+
/**
241+
* Pops the first record from `record_queue_pool_` and reuses it to add a
242+
* record to `record_queue_`.
243+
*
244+
* @param tmp_r The record to wrap
245+
*/
246+
void reuse_record(SafeBCFRec& tmp_r);
247+
202248
/** Reads records into the record buffer using `iter_`. */
203249
void read_records();
204250

0 commit comments

Comments
 (0)