diff --git a/configs/nextflow.config b/configs/nextflow.config deleted file mode 100644 index 452f52c..0000000 --- a/configs/nextflow.config +++ /dev/null @@ -1,427 +0,0 @@ -/* -======================================================================================== - Generic Nextflow config file -======================================================================================== - Default config for all compute environments ----------------------------------------------------------------------------------------- -*/ - -// Manifest providing default pipeline description -// Every pipeline should override this -manifest { - name = "PAM pipeline" - author = 'PAM Informatics' - homePage = 'NA' - description = 'NA' - mainScript = 'main.nf' - version = 'NA' -} - -// Global default params -params { - // Input options - input = null - - // Boilerplate options - outdir = './results' - tracedir = "${params.outdir}/pipeline_info" - help = false - - // Max resource options - // Defaults only, expecting to be overwritten in profiles - max_memory = '4.GB' - max_cpus = 2 - max_time = '10000.h' - max_retries = 2 - retry_strategy = 'ignore' - - // LSF - queue_size = null - submit_rate_limit = null -} - -// Specify process resource requirements and escalation strategy - -/** -* Function to ensure that resource requirements don't go beyond -* a maximum limit -*/ -def check_max(obj, type) { - if (type == 'memory') { - try { - if (obj.compareTo(params.max_memory as nextflow.util.MemoryUnit) == 1) - return params.max_memory as nextflow.util.MemoryUnit - else - return obj - } catch (Exception e) { - println "ERROR - Could not compare ${obj} to params.max_memory ${params.max_memory}! Printing exception and using process.memory = ${obj}:" - println "${e.toString()}" - return obj - } - } else if (type == 'time') { - try { - if (obj.compareTo(params.max_time as nextflow.util.Duration) == 1) - return params.max_time as nextflow.util.Duration - else - return obj - } catch (Exception e) { - println "ERROR - Could not compare ${obj} to params.max_time ${params.max_time}! Printing exception and using process.time = ${obj}:" - println "${e.toString()}" - return obj - } - } else if (type == 'cpus') { - try { - return Math.min( obj, params.max_cpus as int ) - } catch (Exception e) { - println "ERROR - Could not compare ${obj} to params.max_cpus ${params.max_cpus}! Printing exception and using process.cpus = ${obj}:" - println "${e.toString()}" - return obj - } - } -} - -def escalate_linear(initial, task, multiplier=1) { - assert multiplier > 0 - return (initial * multiplier * task.attempt) -} - -def escalate_exp(initial, task, multiplier=1) { - assert multiplier > 0 - return (initial * (multiplier ** (task.attempt - 1))) -} - -def escalate_queue_time(queue, task) { - def queue_index = [ - "small": 0, - "normal": 1, - "long": 2, - "week": 3, - "basement": 4, - ] - def times = [30.m, 12.h, 48.h, 7.d, 30.d] - def index = queue_index[queue] + (task.attempt - 1) - if (index < 4) { - return times[index] - } - return times[4] -} - -def escalate_queue_time_by2(queue, task) { - def queue_index = [ - "small": 0, - "normal": 1, - "long": 2, - "week": 3, - "basement": 4, - ] - def times = [30.m, 12.h, 48.h, 7.d, 30.d] - def index = queue_index[queue] + Math.floor((task.attempt - 1) / 2) - if (index < 4) { - return times[index] - } - return times[4] -} - -def retry_strategy(task, max_retries) { - def MISC_EXIT_CODES = [ - "SIGKILL": 137, - "SIGTERM": 143, - "SIGABRT": 134, - "SIGSEGV": 139 - ].values() - - def SCALING_EXIT_CODES = [ - // see https://ssg-confluence.internal.sanger.ac.uk/pages/viewpage.action?pageId=101361150 - "SIGUSR2": 140, // LSF Runlimit exceeded or Out of memory Error; first signal allowing to quit cleanly - "SIGINT": 130, // LSF Runlimit exceeded or Out of memory Error or bkill; second signal sent shortly after the SIGUSR2 to make it quit if not done yet - "SIGKILLNOW": 9, // LSF Runlimit exceeded or Out of memory Error; apparently what LSF issues after a while once the above two signals have been sent and not acted upon. - // Apparently kills the job instantly so that the exit status has not got time to be written into an .exitcode file and is not reported to the Nextflow - // master process - therefore the value 9 is never seen here in practice; see below for actual value. - // source of issue reported here https://github.com/nextflow-io/nextflow/issues/2847 - "NOEXITCODE": Integer.MAX_VALUE // the default value of $task.exitStatus; this is what it is set to in the absence of an .exitcode file for the previous execution of the task, as would happen in the case mentioned above - ].values() - - if (task.attempt > max_retries) { - return 'ignore' - } - - switch(task.exitStatus) { - case {it in MISC_EXIT_CODES}: - // Ignore due to non-scalable error code - return 'ignore' - - case {it in SCALING_EXIT_CODES}: - // Retry with more memory and longer time limit - return 'retry' - - case {it == null}: - /* - If exitStatus is null as is the case on the first attempt return 'retry' - */ - return 'retry' - - default: - // Return the value of params.retry_strategy - return 'ignore' - } -} - -process { - // Defaults - cpus = { check_max( escalate_linear( 1, task ), 'cpus' ) } - memory = { check_max( escalate_linear( 1.GB, task ), 'memory' ) } - time = { check_max( escalate_linear( 1.h, task ), 'time' ) } - - maxErrors = -1 - maxRetries = params.max_retries - - errorStrategy = { retry_strategy(task, params.max_retries) } - - // Process-specific resource requirements - withLabel:cpu_1 { - cpus = { check_max( 1, 'cpus' ) } - } - - withLabel:cpu_2 { - cpus = { check_max( 2, 'cpus' ) } - } - - withLabel:cpu_4 { - cpus = { check_max( 4, 'cpus' ) } - } - - withLabel:cpu_8 { - cpus = { check_max( 8, 'cpus' ) } - } - - withLabel:cpu_16 { - cpus = { check_max( 16, 'cpus' ) } - } - - withLabel:cpu_32 { - cpus = { check_max( 32, 'cpus' ) } - } - - withLabel:cpu_64 { - cpus = { check_max( 64, 'cpus' ) } - } - - withLabel:mem_50M { - memory = { check_max( escalate_exp( 50.MB, task, 2 ), 'memory' ) } - } - - withLabel:mem_100M { - memory = { check_max( escalate_exp( 100.MB, task, 2 ), 'memory' ) } - } - - withLabel:mem_250M { - memory = { check_max( escalate_exp( 250.MB, task, 2 ), 'memory' ) } - } - - withLabel:mem_500M { - memory = { check_max( escalate_exp( 500.MB, task, 2 ), 'memory' ) } - } - - withLabel:mem_1 { - memory = { check_max( escalate_exp( 1.GB, task, 2 ), 'memory' ) } - } - - withLabel:mem_2 { - memory = { check_max( escalate_exp( 2.GB, task, 2 ), 'memory' ) } - } - - withLabel:mem_4 { - memory = { check_max( escalate_exp( 4.GB, task, 2 ), 'memory' ) } - } - - withLabel:mem_8 { - memory = { check_max( escalate_exp( 8.GB, task, 2 ), 'memory' ) } - } - - withLabel:mem_10 { - memory = { check_max( escalate_exp( 10.GB, task, 2 ), 'memory' ) } - } - - withLabel:mem_16 { - memory = { check_max( escalate_exp( 16.GB, task, 2 ), 'memory' ) } - } - - withLabel:mem_20 { - memory = { check_max( escalate_exp( 20.GB, task, 2 ), 'memory' ) } - } - - withLabel:mem_32 { - memory = { check_max( escalate_exp( 32.GB, task, 2 ), 'memory' ) } - } - - withLabel:mem_64 { - memory = { check_max( escalate_exp( 64.GB, task, 2 ), 'memory' ) } - } - - withLabel:mem_96 { - memory = { check_max( escalate_exp( 96.GB, task, 2 ), 'memory' ) } - } - - withLabel:mem_120 { - memory = { check_max( escalate_linear( 120.GB, task, 1.25 ), 'memory' ) } - } - - withLabel:time_30m { - time = { check_max( escalate_linear( 30.m, task ), 'time' ) } - } - withLabel:time_1 { - time = { check_max( escalate_linear( 1.h, task ), 'time' ) } - } - - withLabel:time_12 { - time = { check_max( escalate_linear( 12.h, task, 2 ), 'time' ) } - } - - withLabel:time_48 { - time = { check_max( escalate_linear( 48.h, task, 2 ), 'time' ) } - } - - withLabel:no_retry { - errorStrategy = 'ignore' - } - - withLabel:time_queue_from_small { - time = { check_max( escalate_queue_time( 'small', task ), 'time' ) } - } - - withLabel:time_queue_from_normal { - time = { check_max( escalate_queue_time( 'normal', task ), 'time' ) } - } - - withLabel:time_queue_from_long { - time = { check_max( escalate_queue_time( 'long', task ), 'time' ) } - } - - withLabel:time_queue_from_week { - time = { check_max( escalate_queue_time( 'week', task ), 'time' ) } - } - - withLabel:time_queue_from_small_slow2 { - time = { check_max( escalate_queue_time_by2( 'small', task ), 'time' ) } - } - - withLabel:time_queue_from_normal_slow2 { - time = { check_max( escalate_queue_time_by2( 'normal', task ), 'time' ) } - } - - withLabel:time_queue_from_long_slow2 { - time = { check_max( escalate_queue_time_by2( 'long', task ), 'time' ) } - } - - withLabel:time_queue_from_week_slow2 { - time = { check_max( escalate_queue_time_by2( 'week', task ), 'time' ) } - } -} - - -profiles { - debug { process.beforeScript = 'echo $HOSTNAME' } - - docker { - docker.enabled = true - docker.userEmulation = true - singularity.enabled = false - conda.enabled = false - } - - singularity { - singularity.enabled = true - singularity.autoMounts = true - docker.enabled = false - conda.enabled = false - } - - conda { - conda.enabled = true - singularity.enabled = false - docker.enabled = false - } - - sanger_local { - singularity { - enabled = true - runOptions = '--bind /data,/lustre,/nfs,/software' - // To avoid downloading/converting to singularity image every time - libraryDir = System.getenv("NEXTFLOW_SINGULARITY_LIBRARY") - } - } - - // Default profile - standard { - docker { - enabled = false - } - - singularity { - enabled = true - autoMounts = true - runOptions = '--bind /data,/lustre,/nfs,/software' - // To avoid downloading/converting to singularity image every time - libraryDir = System.getenv("NEXTFLOW_SINGULARITY_LIBRARY") - } - - process { - executor = 'lsf' - queue = { task.memory > 745.GB ? 'teramem' : - task.memory > 256.GB ? 'hugemem' : - task.time <= 30.m ? 'small' : - task.time <= 12.h ? 'normal' : - task.time <= 48.h ? 'long' : - task.time <= 168.h ? 'week' : - 'basement'} - } - - executor { - name = 'lsf' - perJobMemLimit = true - submitRateLimit = params.submit_rate_limit - queueSize = params.queue_size - jobName = { "PAM-info-pipeline - $task.name - $task.hash" } - } - - params { - // Max resources - max_memory = 2.9.TB - max_cpus = 256 - max_time = 720.h - } - } -} - -// Export these variables to prevent local Python/R libraries from conflicting with those in the container -env { - PYTHONNOUSERSITE = 1 - R_PROFILE_USER = "/.Rprofile" - R_ENVIRON_USER = "/.Renviron" -} - -// Capture exit codes from upstream processes when piping -process.shell = ['/bin/bash', '-euo', 'pipefail'] - -// Setup reports with uniform filename` -def trace_timestamp = new java.util.Date().format( 'yyyy-MM-dd_HH-mm-ss') - -timeline { - enabled = true - file = "${params.tracedir}/execution_timeline_${trace_timestamp}.html" -} - -report { - enabled = true - file = "${params.tracedir}/execution_report_${trace_timestamp}.html" -} - -trace { - enabled = true - file = "${params.tracedir}/execution_trace_${trace_timestamp}.txt" -} - -dag { - enabled = true - file = "${params.tracedir}/pipeline_dag_${trace_timestamp}.svg" -} diff --git a/configs/nextflow.config b/configs/nextflow.config new file mode 120000 index 0000000..4b6ae34 --- /dev/null +++ b/configs/nextflow.config @@ -0,0 +1 @@ +paminfo_common.config \ No newline at end of file diff --git a/configs/paminfo_common.config b/configs/paminfo_common.config new file mode 100644 index 0000000..452f52c --- /dev/null +++ b/configs/paminfo_common.config @@ -0,0 +1,427 @@ +/* +======================================================================================== + Generic Nextflow config file +======================================================================================== + Default config for all compute environments +---------------------------------------------------------------------------------------- +*/ + +// Manifest providing default pipeline description +// Every pipeline should override this +manifest { + name = "PAM pipeline" + author = 'PAM Informatics' + homePage = 'NA' + description = 'NA' + mainScript = 'main.nf' + version = 'NA' +} + +// Global default params +params { + // Input options + input = null + + // Boilerplate options + outdir = './results' + tracedir = "${params.outdir}/pipeline_info" + help = false + + // Max resource options + // Defaults only, expecting to be overwritten in profiles + max_memory = '4.GB' + max_cpus = 2 + max_time = '10000.h' + max_retries = 2 + retry_strategy = 'ignore' + + // LSF + queue_size = null + submit_rate_limit = null +} + +// Specify process resource requirements and escalation strategy + +/** +* Function to ensure that resource requirements don't go beyond +* a maximum limit +*/ +def check_max(obj, type) { + if (type == 'memory') { + try { + if (obj.compareTo(params.max_memory as nextflow.util.MemoryUnit) == 1) + return params.max_memory as nextflow.util.MemoryUnit + else + return obj + } catch (Exception e) { + println "ERROR - Could not compare ${obj} to params.max_memory ${params.max_memory}! Printing exception and using process.memory = ${obj}:" + println "${e.toString()}" + return obj + } + } else if (type == 'time') { + try { + if (obj.compareTo(params.max_time as nextflow.util.Duration) == 1) + return params.max_time as nextflow.util.Duration + else + return obj + } catch (Exception e) { + println "ERROR - Could not compare ${obj} to params.max_time ${params.max_time}! Printing exception and using process.time = ${obj}:" + println "${e.toString()}" + return obj + } + } else if (type == 'cpus') { + try { + return Math.min( obj, params.max_cpus as int ) + } catch (Exception e) { + println "ERROR - Could not compare ${obj} to params.max_cpus ${params.max_cpus}! Printing exception and using process.cpus = ${obj}:" + println "${e.toString()}" + return obj + } + } +} + +def escalate_linear(initial, task, multiplier=1) { + assert multiplier > 0 + return (initial * multiplier * task.attempt) +} + +def escalate_exp(initial, task, multiplier=1) { + assert multiplier > 0 + return (initial * (multiplier ** (task.attempt - 1))) +} + +def escalate_queue_time(queue, task) { + def queue_index = [ + "small": 0, + "normal": 1, + "long": 2, + "week": 3, + "basement": 4, + ] + def times = [30.m, 12.h, 48.h, 7.d, 30.d] + def index = queue_index[queue] + (task.attempt - 1) + if (index < 4) { + return times[index] + } + return times[4] +} + +def escalate_queue_time_by2(queue, task) { + def queue_index = [ + "small": 0, + "normal": 1, + "long": 2, + "week": 3, + "basement": 4, + ] + def times = [30.m, 12.h, 48.h, 7.d, 30.d] + def index = queue_index[queue] + Math.floor((task.attempt - 1) / 2) + if (index < 4) { + return times[index] + } + return times[4] +} + +def retry_strategy(task, max_retries) { + def MISC_EXIT_CODES = [ + "SIGKILL": 137, + "SIGTERM": 143, + "SIGABRT": 134, + "SIGSEGV": 139 + ].values() + + def SCALING_EXIT_CODES = [ + // see https://ssg-confluence.internal.sanger.ac.uk/pages/viewpage.action?pageId=101361150 + "SIGUSR2": 140, // LSF Runlimit exceeded or Out of memory Error; first signal allowing to quit cleanly + "SIGINT": 130, // LSF Runlimit exceeded or Out of memory Error or bkill; second signal sent shortly after the SIGUSR2 to make it quit if not done yet + "SIGKILLNOW": 9, // LSF Runlimit exceeded or Out of memory Error; apparently what LSF issues after a while once the above two signals have been sent and not acted upon. + // Apparently kills the job instantly so that the exit status has not got time to be written into an .exitcode file and is not reported to the Nextflow + // master process - therefore the value 9 is never seen here in practice; see below for actual value. + // source of issue reported here https://github.com/nextflow-io/nextflow/issues/2847 + "NOEXITCODE": Integer.MAX_VALUE // the default value of $task.exitStatus; this is what it is set to in the absence of an .exitcode file for the previous execution of the task, as would happen in the case mentioned above + ].values() + + if (task.attempt > max_retries) { + return 'ignore' + } + + switch(task.exitStatus) { + case {it in MISC_EXIT_CODES}: + // Ignore due to non-scalable error code + return 'ignore' + + case {it in SCALING_EXIT_CODES}: + // Retry with more memory and longer time limit + return 'retry' + + case {it == null}: + /* + If exitStatus is null as is the case on the first attempt return 'retry' + */ + return 'retry' + + default: + // Return the value of params.retry_strategy + return 'ignore' + } +} + +process { + // Defaults + cpus = { check_max( escalate_linear( 1, task ), 'cpus' ) } + memory = { check_max( escalate_linear( 1.GB, task ), 'memory' ) } + time = { check_max( escalate_linear( 1.h, task ), 'time' ) } + + maxErrors = -1 + maxRetries = params.max_retries + + errorStrategy = { retry_strategy(task, params.max_retries) } + + // Process-specific resource requirements + withLabel:cpu_1 { + cpus = { check_max( 1, 'cpus' ) } + } + + withLabel:cpu_2 { + cpus = { check_max( 2, 'cpus' ) } + } + + withLabel:cpu_4 { + cpus = { check_max( 4, 'cpus' ) } + } + + withLabel:cpu_8 { + cpus = { check_max( 8, 'cpus' ) } + } + + withLabel:cpu_16 { + cpus = { check_max( 16, 'cpus' ) } + } + + withLabel:cpu_32 { + cpus = { check_max( 32, 'cpus' ) } + } + + withLabel:cpu_64 { + cpus = { check_max( 64, 'cpus' ) } + } + + withLabel:mem_50M { + memory = { check_max( escalate_exp( 50.MB, task, 2 ), 'memory' ) } + } + + withLabel:mem_100M { + memory = { check_max( escalate_exp( 100.MB, task, 2 ), 'memory' ) } + } + + withLabel:mem_250M { + memory = { check_max( escalate_exp( 250.MB, task, 2 ), 'memory' ) } + } + + withLabel:mem_500M { + memory = { check_max( escalate_exp( 500.MB, task, 2 ), 'memory' ) } + } + + withLabel:mem_1 { + memory = { check_max( escalate_exp( 1.GB, task, 2 ), 'memory' ) } + } + + withLabel:mem_2 { + memory = { check_max( escalate_exp( 2.GB, task, 2 ), 'memory' ) } + } + + withLabel:mem_4 { + memory = { check_max( escalate_exp( 4.GB, task, 2 ), 'memory' ) } + } + + withLabel:mem_8 { + memory = { check_max( escalate_exp( 8.GB, task, 2 ), 'memory' ) } + } + + withLabel:mem_10 { + memory = { check_max( escalate_exp( 10.GB, task, 2 ), 'memory' ) } + } + + withLabel:mem_16 { + memory = { check_max( escalate_exp( 16.GB, task, 2 ), 'memory' ) } + } + + withLabel:mem_20 { + memory = { check_max( escalate_exp( 20.GB, task, 2 ), 'memory' ) } + } + + withLabel:mem_32 { + memory = { check_max( escalate_exp( 32.GB, task, 2 ), 'memory' ) } + } + + withLabel:mem_64 { + memory = { check_max( escalate_exp( 64.GB, task, 2 ), 'memory' ) } + } + + withLabel:mem_96 { + memory = { check_max( escalate_exp( 96.GB, task, 2 ), 'memory' ) } + } + + withLabel:mem_120 { + memory = { check_max( escalate_linear( 120.GB, task, 1.25 ), 'memory' ) } + } + + withLabel:time_30m { + time = { check_max( escalate_linear( 30.m, task ), 'time' ) } + } + withLabel:time_1 { + time = { check_max( escalate_linear( 1.h, task ), 'time' ) } + } + + withLabel:time_12 { + time = { check_max( escalate_linear( 12.h, task, 2 ), 'time' ) } + } + + withLabel:time_48 { + time = { check_max( escalate_linear( 48.h, task, 2 ), 'time' ) } + } + + withLabel:no_retry { + errorStrategy = 'ignore' + } + + withLabel:time_queue_from_small { + time = { check_max( escalate_queue_time( 'small', task ), 'time' ) } + } + + withLabel:time_queue_from_normal { + time = { check_max( escalate_queue_time( 'normal', task ), 'time' ) } + } + + withLabel:time_queue_from_long { + time = { check_max( escalate_queue_time( 'long', task ), 'time' ) } + } + + withLabel:time_queue_from_week { + time = { check_max( escalate_queue_time( 'week', task ), 'time' ) } + } + + withLabel:time_queue_from_small_slow2 { + time = { check_max( escalate_queue_time_by2( 'small', task ), 'time' ) } + } + + withLabel:time_queue_from_normal_slow2 { + time = { check_max( escalate_queue_time_by2( 'normal', task ), 'time' ) } + } + + withLabel:time_queue_from_long_slow2 { + time = { check_max( escalate_queue_time_by2( 'long', task ), 'time' ) } + } + + withLabel:time_queue_from_week_slow2 { + time = { check_max( escalate_queue_time_by2( 'week', task ), 'time' ) } + } +} + + +profiles { + debug { process.beforeScript = 'echo $HOSTNAME' } + + docker { + docker.enabled = true + docker.userEmulation = true + singularity.enabled = false + conda.enabled = false + } + + singularity { + singularity.enabled = true + singularity.autoMounts = true + docker.enabled = false + conda.enabled = false + } + + conda { + conda.enabled = true + singularity.enabled = false + docker.enabled = false + } + + sanger_local { + singularity { + enabled = true + runOptions = '--bind /data,/lustre,/nfs,/software' + // To avoid downloading/converting to singularity image every time + libraryDir = System.getenv("NEXTFLOW_SINGULARITY_LIBRARY") + } + } + + // Default profile + standard { + docker { + enabled = false + } + + singularity { + enabled = true + autoMounts = true + runOptions = '--bind /data,/lustre,/nfs,/software' + // To avoid downloading/converting to singularity image every time + libraryDir = System.getenv("NEXTFLOW_SINGULARITY_LIBRARY") + } + + process { + executor = 'lsf' + queue = { task.memory > 745.GB ? 'teramem' : + task.memory > 256.GB ? 'hugemem' : + task.time <= 30.m ? 'small' : + task.time <= 12.h ? 'normal' : + task.time <= 48.h ? 'long' : + task.time <= 168.h ? 'week' : + 'basement'} + } + + executor { + name = 'lsf' + perJobMemLimit = true + submitRateLimit = params.submit_rate_limit + queueSize = params.queue_size + jobName = { "PAM-info-pipeline - $task.name - $task.hash" } + } + + params { + // Max resources + max_memory = 2.9.TB + max_cpus = 256 + max_time = 720.h + } + } +} + +// Export these variables to prevent local Python/R libraries from conflicting with those in the container +env { + PYTHONNOUSERSITE = 1 + R_PROFILE_USER = "/.Rprofile" + R_ENVIRON_USER = "/.Renviron" +} + +// Capture exit codes from upstream processes when piping +process.shell = ['/bin/bash', '-euo', 'pipefail'] + +// Setup reports with uniform filename` +def trace_timestamp = new java.util.Date().format( 'yyyy-MM-dd_HH-mm-ss') + +timeline { + enabled = true + file = "${params.tracedir}/execution_timeline_${trace_timestamp}.html" +} + +report { + enabled = true + file = "${params.tracedir}/execution_report_${trace_timestamp}.html" +} + +trace { + enabled = true + file = "${params.tracedir}/execution_trace_${trace_timestamp}.txt" +} + +dag { + enabled = true + file = "${params.tracedir}/pipeline_dag_${trace_timestamp}.svg" +}