-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathbuffer.cpp
More file actions
95 lines (84 loc) · 2.91 KB
/
buffer.cpp
File metadata and controls
95 lines (84 loc) · 2.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#include "databento/detail/buffer.hpp"
#include <algorithm>
#include <sstream>
#include "databento/exceptions.hpp"
#include "detail/stream_op_helper.hpp"
using databento::detail::Buffer;
using Status = databento::IReadable::Status;
size_t Buffer::Write(const char* data, std::size_t length) {
return Write(reinterpret_cast<const std::byte*>(data), length);
}
// Writes as many of the `length` bytes at `data` as fit, and returns how many
// were actually copied. The count is capped at WriteCapacity(), measured after
// ShiftForSpace(length) has run, so the result may be smaller than `length`.
size_t Buffer::Write(const std::byte* data, std::size_t length) {
  // ShiftForSpace is declared outside this file; presumably it compacts the
  // buffer so that more room is available for this write.
  ShiftForSpace(length);

  const std::size_t accepted = std::min(WriteCapacity(), length);
  std::copy_n(data, accepted, WriteBegin());
  // Record the newly written bytes (Fill is declared outside this file).
  Fill(accepted);
  return accepted;
}
void Buffer::WriteAll(const char* data, std::size_t length) {
WriteAll(reinterpret_cast<const std::byte*>(data), length);
}
// Appends all `length` bytes at `data` to the buffer.
//
// If the bytes fit in the space not occupied by unread data, the existing
// allocation is kept and ShiftForSpace(length) is called. Otherwise the buffer
// is grown with Reserve to exactly the unread byte count plus `length`.
void Buffer::WriteAll(const std::byte* data, std::size_t length) {
  const std::size_t unread = ReadCapacity();
  const bool fits_in_place = length <= Capacity() - unread;
  if (fits_in_place) {
    // ShiftForSpace is declared outside this file; presumably it makes the
    // free space contiguous at the write position.
    ShiftForSpace(length);
  } else {
    Reserve(unread + length);
  }
  std::copy_n(data, length, WriteBegin());
  write_pos_ += length;
}
// Reads exactly `length` bytes into `buffer`.
//
// Throws databento::Exception when fewer than `length` bytes are available to
// read; in that case ReadSome is never called.
void Buffer::ReadExact(std::byte* buffer, std::size_t length) {
  const std::size_t available = ReadCapacity();
  if (length <= available) {
    ReadSome(buffer, length);
    return;
  }

  std::ostringstream err_msg;
  err_msg << "Reached end of buffer without " << length << " bytes, only ";
  err_msg << available << " bytes available";
  throw databento::Exception{err_msg.str()};
}
// Copies up to `max_length` unread bytes into `buffer` and returns the number
// of bytes copied, which is the smaller of ReadCapacity() and `max_length`.
std::size_t Buffer::ReadSome(std::byte* buffer, std::size_t max_length) {
  const std::size_t taken = std::min(ReadCapacity(), max_length);
  std::copy_n(ReadBegin(), taken, buffer);
  // Mark the copied bytes as read (Consume is declared outside this file).
  Consume(taken);
  return taken;
}
// Timeout-taking overload of ReadSome. The timeout argument is unnamed and
// unused: the call forwards to the two-argument ReadSome and always reports
// Status::Ok together with the number of bytes that were read.
databento::IReadable::Result Buffer::ReadSome(std::byte* buffer, std::size_t max_length,
                                              std::chrono::milliseconds /*timeout*/) {
  const std::size_t taken = ReadSome(buffer, max_length);
  return {taken, Status::Ok};
}
void Buffer::Reserve(std::size_t capacity) {
if (capacity <= Capacity()) {
return;
}
UniqueBufPtr new_buf{AlignedNew(capacity), AlignedDelete};
const auto unread_bytes = ReadCapacity();
std::copy(ReadBegin(), ReadEnd(), new_buf.get());
buf_ = std::move(new_buf);
end_ = buf_.get() + capacity;
read_pos_ = buf_.get();
write_pos_ = read_pos_ + unread_bytes;
}
// Moves the unread bytes to the start of the underlying allocation and
// re-bases the read and write positions accordingly.
//
// The copy is skipped when there is nothing unread, and also when the unread
// data already begins at the start of the allocation.
void Buffer::Shift() {
  const auto unread_bytes = ReadCapacity();
  // std::copy requires that the destination is not inside [first, last).
  // When the data already sits at the start, destination == first, so the
  // copy would break that precondition while moving nothing.
  if (unread_bytes && ReadBegin() != buf_.get()) {
    // Destination precedes the source here, so a forward copy is safe even
    // though the ranges may overlap.
    std::copy(ReadBegin(), ReadEnd(), buf_.get());
  }
  read_pos_ = buf_.get();
  write_pos_ = read_pos_ + unread_bytes;
}
namespace databento::detail {
// Streams a one-line debug representation of `buffer`: the raw pointer members
// followed by the current read and write capacities. Field labels match the
// member names they describe.
std::ostream& operator<<(std::ostream& stream, const Buffer& buffer) {
  return StreamOpBuilder{stream}
      .SetTypeName("Buffer")
      .SetSpacer(" ")
      .Build()
      .AddField("buf_", buffer.buf_.get())
      .AddField("end_", buffer.end_)
      // Label previously read "read_pos", unlike the member and its siblings.
      .AddField("read_pos_", buffer.read_pos_)
      .AddField("write_pos_", buffer.write_pos_)
      .AddField("ReadCapacity", buffer.ReadCapacity())
      .AddField("WriteCapacity", buffer.WriteCapacity())
      .Finish();
}
}  // namespace databento::detail