Nextflow is a workflow management system that allows you to build highly parallelizable & scalable computational pipelines.
Advantages
The advantage of using Nextflow (and workflow managers in general) is that they help make workflows more:
- Portable
  - Thanks in part to its built-in support for containers, Nextflow pipelines can be run across different infrastructure, be it cloud, local or HPC
- Reproducible
  - Thanks in part to its built-in support for containers such as Docker & Singularity
- Scalable
  - Thanks in part to Nextflow’s built-in parallelism, as it’s built on the dataflow programming model
There are other workflow management systems, such as Snakemake, CWL and WDL; however, I am far more familiar with Nextflow. Each has its own strengths: Nextflow benefits from lots of community support (see nf-core), Snakemake is written in Python, and WDL benefits from support from the Broad Institute, while CWL is an open, community-driven standard.
Basics
The basic pipeline structure is as follows:
- Pipeline (main script)
  - Channels
  - Processes
    - Input
    - Output
    - Script
- Configuration
See the Official documentation & Nextflow tutorial
Patterns
You can find Nextflow scripts for each of the examples here: https://github.com/PhilPalmer/docs/tree/master/computing/nextflow-scripts
Inspired by Nextflow patterns
Inputs
Split text input
Create a channel from a plain text file split line by line
Channel
    .fromPath(params.regions)
    .ifEmpty { exit 1, "Cannot find file : ${params.regions}" }
    .splitText()
    .map { it -> it.trim() }
    .set { regions }
Run it:
nextflow run split_text_input.nf
CSV input
Create a channel from a CSV file input
Channel
    .fromPath(params.vcf_paths)
    .ifEmpty { exit 1, "Cannot find CSV file : ${params.vcf_paths}" }
    .splitCsv(skip: 1)
    .map { sample_id, vcf, index -> [sample_id, file(vcf), file(index)] }
    .set { vcfs }
Run it:
nextflow run csv_input.nf
Reusable channels
Queue channels are consumed on use, so each one can only be read once. However, you can avoid having to duplicate a channel by creating a value channel instead, which can be read any number of times:
inputChannel = Channel.value(file(params.input_path))
// Channel can be used multiple times
inputChannel.println()
inputChannel.println()
Run it:
nextflow run reusable_channels.nf
Channel duplication
Sometimes you may need to duplicate a channel. This is especially true if it’s a channel which contains multiple values (and/or is a queue channel), because these are consumed by Nextflow on use.
Channel
    .fromPath(params.input_path)
    .ifEmpty { exit 1, "${params.input_path} not found" }
    .into { inputChannel; inputChannel1; inputChannel2 }
Run it:
nextflow run channel_duplication.nf
Get basename
Get the basename (i.e. the filename minus the file extension) of a file in a channel. You can also use simpleName to get everything prior to the first period (.)
Channel
    .fromPath(params.input_path)
    .map { file -> [file.baseName, file] }
    .ifEmpty { exit 1, "${params.input_path} not found" }
    .set { inputChannel }
Run it:
nextflow run get_basename.nf
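To make the difference between baseName and simpleName concrete, here is a small plain-Groovy sketch that emulates the two properties with string operations. It does not call Nextflow itself; the file name is made up, and the code assumes the name contains at least one period.

```groovy
// Hypothetical file name with a double extension (an assumption for illustration)
def name = 'sample_1.vcf.gz'

// Emulates baseName: strip only the last extension
def base = name.substring(0, name.lastIndexOf('.'))

// Emulates simpleName: keep everything before the first period
def simple = name.substring(0, name.indexOf('.'))

println "baseName   -> ${base}"
println "simpleName -> ${simple}"
```

For compressed files such as .vcf.gz, simpleName is usually the more convenient sample identifier, because baseName still carries the inner extension.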
Input validation
if (!params.important_parameter) exit 1, "The params `--important_parameter` has not been set.\n\tPlease provide a valid value for this parameter"
Run it:
nextflow run input_validation.nf
Processes
Export bash variable to Nextflow
env()
Conditional input files
Nextflow does not support conditional input files for processes directly. Fortunately, you can emulate an optional input by falling back to a placeholder file (here data/no_file.txt) and checking for its name inside the process:
optional_input_path = params.optional_input_path ? params.optional_input_path : 'data/no_file.txt'
Channel
    .fromPath(optional_input_path)
    .ifEmpty { exit 1, "${optional_input_path} not found" }
    .set { optionalInputChannel }
process test {
    echo true
    input:
    file(optional_input) from optionalInputChannel
    script:
    // Compare the file name (not the file object) against the placeholder name
    optional_flag = optional_input.name != 'no_file.txt' ? "--optional_input $optional_input" : ''
    """
    some_command.sh $optional_flag
    """
}
Run it:
nextflow run optional_input.nf
Conditional flags
Here the optional_flag will only be present if the user has set the optional_flag Nextflow parameter, otherwise it will be equal to an empty string
process test {
    echo true
    script:
    optional_flag = params.optional_flag ? "--optional_flag $params.optional_flag" : ''
    """
    some_command.sh $optional_flag
    """
}
Run it:
nextflow run optional_flag.nf
Extra flags
If you have many conditional input parameters and prefer to store them in a single variable you can use something like extra_flags here
process test {
    echo true
    script:
    extra_flags = ''
    if ( params.optional_flag ) { extra_flags += " --optional_flag ${params.optional_flag}" }
    if ( params.optional_flag2 ) { extra_flags += " --optional_flag2 ${params.optional_flag2}" }
    if ( params.optional_flag3 ) { extra_flags += " --optional_flag3 ${params.optional_flag3}" }
    """
    some_command.sh $extra_flags
    """
}
Run it:
nextflow run extra_flags.nf
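When the number of optional parameters grows, the same string can also be assembled from a list of parameter names. The following is a plain-Groovy sketch of that idea, not code from extra_flags.nf: the params map is a stand-in for Nextflow's params object, and its values are made up.

```groovy
// Stand-in for Nextflow's params object; the values are hypothetical.
// Inside a real pipeline you would use the built-in params instead.
def params = [optional_flag: 'a', optional_flag2: null, optional_flag3: 'c']

// Keep only the parameters that are set, then render each one as "--name value"
def extra_flags = ['optional_flag', 'optional_flag2', 'optional_flag3']
    .findAll { params[it] }
    .collect { "--${it} ${params[it]}" }
    .join(' ')

println extra_flags
```

Unlike the if-based version, the result has no leading space, and unset parameters are skipped because null and empty strings are falsy in Groovy.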
Collect
Collect can be used to group all the files emitted by a channel into a single item. Here multiple files from the vcfs channel are all used as input to the same process and added one per line to a plain text file
process test {
    input:
    file(vcfs) from vcfs.collect()
    script:
    """
    echo "${vcfs.join("\n")}" > vcfs.txt
    """
}
Run it:
nextflow run collect.nf
Transforming operators
Collect file
paired_info
    .collectFile(name: "${params.outdir}/QC/tcga/paired_info.csv", keepHeader: true, skip: 1)
Reduce channel
vcfChannelReport = vcfChannelSnps.map { name, vcf -> vcf}
https://github.com/lifebit-ai/genetic-traits/blob/master/main.nf#L157
Complex mapping
Debugging
Printing channel
Touch files
Output
PublishDir
https://github.com/lifebit-ai/genetic-traits/blob/master/main.nf#L204-L210
Groovy
Last index of
Can be used to get the file name from a URL, for example
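A minimal sketch of that idea in plain Groovy, using the standard String methods lastIndexOf and substring. The URL is hypothetical and only there for illustration.

```groovy
// Hypothetical URL (an assumption for illustration)
def url = 'https://example.com/data/sample_1.vcf.gz'

// lastIndexOf('/') is the position of the final slash;
// the file name starts one character after it
def fileName = url.substring(url.lastIndexOf('/') + 1)

println fileName
```

If the string contains no slash, lastIndexOf returns -1, so the expression simply returns the whole string.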
Helper functions
// define helper functions
def isMode(mode) {
    params.mode.toLowerCase().contains(mode)
}
def isTsv() {
    params.reads.endsWith('tsv')
}
def get_pairs_simplename(simplename) {
    simplename = simplename.endsWith('_1') ? simplename.substring(0, simplename.length() - 2) : simplename
    simplename = simplename.endsWith('_R1') ? simplename.substring(0, simplename.length() - 3) : simplename
    return simplename
}
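As a quick check of how get_pairs_simplename behaves, the sketch below repeats the helper so that it runs on its own in plain Groovy, then calls it on a few sample names. The names are made up and not taken from any pipeline.

```groovy
// Copy of the helper above so that this block runs on its own
def get_pairs_simplename(simplename) {
    simplename = simplename.endsWith('_1') ? simplename.substring(0, simplename.length() - 2) : simplename
    simplename = simplename.endsWith('_R1') ? simplename.substring(0, simplename.length() - 3) : simplename
    return simplename
}

// Hypothetical read file simple names
println get_pairs_simplename('sampleA_1')   // the '_1' suffix is stripped
println get_pairs_simplename('sampleA_R1')  // the '_R1' suffix is stripped
println get_pairs_simplename('sampleA_2')   // other names are returned unchanged
```

Note that only the _1 and _R1 suffixes are stripped; names ending in _2 or _R2 pass through as they are.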
Configuration
Genomes config
Cloud create
You can use Nextflow to launch an AWS instance (in this case one to run Dragen)
cloud-spot.config:
aws {
    accessKey = ''
    secretKey = ''
    region = 'eu-west-1'
}
cloud {
    imageId = 'ami-0ba4b94467989e99a'
    instanceType = 'f1.4xlarge'
    userName = 'centos'
    keyName = 'dragen'
    bootStorageSize = '100 GB'
}
Run it:
nextflow -c cloud-spot.config cloud create cluster_name -c 1
Tips
nextflow console
log.info
Useful operators
groupTuple()
combine() (by)
map()
The Swiss army knife of operators: use it to re-order or reduce the items in a channel (although Nextflow also provides a reduce operator)
set()
into()
.set()
splitCsv()
merge()
flatten()
choice()
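To illustrate the re-order and reduce uses of map() mentioned in the list above, here is a plain-Groovy sketch. It uses an ordinary list and Groovy's collect as a stand-in for a channel and the map operator, on the assumption that the closure is written the same way in both cases; the sample names and file names are made up.

```groovy
// Plain list standing in for a channel of [sample_id, vcf, index] tuples (hypothetical values)
def items = [
    ['sampleA', 'a.vcf.gz', 'a.vcf.gz.tbi'],
    ['sampleB', 'b.vcf.gz', 'b.vcf.gz.tbi'],
]

// Re-order: put the VCF first
def reordered = items.collect { sample_id, vcf, index -> [vcf, sample_id, index] }

// Reduce: keep only the sample id and the VCF
def reduced = items.collect { sample_id, vcf, index -> [sample_id, vcf] }

println reordered
println reduced
```

In a pipeline the same closures would be passed to .map { ... } on the channel instead of .collect { ... } on a list.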