diff --git a/src/llama-quant.cpp b/src/llama-quant.cpp
index 322cb313f1c..056aa58caff 100644
--- a/src/llama-quant.cpp
+++ b/src/llama-quant.cpp
@@ -7,6 +7,7 @@
 #include <algorithm>
 #include <cmath>
 #include <cstring>
+#include <condition_variable>
 #include <cinttypes>
 #include <fstream>
 #include <mutex>
@@ -1064,9 +1065,27 @@ static void llama_model_quantize_impl(const std::string & fname_inp, const std::
     std::vector<std::thread> workers;
     workers.reserve(nthread);
 
-    std::vector<no_init<uint8_t>> read_data;
-    std::vector<no_init<uint8_t>> work;
-    std::vector<no_init<float>> f32_conv_buf;
+    // double-buffered work areas for pipelined read/compute/write
+    // buffers 0 and 1 are alternated so that the previous tensor's write
+    // can overlap with the current tensor's compute (and the next read)
+    std::vector<no_init<uint8_t>> read_data[2];
+    std::vector<no_init<uint8_t>> work[2];
+    std::vector<no_init<float>> f32_conv_buf[2];
+
+    // async writer state -- the write thread drains completed tensors to disk
+    // while the main thread moves on to compute the next tensor
+    struct write_request {
+        const void * data;
+        size_t size;
+        size_t align_pad;
+    };
+
+    std::mutex wr_mutex;
+    std::condition_variable wr_cv;
+    write_request wr_pending = {};
+    bool wr_has_work = false;
+    bool wr_done = false;
+    std::exception_ptr wr_error = nullptr;
 
     int cur_split = -1;
     std::ofstream fout;
@@ -1097,14 +1116,95 @@ static void llama_model_quantize_impl(const std::string & fname_inp, const std::
         ::zeros(fout, meta_size);
     };
 
-    // no output file for --dry-run
+    // start background writer thread
+    // the shutdown lambda ensures the thread is joined even if the main thread
+    // throws (e.g. requantize error), preventing std::terminate
+    std::thread writer_thread;
+    auto shutdown_writer = [&]() {
+        if (writer_thread.joinable()) {
+            {
+                std::lock_guard<std::mutex> lock(wr_mutex);
+                wr_done = true;
+            }
+            wr_cv.notify_one();
+            writer_thread.join();
+        }
+    };
+
     if (!params->dry_run) {
         new_ofstream(0);
+        writer_thread = std::thread([&]() {
+            while (true) {
+                std::unique_lock<std::mutex> lock(wr_mutex);
+                wr_cv.wait(lock, [&]{ return wr_has_work || wr_done; });
+                if (wr_done && !wr_has_work) {
+                    break;
+                }
+                write_request req = wr_pending;
+                lock.unlock();
+
+                try {
+                    fout.write((const char *) req.data, req.size);
+                    zeros(fout, req.align_pad);
+                } catch (...) {
+                    // record the error AND release the pending slot before
+                    // exiting, otherwise submit_write()/finish_writes() would
+                    // wait on wr_has_work forever (deadlock on write failure)
+                    {
+                        std::lock_guard<std::mutex> elock(wr_mutex);
+                        wr_error = std::current_exception();
+                        wr_has_work = false;
+                    }
+                    wr_cv.notify_one();
+                    return;
+                }
+
+                // signal completion after the write finishes so the main thread
+                // knows the buffer is safe to reuse
+                {
+                    std::lock_guard<std::mutex> dlock(wr_mutex);
+                    wr_has_work = false;
+                }
+                wr_cv.notify_one();
+            }
+        });
     }
 
+    // submit a write and wait for the previous write to complete
+    // at most one outstanding write at a time -- we double-buffer the data
+    auto submit_write = [&](const void * data, size_t size, size_t align_pad) {
+        std::unique_lock<std::mutex> lock(wr_mutex);
+        // wait for the previous write to finish or fail; checking wr_error in
+        // the predicate guarantees we wake up even after a writer failure
+        wr_cv.wait(lock, [&]{ return !wr_has_work || wr_error != nullptr; });
+        if (wr_error) {
+            std::rethrow_exception(wr_error);
+        }
+        wr_pending = { data, size, align_pad };
+        wr_has_work = true;
+        lock.unlock();
+        wr_cv.notify_one();
+    };
+
+    auto finish_writes = [&]() {
+        std::unique_lock<std::mutex> lock(wr_mutex);
+        wr_cv.wait(lock, [&]{ return !wr_has_work || wr_error != nullptr; });
+        if (wr_error) {
+            std::rethrow_exception(wr_error);
+        }
+    };
+
+    // no output file for --dry-run
+    // (writer_thread is not started in dry_run mode)
+    //
     // main loop: iterate over all weights
     //
+    // the loop pipelines I/O with computation:
+    // - write(tensor[i-1]) runs in background while we compute tensor[i]
+    // - we use double-buffered work areas indexed by (i % 2)
+
+    try {
     for (size_t i = 0; i < tensors.size(); ++i) {
         const auto & weight = *tensors[i];
 
@@ -1112,18 +1212,20 @@ static void llama_model_quantize_impl(const std::string & fname_inp, const std::
         ggml_tensor * tensor = weight.tensor;
 
         if (!params->dry_run && (weight.idx != cur_split && params->keep_split)) {
+            finish_writes();
             close_ofstream();
             new_ofstream(weight.idx);
         }
 
         const size_t tensor_size = ggml_nbytes(tensor);
+        const int buf = i % 2;
 
         if (!params->dry_run) {
             if (!ml.use_mmap) {
-                if (read_data.size() < tensor_size) {
-                    read_data.resize(tensor_size);
+                if (read_data[buf].size() < tensor_size) {
+                    read_data[buf].resize(tensor_size);
                 }
-                tensor->data = read_data.data();
+                tensor->data = read_data[buf].data();
             }
             ml.load_data_for(tensor);
         }
@@ -1165,7 +1267,13 @@ static void llama_model_quantize_impl(const std::string & fname_inp, const std::
         } else {
             // no --dry-run, perform quantization
             if (!quantize) {
-                new_data = tensor->data;
+                // copy to work buffer so the read buffer can be reused by the next tensor
+                // while the writer thread is still draining this data
+                if (work[buf].size() < tensor_size) {
+                    work[buf].resize(tensor_size);
+                }
+                memcpy(work[buf].data(), tensor->data, tensor_size);
+                new_data = work[buf].data();
                 new_size = tensor_size;
                 LLAMA_LOG_INFO("size = %8.3f MiB\n", tensor_size/1024.0/1024.0);
             } else {
@@ -1209,17 +1317,17 @@ static void llama_model_quantize_impl(const std::string & fname_inp, const std::
                 } else if (ggml_is_quantized(tensor->type) && !params->allow_requantize) {
                     throw std::runtime_error(format("requantizing from type %s is disabled", ggml_type_name(tensor->type)));
                 } else {
-                    llama_tensor_dequantize_impl(tensor, f32_conv_buf, workers, nelements, nthread);
-                    f32_data = (float *) f32_conv_buf.data();
+                    llama_tensor_dequantize_impl(tensor, f32_conv_buf[buf], workers, nelements, nthread);
+                    f32_data = (float *) f32_conv_buf[buf].data();
                 }
 
                 LLAMA_LOG_INFO("converting to %s .. ", ggml_type_name(new_type));
                 fflush(stdout);
 
-                if (work.size() < (size_t)nelements * 4) {
-                    work.resize(nelements * 4); // upper bound on size
+                if (work[buf].size() < (size_t)nelements * 4) {
+                    work[buf].resize(nelements * 4); // upper bound on size
                 }
-                new_data = work.data();
+                new_data = work[buf].data();
 
                 const int64_t n_per_row = tensor->ne[0];
                 const int64_t nrows = tensor->ne[1];
@@ -1250,16 +1358,24 @@ static void llama_model_quantize_impl(const std::string & fname_inp, const std::
             GGML_ASSERT(gguf_get_tensor_size(ctx_outs[cur_split].get(), gguf_find_tensor(ctx_outs[cur_split].get(), metadata[i].name.c_str())) == new_size);
             gguf_set_tensor_data(ctx_outs[cur_split].get(), metadata[i].name.c_str(), new_data);
 
-            // write tensor data + padding
-            fout.write((const char *) new_data, new_size);
-            zeros(fout, GGML_PAD(new_size, align) - new_size);
+            // submit async write -- the writer thread writes while we compute the next tensor
+            submit_write(new_data, new_size, GGML_PAD(new_size, align) - new_size);
         } // no --dry-run
     } // main loop
 
     if (!params->dry_run) {
+        // wait for last write, then signal writer thread to exit
+        finish_writes();
+        shutdown_writer();
         close_ofstream();
     }
+    } catch (...) {
+        // ensure writer thread is joined before rethrowing to avoid std::terminate
+        shutdown_writer();
+        throw;
+    }
+
     LLAMA_LOG_INFO("%s: model size = %8.2f MiB (%.2f BPW)\n", __func__, total_size_org/1024.0/1024.0, total_size_org*8.0/ml.n_elements);
     LLAMA_LOG_INFO("%s: quant size = %8.2f MiB (%.2f BPW)\n", __func__, total_size_new/1024.0/1024.0, total_size_new*8.0/ml.n_elements);