Skip to content
Merged
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "solrcopy"
version = "0.9.0"
version = "0.9.1"
edition = "2024"
rust-version = "1.88"

Expand Down
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,21 @@ Extracting and updating documents in huge cores can be challenging. It can take

Bellow some tricks for dealing with such cores:

1. For reducing time, you can use the switches `--readers` and `--writers` for executing operations in parallel.
1. For reducing time, you can use the switches `--readers` and `--writers` for executing operations in parallel. Make sure the sum of this parameters fits the number of threads that your processor cores can handle.
2. When the number of docs to extract is huge, `backup` subcommand tend to slow as times goes and eventually fails. This is because Solr is suffers to get docs batches with hight skip/start parameters. For dealing with this:
1. Use the parameters `--iterate-by`n `between` and `--step`for iterating through parameter `--query` with variables `{begin}` and `{end}`.
2. This way it will iterate and restrict by hour, day, range the docs being downloaded.
3. For example: `--query 'date:[{begin} TO {end}]' --iterate-by day --between '2020-04-01' '2020-04-30T23:59:59'`
4. Keep the number of iterations low by specifying the parameters `--step` and `--num-docs` to adequated values. As the process will run in two nested loops, the amount of time/effort will raise if the number of iterations increases.
3. Use the parameter `--param shards=shard1` for copying by each shard by name in `backkup`subcommand.
4. Use the parameter `--delay` for avoiding to overload the Solr server.

### Non-Stored Fields

When you're backing up the index with `solrcopy`, this can result in a lossy process. In many cases, a core doesn't store fields that are only going to use for searching - and not for displaying. When backuping with `solrcopy`, you'll lose ths information. You will not be able to restore the index so that it works the same as before and the data will be lost for good if the index disappears.

In this case, if you need the value of the non-stored fields the proper way is to use the replication handler or the built-in backup feature in cloud mode.

### Command Line Arguments

#### solrcopy commands
Expand Down
27 changes: 26 additions & 1 deletion docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ services:
hostname: solrhost
environment:
- SOLR_IMAGE_TAG=${TAG:-latest}
- SOLR_CREATE_CORE=${CORE:-example}
- SOLR_RUN_MODE=${MODE:-testing}
- JAVA_TOOL_OPTIONS="--sun-misc-unsafe-memory-access=allow"
ports:
- "8983:8983"
volumes:
Expand All @@ -31,6 +33,9 @@ services:
retries: 1
interval: 60s
configs:
- source: bashrc.sh
target: /home/solr/.bashrc
mode: 0644
- source: solr-setup-precreate.sh
target: /opt/solr/docker/scripts/solr-setup-precreate
mode: 0755
Expand Down Expand Up @@ -67,6 +72,26 @@ volumes:
name: configuration

configs:
bashrc.sh:
content: |
# ~/.bashrc: executed by bash(1) for non-login shells.
# see /usr/share/doc/bash/examples/startup-files (in the package bash-doc)
# for examples

# If not running interactively, don't do anything
case $- in
*i*) ;;
*) return;;
esac

$ Some useful aliases
alias ls='ls --escape --dereference-command-line --human-readable --time-style=iso --no-group --color=auto'
alias ll='ls --escape --dereference-command-line --human-readable --time-style=iso --no-group --color=auto --almost-all -l'
alias la='ls --escape --dereference-command-line --human-readable --time-style=iso --no-group --color=auto --almost-all'
alias l='ls --escape --dereference-command-line --human-readable --time-style=iso --no-group --color=auto -CF'
# END OF SCRIPT #


solr-setup-precreate.sh:
content: |
#!/bin/bash
Expand Down Expand Up @@ -176,7 +201,7 @@ configs:
case "$${SOLR_RUN_MODE:-testing}" in
standalone) SOLR_RUN_COMMAND="solr-foreground --user-managed";;
cloud) SOLR_RUN_COMMAND="solr-foreground";;
precreate) SOLR_RUN_COMMAND="solr-precreate";;
precreate) SOLR_RUN_COMMAND="solr-precreate $${SOLR_CREATE_CORE:-}";;
demo) SOLR_RUN_COMMAND="solr-demo";;
testing) SOLR_RUN_COMMAND="solr-runas-user-managed";;
*) echo "ERROR: Invalid Solr run mode: $${SOLR_RUN_MODE:-}."
Expand Down
8 changes: 7 additions & 1 deletion src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,13 @@ pub(crate) struct ParallelArgs {

/// Extra parameter for Solr Update Handler.
/// See: <https://lucene.apache.org/solr/guide/transforming-and-indexing-custom-json.html>
#[arg(short, long, display_order = 60, value_name = "useParams=mypars")]
#[arg(
short,
long,
display_order = 60,
value_name = "p1=value1&p2=value2",
default_value = "echoParams=none"
)]
pub params: Option<String>,

