Skip to content

Commit 92d262d

Browse files
committed
iobuf support reserve_aligned
1 parent a1877bc commit 92d262d

3 files changed

Lines changed: 172 additions & 0 deletions

File tree

src/butil/iobuf.cpp

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,23 @@ inline IOBuf::Block* create_block() {
355355
return create_block(IOBuf::DEFAULT_BLOCK_SIZE);
356356
}
357357

358+
inline IOBuf::Block* create_block_aligned(size_t block_size, size_t alignment) {
359+
if (block_size > 0xFFFFFFFFULL) {
360+
LOG(FATAL) << "block_size=" << block_size << " is too large";
361+
return NULL;
362+
}
363+
char* mem = (char*)iobuf::blockmem_allocate(block_size);
364+
if (mem == NULL) {
365+
return NULL;
366+
}
367+
char* data = mem + sizeof(IOBuf::Block);
368+
// change data pointer & data size make align satisfied
369+
size_t adder = (-reinterpret_cast<uintptr_t>(data)) & (alignment - 1);
370+
size_t size =
371+
(block_size - sizeof(IOBuf::Block) - adder) & ~(alignment - 1);
372+
return new (mem) IOBuf::Block(data + adder, size);
373+
}
374+
358375
// === Share TLS blocks between appending operations ===
359376
// Max number of blocks in each TLS. This is a soft limit namely
360377
// release_tls_block_chain() may exceed this limit sometimes.
@@ -1785,6 +1802,45 @@ void IOPortal::return_cached_blocks_impl(Block* b) {
17851802
iobuf::release_tls_block_chain(b);
17861803
}
17871804

1805+
IOBuf::Area IOReserveAlignedBuf::reserve(size_t count) {
1806+
IOBuf::Area result = INVALID_AREA;
1807+
if (_reserved == true) {
1808+
LOG(ERROR) << "Already call reserved";
1809+
return result;
1810+
}
1811+
_reserved = true;
1812+
bool is_power_two = _alignment > 0 && (_alignment & (_alignment - 1));
1813+
if (is_power_two != 0) {
1814+
LOG(ERROR) << "Invalid alignment, must power of two";
1815+
return INVALID_AREA;
1816+
}
1817+
count = (count + _alignment - 1) & ~(_alignment - 1);
1818+
size_t total_nc = 0;
1819+
while (total_nc < count) {
1820+
const auto block_size =
1821+
std::max(_alignment, 4096UL) * 2 + sizeof(IOBuf::Block);
1822+
auto b = iobuf::create_block_aligned(block_size, _alignment);
1823+
if (BAIDU_UNLIKELY(!b)) {
1824+
LOG(ERROR) << "Create block failed";
1825+
return result;
1826+
}
1827+
const size_t nc = std::min(count - total_nc, b->left_space());
1828+
const IOBuf::BlockRef r = {(uint32_t)b->size, (uint32_t)nc, b};
1829+
_push_back_ref(r);
1830+
// aligned block is not from tls, release block ref
1831+
b->dec_ref();
1832+
if (total_nc == 0) {
1833+
// Encode the area at first time. Notice that the pushed ref may
1834+
// be merged with existing ones.
1835+
result = make_area(_ref_num() - 1, _back_ref().length - nc, count);
1836+
}
1837+
// add total nc
1838+
total_nc += nc;
1839+
b->size += nc;
1840+
};
1841+
return result;
1842+
}
1843+
17881844
//////////////// IOBufCutter ////////////////
17891845

17901846
IOBufCutter::IOBufCutter(butil::IOBuf* buf)

src/butil/iobuf.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,17 @@ class IOPortal : public IOBuf {
489489
Block* _block;
490490
};
491491

492+
class IOReserveAlignedBuf : public IOBuf {
493+
public:
494+
IOReserveAlignedBuf(size_t alignment)
495+
: _alignment(alignment), _reserved(false) {}
496+
Area reserve(size_t count);
497+
498+
private:
499+
size_t _alignment;
500+
bool _reserved;
501+
};
502+
492503
// Specialized utility to cut from IOBuf faster than using corresponding
493504
// methods in IOBuf.
494505
// Designed for efficiently parsing data from IOBuf.

test/iobuf_unittest.cpp

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <fcntl.h> // O_RDONLY
2323
#include <stdlib.h>
2424
#include <memory>
25+
#include <cstring>
2526
#include <butil/files/temp_file.h> // TempFile
2627
#include <butil/containers/flat_map.h>
2728
#include <butil/macros.h>
@@ -1785,4 +1786,108 @@ TEST_F(IOBufTest, acquire_tls_block) {
17851786
ASSERT_NE(butil::iobuf::block_cap(b), butil::iobuf::block_size(b));
17861787
}
17871788

