Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 124 additions & 16 deletions src/llama-quant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <cmath>
#include <cstring>
#include <cinttypes>
#include <condition_variable>
#include <fstream>
#include <mutex>
#include <regex>
Expand Down Expand Up @@ -1064,9 +1065,27 @@ static void llama_model_quantize_impl(const std::string & fname_inp, const std::
std::vector<std::thread> workers;
workers.reserve(nthread);

std::vector<no_init<uint8_t>> read_data;
std::vector<no_init<uint8_t>> work;
std::vector<no_init<float>> f32_conv_buf;
// double-buffered work areas for pipelined read/compute/write
// buffers 0 and 1 are alternated so that the previous tensor's write
// can overlap with the current tensor's compute (and the next read)
std::vector<no_init<uint8_t>> read_data[2];
std::vector<no_init<uint8_t>> work[2];
std::vector<no_init<float>> f32_conv_buf[2];

// async writer state -- the write thread drains completed tensors to disk
// while the main thread moves on to compute the next tensor
struct write_request {
    const void * data; // tensor bytes to write (borrowed -- the double-buffering keeps it valid until the write completes)
    size_t size; // number of bytes at data
    size_t align_pad; // zero bytes appended after data to reach the alignment boundary
};

std::mutex wr_mutex; // guards all wr_* state below
std::condition_variable wr_cv; // signaled on submit, on write completion, and on shutdown
write_request wr_pending = {}; // the single outstanding request; meaningful only while wr_has_work is true
bool wr_has_work = false; // true from submit until the writer finishes the request
bool wr_done = false; // set once to ask the writer thread to exit
std::exception_ptr wr_error = nullptr; // I/O error captured on the writer thread, rethrown on the main thread

int cur_split = -1;
std::ofstream fout;
Expand Down Expand Up @@ -1097,33 +1116,108 @@ static void llama_model_quantize_impl(const std::string & fname_inp, const std::
::zeros(fout, meta_size);
};

// no output file for --dry-run
// start background writer thread
// the shutdown lambda ensures the thread is joined even if the main thread
// throws (e.g. requantize error), preventing std::terminate
std::thread writer_thread;
// idempotent: the joinable() check makes repeated calls (normal path + catch
// path) safe. setting wr_done does NOT cancel an in-flight request -- the
// writer only exits once wr_done is set AND no request is pending, so any
// submitted write is still drained to disk before the join returns.
auto shutdown_writer = [&]() {
    if (writer_thread.joinable()) {
        {
            std::lock_guard<std::mutex> lock(wr_mutex);
            wr_done = true;
        }
        wr_cv.notify_one();
        writer_thread.join();
    }
};

if (!params->dry_run) {
    new_ofstream(0);
    // background writer: drains at most one completed tensor to disk while
    // the main thread computes the next one
    writer_thread = std::thread([&]() {
        while (true) {
            std::unique_lock<std::mutex> lock(wr_mutex);
            wr_cv.wait(lock, [&]{ return wr_has_work || wr_done; });
            if (wr_done && !wr_has_work) {
                // shutdown requested and nothing left to drain
                break;
            }
            // copy the request so the lock is not held during the I/O
            write_request req = wr_pending;
            lock.unlock();

            try {
                fout.write((const char *) req.data, req.size);
                zeros(fout, req.align_pad);
            } catch (...) {
                // record the failure AND clear wr_has_work before notifying:
                // submit_write/finish_writes block on "!wr_has_work", so
                // leaving it set here would deadlock the main thread forever
                // instead of letting it observe wr_error and rethrow
                {
                    std::lock_guard<std::mutex> elock(wr_mutex);
                    wr_error = std::current_exception();
                    wr_has_work = false;
                }
                wr_cv.notify_all();
                return;
            }

            // signal completion after the write finishes so the main thread
            // knows the buffer is safe to reuse
            {
                std::lock_guard<std::mutex> dlock(wr_mutex);
                wr_has_work = false;
            }
            wr_cv.notify_one();
        }
    });
}

// submit a write and wait for the previous write to complete
// at most one outstanding write at a time -- we double-buffer the data
// NOTE(review): both waits below depend on the writer thread clearing
// wr_has_work and notifying on EVERY exit path, including its error path;
// if any writer path leaves wr_has_work set, these waits block forever and
// wr_error is never observed -- verify the writer upholds this invariant.
auto submit_write = [&](const void * data, size_t size, size_t align_pad) {
    std::unique_lock<std::mutex> lock(wr_mutex);
    // wait for previous write to finish
    wr_cv.wait(lock, [&]{ return !wr_has_work; });
    // surface any I/O failure from the writer thread on the main thread
    if (wr_error) {
        std::rethrow_exception(wr_error);
    }
    wr_pending = { data, size, align_pad };
    wr_has_work = true;
    lock.unlock();
    wr_cv.notify_one();
};

// barrier: block until the outstanding write (if any) has been flushed,
// rethrowing any writer-side I/O error; used before rotating split files
// and before closing the output
auto finish_writes = [&]() {
    std::unique_lock<std::mutex> lock(wr_mutex);
    wr_cv.wait(lock, [&]{ return !wr_has_work; });
    if (wr_error) {
        std::rethrow_exception(wr_error);
    }
};

// no output file for --dry-run
// (writer_thread is not started in dry_run mode)

//
// main loop: iterate over all weights
//
// the loop pipelines I/O with computation:
// - write(tensor[i-1]) runs in background while we compute tensor[i]
// - we use double-buffered work areas indexed by (i % 2)

try {

for (size_t i = 0; i < tensors.size(); ++i) {
const auto & weight = *tensors[i];
const auto & tm = metadata[i];
ggml_tensor * tensor = weight.tensor;

if (!params->dry_run && (weight.idx != cur_split && params->keep_split)) {
finish_writes();
close_ofstream();
new_ofstream(weight.idx);
}

const size_t tensor_size = ggml_nbytes(tensor);
const int buf = i % 2;

if (!params->dry_run) {
if (!ml.use_mmap) {
if (read_data.size() < tensor_size) {
read_data.resize(tensor_size);
if (read_data[buf].size() < tensor_size) {
read_data[buf].resize(tensor_size);
}
tensor->data = read_data.data();
tensor->data = read_data[buf].data();
}
ml.load_data_for(tensor);
}
Expand Down Expand Up @@ -1165,7 +1259,13 @@ static void llama_model_quantize_impl(const std::string & fname_inp, const std::
} else {
// no --dry-run, perform quantization
if (!quantize) {
new_data = tensor->data;
// copy to work buffer so the read buffer can be reused by the next tensor
// while the writer thread is still draining this data
if (work[buf].size() < tensor_size) {
work[buf].resize(tensor_size);
}
memcpy(work[buf].data(), tensor->data, tensor_size);
new_data = work[buf].data();
new_size = tensor_size;
LLAMA_LOG_INFO("size = %8.3f MiB\n", tensor_size/1024.0/1024.0);
} else {
Expand Down Expand Up @@ -1209,17 +1309,17 @@ static void llama_model_quantize_impl(const std::string & fname_inp, const std::
} else if (ggml_is_quantized(tensor->type) && !params->allow_requantize) {
throw std::runtime_error(format("requantizing from type %s is disabled", ggml_type_name(tensor->type)));
} else {
llama_tensor_dequantize_impl(tensor, f32_conv_buf, workers, nelements, nthread);
f32_data = (float *) f32_conv_buf.data();
llama_tensor_dequantize_impl(tensor, f32_conv_buf[buf], workers, nelements, nthread);
f32_data = (float *) f32_conv_buf[buf].data();
}

LLAMA_LOG_INFO("converting to %s .. ", ggml_type_name(new_type));
fflush(stdout);

if (work.size() < (size_t)nelements * 4) {
work.resize(nelements * 4); // upper bound on size
if (work[buf].size() < (size_t)nelements * 4) {
work[buf].resize(nelements * 4); // upper bound on size
}
new_data = work.data();
new_data = work[buf].data();

const int64_t n_per_row = tensor->ne[0];
const int64_t nrows = tensor->ne[1];
Expand Down Expand Up @@ -1250,16 +1350,24 @@ static void llama_model_quantize_impl(const std::string & fname_inp, const std::
GGML_ASSERT(gguf_get_tensor_size(ctx_outs[cur_split].get(), gguf_find_tensor(ctx_outs[cur_split].get(), metadata[i].name.c_str())) == new_size);
gguf_set_tensor_data(ctx_outs[cur_split].get(), metadata[i].name.c_str(), new_data);

// write tensor data + padding
fout.write((const char *) new_data, new_size);
zeros(fout, GGML_PAD(new_size, align) - new_size);
// submit async write -- the writer thread writes while we compute the next tensor
submit_write(new_data, new_size, GGML_PAD(new_size, align) - new_size);
} // no --dry-run
} // main loop

if (!params->dry_run) {
    // wait for last write, then signal writer thread to exit
    // order matters: finish_writes() surfaces any pending I/O error before
    // the writer is asked to stop, and close_ofstream() runs only after the
    // writer has been joined so no write can race the close
    finish_writes();
    shutdown_writer();
    close_ofstream();
}

} catch (...) {
    // ensure writer thread is joined before rethrowing to avoid std::terminate
    // (shutdown_writer still drains a pending request before joining; a
    // writer-side wr_error is intentionally not rethrown here -- the original
    // exception takes precedence)
    shutdown_writer();
    throw;
}

LLAMA_LOG_INFO("%s: model size = %8.2f MiB (%.2f BPW)\n", __func__, total_size_org/1024.0/1024.0, total_size_org*8.0/ml.n_elements);
LLAMA_LOG_INFO("%s: quant size = %8.2f MiB (%.2f BPW)\n", __func__, total_size_new/1024.0/1024.0, total_size_new*8.0/ml.n_elements);

Expand Down