/// How many times should continue on source document errors
Expand Down
113 changes: 67 additions & 46 deletions src/backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ use super::{
steps::{Requests, Slices},
};
use crossbeam_channel::{Receiver, Sender, bounded};
use crossbeam_utils::thread;
use log::{debug, error, info, trace};
use std::sync::{Arc, atomic::AtomicBool};
use log::{debug, error, info};
use std::thread;
use std::{path::PathBuf, time::Instant};

pub(crate) fn backup_main(params: &Backup) -> BoxedError {
Expand All @@ -36,7 +35,6 @@ pub(crate) fn backup_main(params: &Backup) -> BoxedError {
params.options.core
);

let ctrl_c = monitor_term_sinal();
let started = Instant::now();

thread::scope(|pool| {
Expand All @@ -49,32 +47,45 @@ pub(crate) fn backup_main(params: &Backup) -> BoxedError {
let (sender, receiver) = bounded::<Documents>(writers_channel.to_usize());
let (progress, reporter) = bounded::<u64>(transfer.writers.to_usize());

pool.spawn(|_| {
debug!("Started generator thread");
start_querying_core(params, &schema, generator, &ctrl_c);
debug!("Finished generator thread");
});
let gen_handle = thread::Builder::new()
.name("Generator".to_string())
.spawn_scoped(pool, || {
start_querying_core(params, &schema, generator);
})
.unwrap();

start_solr_readers(pool, params, sender, sequence, num_found);
let reader_handles = start_solr_readers(pool, params, sender, sequence);

start_archive_writers(pool, params, receiver, progress, num_retrieve);
let writer_handles = start_archive_writers(pool, params, receiver, progress, num_retrieve);

retrieved = forall_progress(reporter, num_retrieve, params.options.is_quiet());
})
.unwrap();
let bar_handle = thread::Builder::new()
.name("Generator".to_string())
.spawn_scoped(pool, || {
retrieved = forall_progress(reporter, num_retrieve, params.options.is_quiet());
})
.unwrap();

// TODO: handle the finished thread with join
let mut handles = vec![];
handles.push(gen_handle);
handles.extend(reader_handles);
handles.extend(writer_handles);
handles.push(bar_handle);
for handle in handles {
handle.join().unwrap();
}
});

finish_writing(ctrl_c, started, num_retrieve, retrieved, params.transfer.delay_after)
finish_progress(started, num_retrieve, retrieved, params.transfer.delay_after)
}

fn start_solr_readers(
pool: &thread::Scope<'_>, params: &Backup, sender: Sender<Documents>, sequence: Receiver<Step>,
num_found: u64,
) {
fn start_solr_readers<'scope>(
pool: &'scope thread::Scope<'scope, '_>, params: &Backup, sender: Sender<Documents>,
sequence: Receiver<Step>,
) -> Vec<thread::ScopedJoinHandle<'scope, ()>> {
let merr = params.transfer.max_errors;
let delay = params.transfer.delay_per_request;
let must_match = if params.workaround_shards > 0 { num_found } else { 0 };

let mut handles = vec![];