1789+
TEST_F(IOBufTest, reserve_aligned) {
1790+
{
1791+
butil::IOReserveAlignedBuf buf(16);
1792+
auto area = buf.reserve(1024);
1793+
ASSERT_NE(area, butil::IOBuf::INVALID_AREA);
1794+
butil::IOBufAsZeroCopyInputStream wrapper(buf);
1795+
const void* data;
1796+
int size;
1797+
int total_size = 0;
1798+
while (wrapper.Next(&data, &size)) {
1799+
ASSERT_EQ(reinterpret_cast<uintptr_t>(data) % 16, 0);
1800+
ASSERT_EQ(size % 16, 0);
1801+
total_size += size;
1802+
}
1803+
ASSERT_EQ(total_size, 1024);
1804+
}
1805+
{
1806+
butil::IOReserveAlignedBuf buf(4096);
1807+
auto area = buf.reserve(1024);
1808+
ASSERT_NE(area, butil::IOBuf::INVALID_AREA);
1809+
butil::IOBufAsZeroCopyInputStream wrapper(buf);
1810+
const void* data;
1811+
int size;
1812+
int total_size = 0;
1813+
while (wrapper.Next(&data, &size)) {
1814+
ASSERT_EQ(reinterpret_cast<uintptr_t>(data) % 4096, 0);
1815+
ASSERT_EQ(size % 4096, 0);
1816+
total_size += size;
1817+
}
1818+
ASSERT_EQ(total_size, 4096);
1819+
}
1820+
{
1821+
butil::IOReserveAlignedBuf buf(4096);
1822+
auto area = buf.reserve(8191);
1823+
ASSERT_NE(area, butil::IOBuf::INVALID_AREA);
1824+
butil::IOBufAsZeroCopyInputStream wrapper(buf);
1825+
const void* data;
1826+
int size;
1827+
int total_size = 0;
1828+
while (wrapper.Next(&data, &size)) {
1829+
ASSERT_EQ(reinterpret_cast<uintptr_t>(data) % 4096, 0);
1830+
ASSERT_EQ(size % 4096, 0);
1831+
total_size += size;
1832+
}
1833+
ASSERT_EQ(total_size, 8192);
1834+
}
1835+
{
1836+
butil::IOReserveAlignedBuf buf(4096);
1837+
auto area = buf.reserve(4096 * 10 - 1);
1838+
ASSERT_NE(area, butil::IOBuf::INVALID_AREA);
1839+
butil::IOBufAsZeroCopyInputStream wrapper(buf);
1840+
const void* data;
1841+
int size;
1842+
int total_size = 0;
1843+
while (wrapper.Next(&data, &size)) {
1844+
ASSERT_EQ(reinterpret_cast<uintptr_t>(data) % 4096, 0);
1845+
ASSERT_EQ(size % 4096, 0);
1846+
total_size += size;
1847+
}
1848+
ASSERT_EQ(total_size, 4096 * 10);
1849+
}
1850+
{
1851+
butil::IOReserveAlignedBuf buf(4095);
1852+
auto area = buf.reserve(4096);
1853+
ASSERT_EQ(area, butil::IOBuf::INVALID_AREA);
1854+
}
1855+
{
1856+
butil::IOReserveAlignedBuf buf(8192);
1857+
auto area = buf.reserve(4096 * 10 + 1);
1858+
ASSERT_NE(area, butil::IOBuf::INVALID_AREA);
1859+
butil::IOBufAsZeroCopyInputStream wrapper(buf);
1860+
const void* data;
1861+
int size;
1862+
int total_size = 0;
1863+
while (wrapper.Next(&data, &size)) {
1864+
ASSERT_EQ(reinterpret_cast<uintptr_t>(data) % 4096, 0);
1865+
ASSERT_EQ(size % 4096, 0);
1866+
total_size += size;
1867+
}
1868+
ASSERT_EQ(total_size, 4096 * 10 + 8192);
1869+
}
1870+
{
1871+
butil::IOReserveAlignedBuf buf(4096);
1872+
auto area = buf.reserve(1024 * 1024 * 3);
1873+
ASSERT_NE(area, butil::IOBuf::INVALID_AREA);
1874+
butil::IOBufAsZeroCopyInputStream wrapper(buf);
1875+
const void* data;
1876+
int size;
1877+
int count = 0;
1878+
int total_size = 0;
1879+
std::stringstream ss;
1880+
while (wrapper.Next(&data, &size)) {
1881+
ASSERT_EQ(reinterpret_cast<uintptr_t>(data) % 4096, 0);
1882+
ASSERT_EQ(size % 4096, 0);
1883+
std::string str(size, 'A' + count++);
1884+
ss << str;
1885+
std::memcpy(const_cast<void*>(data), str.data(), str.size());
1886+
total_size += size;
1887+
}
1888+
ASSERT_EQ(total_size, 3145728);
1889+
ASSERT_EQ(ss.str(), buf.to_string());
1890+
}
1891+
}
1892+
17881893
} // namespace

0 commit comments

Comments
 (0)