Introduccion a Nextflow: Process

En el artículo anterior vimos una introducción al concepto de Channel. En este, vamos a profundizar un poco más en el concepto de process

WARNING

Como ya dijimos un process es la unidad básica en el DSL de Nextflow y usamos los canales para conectarlos, por lo que parecería lógico empezar explicandolos primero. Sin embargo he optado por explicar primero los canales porque así ahora podemos hacer cosas más interesantes con ellos

El process es la parte del DSL mediante la que definimos una entrada, una salida y una ejecución de sistema. Es decir, Nextflow es agnóstico de lo que quieras procesar (sólo requiere que se pueda ejecutar en Linux). Tú compones el/los comando(s) a ejecutar y él se encarga de orquestar las ejecuciones

De esta forma tu pipeline puede ser tan simple como buscar una cadena en un fichero (básicamente ejecutar un grep) hasta leer un fichero de gigas y procesarlos de forma paralela, ejecutando un Python por cada línea, etc y todo ello con la ventaja de que el mismo pipeline puedes ejecutarlo en tu local, en un contenedor docker, usando AWS Batch, Google, etc

Contando lineas de un fichero

Vamos a empezar definiendo un proceso simple:

uno.nf
process example {
   input:
      path 'entrada'

   output:
      stdout

   "wc -l $entrada"
}

workflow{
    example( params.param1 ) | view
}

Ejecutamos el pipeline

nextflow run uno.nf --param1 $(pwd)/uno.nf

y veremos que la salida nos dice el número de líneas del fichero uno.nf

N E X T F L O W  ~  version 22.04.5
Launching `uno.nf` [exotic_jones] DSL2 - revision: c79a922618
executor >  local (1)
[28/a1eb27] process > example [100%] 1 of 1 ✔
12 input

Como vemos, el proceso example requiere un fichero de entrada (dentro de su ejecución va a crear una variable entrada que podremos referenciarla dentro de la misma), y que va a emitir una salida tomándola del stdout del comando que va a ejecutar. Más adelante veremos qué otras posibilidades podemos usar

Comando

El comando a ejecutar se puede definir de varias formas:

  • La más simple es con un string al final de la definición del proceso

process example{
  .. ... ...

  'echo hola'
}

El comando que ejecutará example será un simple echo hola

  • Un string multilinea, usando triple comillas

process example{
  .. .. ...

  '''
  echo hola > f
  echo hi >> f
  echo ciao >> f
  cat f
  '''
}

Así mismo el comando a ejecutar puede ser "personalizado" usando los inputs del process (atención al uso de comillas dobles en lugar de simples!!) como en el caso de uno.nf:

"wc -l $entrada"

En este caso Nextflow ejecutará un wc sobre un fichero cuya ruta no está predefinida, sino que se usará la que se le proporcione al process. Esto es así por el uso de comillas dobles que lo que hace es que la cadena sea "compilada" y se use el resultado

Uniendo procesos

Como puedes imaginar, podemos "concatenar" process uniendo las salidas de uno con las entradas de otro (en realidad usando canales)

dos.nf
process lines {

   input:
      path 'input'

   output:
      stdout

   // wc es un programa de la shell
   "wc -l $input"
}

process cut {

   input:
      val lines

   output:
      stdout

   // cut es un programa de la shell
   // indicamos que use el espacio como delimitador y extraiga el primer campo
   // le pasamos una cadena con espacios como entrada, por eso la entrecomillamos
   "cut -d ' ' -f 1 <<< '$lines'"
}


workflow{
    lines( params.param1 )
    | cut
    | view
}

En este caso el workflow creará dos procesos, lines y cut. Este último permanecerá a la espera de recibir valores por su canal de entrada (espera recibir un simple objeto val y lo llamará 'line' ). Por su parte lines (el proceso, no confundir con la variable del proceso cut) permanecerá a la espera de recibir un fichero de entrada (al declarar un path como entrada)

Mediante el caracter pipe "|" indicamos al workflow cómo unir estos procesos de tal forma que la salida de uno sea la entrada de otro

Una vez que se inicia el worflow, Nextflow alimenta el canal params lo que hace que lines se ejecute y su salida alimente a cut

Paralelización. Divide y vencerás

El ejemplo anterior muestra cómo definimos los procesos así como el "flujo de trabajo". Como lines emite un sólo valor (el stdout generado por wc) el worflow es lineal.

En el ejemplo siguiente vamos a ver la capacidad de definir ejecuciones de procesos en paralelo. El ejemplo va a consistir en generar un fichero de N líneas, y por cada línea ejecutaremos un process que "haga algo" con ella (simplemente imprimirla por su stdout). Para poder comprobar la paralelización cada ejecución realizará primero una espera aleatoria mediante un random

tres.nf
process generarFicheroEntrada {
   input:
      val size
   output:
      path 'origin.txt'

   // el script puede ser multilinea
   // y podemos "mezclar" variables del proceso con las del script a ejecutar
   // si es variable del proceso no la escapamos con una barra: $size
   // si es variable del script la escapamos con la barra: \$x
   """
   for x in {0..$size}
   do
       echo \$x >> origin.txt
   done
   """
}


process simulate{
   input:
     val value
   output:
     stdout
   """
     sleep \$((RANDOM % 10))
     echo  $value
   """
}

workflow{
   simulate(
      generarFicheroEntrada(params.size).splitText()
    ) | view
}

Si por ejemplo ejecutas

nextflow run tres.nf --size=10

repetidas veces verás que la salida es diferente en cada ejecución. Esto es debido a que el proceso simulate es ejecutado en paralelo (en realidad todos los valores en paralelo a la vez no, sino en bloques que puedes definir como parametro de splitText)

El workflow de este ejemplo es sencillo de leer pero por detrás se están ejecutando conceptos bastante importantes:

  • se crea un process simulate que permanecerá a la espera de que se vayan generando valores en su canal de entrada. Mientras no reciba un EOF en el canal, el proceso se ejecutará repetidamente por cada valor en el mismo.

  • generarFicheroEntrada leerá de su canal de entrada un valor y enviará por el de salida un Path (no el contenido, sino un objeto Path que apunta al fichero).

  • splitText es un operador que actua sobre los valores emitidos por un canal. Lee sobre el canal de entrada (en este caso un Path) y emite valores, uno por cada linea

La definición del workflow es en realidad una Closure, un bloque de código, lo que permite definir el worflow de muchas formas. Por ejemplo el mismo workflow puede escribirse:

workflow{
   def fichero = generarFicheroEntrada(params.size)

   def split = fichero.splitText()

   simulate( split ).view()
}

Conclusión

En este post hemos visto cómo usar los process para ejecutar comandos de sistema dentro de un workflow. Básicamente su función es generar y ejecutar un fichero .sh con el comando que definas, integrando dicha ejecución en el flujo de trabajo permitiendo unir y controlar todas las ejecuciones.

También hemos visto una pequeña introducción al concepto de workflow el cual trataremos más adelante

Follow comments at Telegram group Or subscribe to the Channel Telegram channel

2019 - 2023 | Mixed with Bootstrap | Baked with JBake v2.6.7