for ir in 0..params.transfer.readers {
let producer = sender.clone();
Expand All @@ -83,27 +94,31 @@ fn start_solr_readers(
let reader = ir;
let thread_name = format!("Reader_{}", reader);

pool.builder()
let handle = thread::Builder::new()
.name(thread_name)
.spawn(move |_| {
.spawn_scoped(pool, move || {
debug!("Started reader #{}", reader);
start_retrieving_docs(reader, iterator, producer, must_match, merr, delay);
start_retrieving_docs(reader, iterator, producer, merr, delay);
debug!("Finished reader #{}", reader);
})
.unwrap();
handles.push(handle);
}
drop(sequence);
drop(sender);
handles
}

fn start_archive_writers(
pool: &thread::Scope<'_>, params: &Backup, receiver: Receiver<Documents>,
fn start_archive_writers<'scope>(
pool: &'scope thread::Scope<'scope, '_>, params: &Backup, receiver: Receiver<Documents>,
progress: Sender<u64>, num_retrieve: u64,
) {
) -> Vec<thread::ScopedJoinHandle<'scope, ()>> {
let output_pat = params.get_archive_pattern(num_retrieve);
let max = params.archive_files;
let comp = params.archive_compression;

let mut handles = vec![];

for iw in 0..params.transfer.writers {
let consumer = receiver.clone();
let updater = progress.clone();
Expand All @@ -113,39 +128,41 @@ fn start_archive_writers(
let writer = iw;
let thread_name = format!("Writer_{}", writer);

pool.builder()
let handle = thread::Builder::new()
.name(thread_name)
.spawn(move |_| {
.spawn_scoped(pool, move || {
debug!("Started writer #{}", writer);
start_storing_docs(writer, dir, name, comp, max, consumer, updater);
debug!("Finished writer #{}", writer);
})
.unwrap();
handles.push(handle);
}
drop(receiver);
drop(progress);
handles
}

fn finish_writing(
ctrl_c: Arc<AtomicBool>, started: Instant, num_retrieve: u64, retrieved: u64, delay_after: u64,
fn finish_progress(
started: Instant, num_retrieve: u64, retrieved: u64, delay_after: u64,
) -> BoxedError {
let ctrl_c = monitor_term_sinal();
if ctrl_c.aborted() {
raise("# Execution aborted by user!")
} else {
let (r, n, s) = (retrieved, num_retrieve, started.elapsed());
info!("Downloaded {} of {} documents in {:?}.", r, n, s);
if retrieved > 0 {
wait_with_progress(delay_after, "Exporting documents to archives...");
wait_with_progress(delay_after, "Finished exporting documents to archives...");
}
let (r, n, s) = (retrieved, num_retrieve, started.elapsed());
info!("Downloaded {} of {} documents in {:?}.", r, n, s);
Ok(())
}
}

// region Channels

fn start_querying_core(
params: &Backup, schema: &SolrCore, generator: Sender<Step>, ctrl_c: &Arc<AtomicBool>,
) {
fn start_querying_core(params: &Backup, schema: &SolrCore, generator: Sender<Step>) {
let ctrl_c = monitor_term_sinal();
let core_fields = params.merge_core_fields(schema);

let slices: Slices<String> = params.get_slices();
Expand All @@ -157,11 +174,13 @@ fn start_querying_core(
if num_found == 0 {
continue;
}
let expected = if params.workaround_shards > 0 { num_found } else { 0 };
let num_retrieve = params.get_docs_to_retrieve(num_found);
let requests: Requests = params.get_requests_for_range(
retrieved,
num_retrieve,
&core_fields,
expected,
&range.begin,
&range.end,
);
Expand All @@ -177,8 +196,7 @@ fn start_querying_core(
}

fn start_retrieving_docs(
reader: u64, iterator: Receiver<Step>, producer: Sender<Documents>, must_match: u64,
max_errors: u64, delay: u64,
reader: u64, iterator: Receiver<Step>, producer: Sender<Documents>, max_errors: u64, delay: u64,
) {
let ctrl_c = monitor_term_sinal();
let mut error_count = 0;
Expand All @@ -190,7 +208,7 @@ fn start_retrieving_docs(
break;
}
let failed = match received {
Ok(step) => retrieve_docs_from_solr(reader, &producer, step, &mut client, must_match),
Ok(step) => retrieve_docs_from_solr(reader, &producer, step, &mut client),
Err(_) => true,
};
if failed {
Expand All @@ -210,10 +228,10 @@ fn start_retrieving_docs(
}

fn retrieve_docs_from_solr(
reader: u64, producer: &Sender<Documents>, step: Step, client: &mut SolrClient, must_match: u64,
reader: u64, producer: &Sender<Documents>, step: Step, client: &mut SolrClient,
) -> bool {
let query_url = step.url.as_str();
let response = fetch_docs_from_solr(reader, client, query_url, must_match);
let response = fetch_docs_from_solr(reader, client, query_url, step.expected);
match response {
Err(_) => true,
Ok(content) => {
Expand All @@ -234,7 +252,7 @@ fn retrieve_docs_from_solr(
}

fn fetch_docs_from_solr(
reader: u64, client: &mut SolrClient, query_url: &str, must_match: u64,
reader: u64, client: &mut SolrClient, query_url: &str, expected: u64,
) -> Result<String, ()> {
let mut times = 0;
loop {
Expand All @@ -245,11 +263,14 @@ fn fetch_docs_from_solr(
return Err(());
}
Ok(content) => {
if must_match > 0 {
if expected > 0 {
match SolrCore::parse_num_found(&content) {
Ok(num_found) => {
trace!("#{} got num_found {} not {}", times, num_found, must_match);
if must_match != num_found.to_u64() && times < 13 {
if expected != num_found.to_u64() && times < 13 {
debug!(
"#{} got num_found {} but expected {}",
times, num_found, expected
);
times += 1;
wait(times);
continue;
Expand Down
1 change: 1 addition & 0 deletions src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub(crate) struct Documents {
#[derive(Debug)]
pub(crate) struct Step {
pub curr: u64,
pub expected: u64,
pub url: String,
}

Expand Down
Loading
Loading