Astronomy Workflow: Idea to Implementation

Overview

Teaching: 30 min
Exercises: 30 min
Questions
  • How do I develop a workflow from scratch?

Objectives
  • Create a workflow from scratch and see it run.

The idea

We wish to create a workflow which will process observing data from some telescope, find candidate pulsars, and then run a ML on these candidates to label them as confirmed or rejected. Humans will then sort through the confirmed list to come up with a final decision on which are real and which are not.

The high level workflow therefore looks like this:

Initial workflow

As we think more about how the processing will be done we come up with some intermediate steps for the tasks:

Final workflow

In this final iteration we have indicated with arrows the dependencies of each task, but also where information needs to be passed. For example the “fold data” task needs the multi-frequency data from the “compile” task, as well as the candidate pulsar details from the “find” task in order to do a more detailed measurement of the properties.

The process

Step 1: Initial channel

Within the example files, is a set of files in the observations directory with names like obs_[1-9].dat. The meta-data for each file is stored in the file header. In our example we don’t have any actual data in the files, just meta-data, and the files look like this:

obs_1.dat

#freq: 10 MHz
#date: 2022-11-15
#point: 10deg_10deg

Our initial channel will be a list of all the files in the observations directory that look like obs_*.dat. We will leave the data directory as a user parameter with a default value of observations. We create our channel as follows:

params.data_dir = "observations"

all_obs = Channel.fromPath("${params.data_dir}/*.dat")

Step 2: Extracting meta-data

The first step in our workflow is to extract the metadata from the files. For this we define a process called get_meta. Since this will be the first process in the workflow we set it up to accept input as single files, and it will return a tuple of the frequency and pointing direction for the given file, as well as the file itself.

process get_meta {
    // Convert an input file into a stream with meta data
    // output will be [frequency, point, file]
    input:
    path(obs)

    output:
    tuple (env(FREQ), env(POINT), path(obs))

    script:
    """
    FREQ=\$(grep -e "#freq:"  ${obs} | awk '{print \$2}')
    POINT=\$(grep -e "#point:"  ${obs} | awk '{print \$2}')
    """
}

Things to note above:

To test how we are going so far we can create a stub of a workflow in the same file with some .view() commands on the various channels that we are using.

workflow {
    all_obs.view() // view channel
    get_meta(all_obs) // run process
    get_meta.out.view() // view the channel generated by the output
}

We run the above with nextflow run astro_wf.nf and it gives us the following:

$ nextflow run astro_wf.nf
N E X T F L O W  ~  version 22.10.1
Launching `astro_wf.nf` [zen_einstein] DSL2 - revision: 5338dcdf52
executor >  local (9)
[17/bcae92] process > get_meta (9) [100%] 9 of 9 ✔
/data/some/dir/observations/obs_4.dat
/data/some/dir/observations/obs_2.dat
/data/some/dir/observations/obs_9.dat
/data/some/dir/observations/obs_3.dat
/data/some/dir/observations/obs_7.dat
/data/some/dir/observations/obs_8.dat
/data/some/dir/observations/obs_1.dat
/data/some/dir/observations/obs_6.dat
/data/some/dir/observations/obs_5.dat
[30, 20deg_10deg, /data/some/dir/work/3d/e3663a45357b9818a547c2908701da/obs_6.dat]
[10, 20deg_10deg, /data/some/dir/work/24/69ad9546cacb3ff01b6199977ab80d/obs_4.dat]
[30, 30deg_10deg, /data/some/dir/work/e5/44d3d5698dc36454861601e4665a32/obs_9.dat]
[20, 30deg_10deg, /data/some/dir/work/bd/e29f1747bd01c8cc8f2278f29f1b17/obs_8.dat]
[20, 10deg_10deg, /data/some/dir/work/33/a37ad4d85131c77467126e0d361667/obs_2.dat]
[30, 10deg_10deg, /data/some/dir/work/0f/22afa02fd765ca828341a6aa91630d/obs_3.dat]
[10, 10deg_10deg, /data/some/dir/work/f4/c5618dd7b5aa66b23c3ce4c1ddc1d3/obs_1.dat]
[10, 30deg_10deg, /data/some/dir/work/19/8411e3effcb8eb6b13d1ae450b3256/obs_7.dat]
[20, 20deg_10deg, /data/some/dir/work/17/bcae92db0ed291c969217d979374a8/obs_5.dat]

We can see from the above that the input files are obs_?.dat, and that the get_meta process then turns these into the required tuple of [freq, pointing, file]. Note that when the files are passed to get_meta they are symlinked to a new work folder so the file names remain the same but the directory path is updated.

At this point we can see that the get_meta is working as intended so we can move on.

Step 3: Compiling multi-frequency data

The next step in the workflow is to combine all the files that share the same pointing direction. This will result a multi-frequency data set/file per pointing direction.

There are two things that we need to do here. The first is to take the output of the get_meta process and group it so that all the files that share a common pointing are now within a single tuple. The second is to create a new process that will combine the data sets.

The channel that comes from get_meta is shown above and you can see that it has one item per [freq, pointing, filename], with some of the freq and pointing overlapping. What we want instead is [[freq1, freq2, ...], pointing, [datafile1, datafile2, ...]], which means that instead of 9 items in the channel we have just three (corresponding to the three pointing directions). To achieve this goal we use the groupTuple method on the get_meta.out channel. By default groupTuple will use the first item in a tuple as the key, but we want it to use the pointing which is the second item (index=1). Therefore we modify our workflow to create the new channel called same_pointing, and remove the .view() on the all_obs channel.

workflow {
    // create metadata
    get_meta( all_obs )
    // collect all the files that have the same pointing
    get_meta.out.view()
    same_pointing = get_meta.out.groupTuple( by: 1 ).view()
}

We now re-run the workflow to make sure that we are getting the required channel ‘shape’.

$ nextflow run astro_wf.nf
N E X T F L O W  ~  version 22.10.1
Launching `astro_wf.nf` [fabulous_woese] DSL2 - revision: d4bc8976c3
executor >  local (9)
[8f/01c418] process > get_meta (9) [100%] 9 of 9 ✔
[30, 20deg_10deg, work/59/7cd29865c2ef0d56cd3ca8a3b824f6/obs_6.dat]
executor >  local (9)
[8f/01c418] process > get_meta (9) [100%] 9 of 9 ✔
[30, 20deg_10deg, work/59/7cd29865c2ef0d56cd3ca8a3b824f6/obs_6.dat]
[20, 10deg_10deg, work/cb/b4d7c2370b9fa1babba1dc4f2f1334/obs_2.dat]
[10, 30deg_10deg, work/03/a53409f45b318ba49990b8a33dc29c/obs_7.dat]
[30, 10deg_10deg, work/fe/039db662fe5bd31024c745412791d3/obs_3.dat]
[20, 30deg_10deg, work/26/760d76cf79bf2b970cfaf8d67ea797/obs_8.dat]
[10, 10deg_10deg, work/0d/76acb6d5e7b507af1ab4fe018f500f/obs_1.dat]
[30, 30deg_10deg, work/ee/596bb5bad24c60ab84c7efc74b1bd0/obs_9.dat]
[10, 20deg_10deg, work/42/cc5d006b6a514928ee17333de95e02/obs_4.dat]
[20, 20deg_10deg, work/8f/01c4180f254a1900dfe5135ccfbc50/obs_5.dat]
[[30, 10, 20], 20deg_10deg, [work/59/7cd29865c2ef0d56cd3ca8a3b824f6/obs_6.dat, work/42/cc5d006b6a514928ee17333de95e02/obs_4.dat, work/8f/01c4180f254a1900dfe5135ccfbc50/obs_5.dat]]
[[20, 30, 10], 10deg_10deg, [work/cb/b4d7c2370b9fa1babba1dc4f2f1334/obs_2.dat, work/fe/039db662fe5bd31024c745412791d3/obs_3.dat, work/0d/76acb6d5e7b507af1ab4fe018f500f/obs_1.dat]]
[[10, 20, 30], 30deg_10deg, [work/03/a53409f45b318ba49990b8a33dc29c/obs_7.dat, work/26/760d76cf79bf2b970cfaf8d67ea797/obs_8.dat, work/ee/596bb5bad24c60ab84c7efc74b1bd0/obs_9.dat]]

The order in which the files are being grouped is different for each pointing direction. Note, however that the mapping between the frequency and filenames are still correct (eg. obs_1.dat is paired with 10 in pointing 10deg_10deg).

Now that the channel is working we can set up a process to join the data for each pointing direction. We set the input to be a tuple with the same shape as the input channel. All process inputs can be a single value/file or a list of values/files. This flexibility means that this process would still work if we input more than three files to this process. In this example, the variable freqs will be a list of values, and obs will be a list of file objects.

Our dummy script is just going to concatenate all the observations into a single file with a name that is like obs_f1_f2_f3.dat. Of course in reality you’ll have some better way of doing this. Finally, our output is going to be the pointing direction and the file that was just generated.

process combine_frequencies {
    // Combine the files so the output has a single pointing with all the frequency information
    input:
    tuple (val(freqs), val(point), path(obs))

    output:
    tuple (val(point), path("obs*dat"))

    script:
    """
    cat ${obs} > obs_${freqs.join("_")}.dat
    """
}

Note that ${obs} and ${freqs} are both lists so we have access to the .join() function.

Let’s now update our workflow description to include the above, and modify the workflow section to show what is going into and coming out of the combine_frequencies process.

workflow {
    // create metadata
    get_meta( all_obs )
    // collect all the files that have the same pointing
    same_pointing = get_meta.out.groupTuple( by: 1 ).view()
    // Combine the frequencies so you have a single file with all frequencies
    combine_frequencies( same_pointing )
    combine_frequencies.out.view()
}

Our output is now:

$ nextflow run astro_wf.nf
N E X T F L O W  ~  version 22.10.1
Launching `astro_wf.nf` [romantic_rosalind] DSL2 - revision: cdeceec8f1
executor >  local (12)
[c4/19d3d3] process > get_meta (9)            [100%] 9 of 9 ✔
[17/f20f77] process > combine_frequencies (1) [100%] 3 of 3 ✔
[[30, 20, 10], 10deg_10deg, [/work/4b/89cc7d4a29c63036def3945a00dfdc/obs_3.dat, /work/f5/2c181e756aacb93620edf61db87781/obs_2.dat, /work/d7/8e86a29a4046159431e7021883ebb8/obs_1.dat]]
[[30, 20, 10], 30deg_10deg, [/work/d7/2ff98b2b97bf9535dbdc47bd90a9a9/obs_9.dat, /work/07/f118faf09efc8738034a4a944392a6/obs_8.dat, /work/77/d9299b3367c8ccbbf47e425abc6133/obs_7.dat]]
[[30, 10, 20], 20deg_10deg, [/work/42/3d0cd8f5f1a3fc9a088f4eade1fcb2/obs_6.dat, /work/4e/02d9f520d266ea5cb5160db6297105/obs_4.dat, /work/c4/19d3d3528fe39a03d0e673969d7239/obs_5.dat]]
[30deg_10deg, /work/52/6bded1ba8fabfaedb5fc1d8bab1ab5/obs_30_20_10.dat]
[20deg_10deg, /work/ac/525ce379788357bd0d996e4cecc444/obs_30_10_20.dat]
[10deg_10deg, /work/17/f20f775aa7866c31077070f1a8715e/obs_30_20_10.dat]

Let’s have a look at how NextFlow is doing the variable replacement for lists in the combine_frequencies process. To do this we open the working directory for the 10deg_10deg pointing (/work/17/f20f775aa7866c31077070f1a8715e/) and look at the .command.sh file.

#!/bin/bash -ue
cat obs_3.dat obs_2.dat obs_1.dat > obs_30_20_10.dat

Here you can see that ${obs} was expended to be obs_3.dat obs_2.dat obs_1.dat, and our tuple of frequencies [30,20,10] was replaced with 30_20_10 thanks to the .join("_") operator.

Step 4: Find candidates

We now have a channel with items like: [pointing, combined data file]. This is in the format that we want for searching for candidates so we don’t need to mutate the channel at all, and can go straight to making the new process. In our dummy script we are just making a random number of candidates using a bash script (the real processing is commented out). Since this script uses a lot of bash variables we can avoid escaping the $ by creating a shell instead of a script section (remember to use ''' not """).

Since the number of candidates that are found is random, and possibly zero, we indicate that the output is optional (remember to use path as file does not have this option). This means that if the process runs and there are no files matching cand*dat then no items are emitted into the output channel but the process still completes without failure.

process find_candidates {
    // Use a periodicity search to find events with significance above 6sigma
    input:
    tuple (val(point), path(obs))

    output:
    tuple (val(point), path("cand*dat"), optional: true)

    shell:
    '''
    #./find_periodic_cands.sh !{obs}

    # Random number from 0-3
    ncand=$(( $RANDOM % 4 ))
    echo $ncand
    for i in $(seq ${ncand}); do
        touch cand_${i}.dat
    done
    '''
}

Again we modify our workflow section so that we can see just the input/output for this process as we test and debug our workflow.

workflow {
    // create metadata
    get_meta( all_obs )
    // collect all the files that have the same pointing
    same_pointing = get_meta.out.groupTuple( by: 1 )
    // Combine the frequencies so you have a single file with all frequencies
    combine_frequencies( same_pointing )
    // Look for periodic signals with an fft
    combine_frequencies.out.view()
    find_candidates( combine_frequencies.out )
    find_candidates.out.view()
}

When I run I get the following output. Your output will differ because the number of candidates is random. Here you can see that I got zero candidates for the 20deg_10_deg pointing, so there are just two items in my output channel.

$ nextflow run astro_wf.nf
N E X T F L O W  ~  version 22.10.1
Launching `astro_wf.nf` [fervent_shockley] DSL2 - revision: 7a02379938
executor >  local (15)
[ac/dfbd2c] process > get_meta (2)            [100%] 9 of 9 ✔
[62/9e9dec] process > combine_frequencies (2) [100%] 3 of 3 ✔
[7e/d651a0] process > find_candidates (1)     [100%] 3 of 3 ✔
[20deg_10deg, /work/e4/f2898b75b400ed0fda4af3a53ced16/obs_10_30_20.dat]
[10deg_10deg, /work/9b/7b5d6f6c6e4c139c46bf23330756ff/obs_30_10_20.dat]
[30deg_10deg, /work/62/9e9decdb609c0cefd36586412f25f9/obs_30_10_20.dat]
[10deg_10deg, [/work/e3/3983bf3b27049a2fbd281786dab0ed/cand_1.dat, /work/e3/3983bf3b27049a2fbd281786dab0ed/cand_2.dat]]
[30deg_10deg, [/work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_1.dat, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_2.dat, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_3.dat]]

Step 5: Measure candidate properties

This next process requires that we combine two input channels. We need the multi frequency data files which are provided by the combine_frequencies process, as well as the corresponding pulsar candidates provided by the find_candidates process.

Recall the format for these two channels:

We want our new process to work on a single candidate at a time so that if there are two candidates for a single input file, we’ll run the process two times. Thus we want the input channel to look like [pointing, mf_file, candidate_file]. We could just use cross to join the two channels together using pointing as the key, however it will give only one item per pointing, rather than one item per candidate. Therefore we need to modify the find_candidates.out channel so that it has a single [pointing, candidate_file] pair per candidate. The NextFlow operator for this is transpose and we use it like this:

workflow {
    // create metadata
    get_meta( all_obs )
    // collect all the files that have the same pointing
    same_pointing = get_meta.out.groupTuple( by: 1 )
    // Combine the frequencies so you have a single file with all frequencies
    combine_frequencies( same_pointing )
    // Look for periodic signals with an fft
    find_candidates( combine_frequencies.out )
    find_candidates.out.view()
    // tranpose will "flatten" the cands so they have the format [ key, cand_file ]
    flattened_cands = find_candidates.out.transpose()
    flattened_cands.view()
}

This time I run the workflow using the -resume option so that I pick up from my previous run, and will get the same candidates as before.

$ nextflow run astro_wf.nf -resume
N E X T F L O W  ~  version 22.10.1
Launching `astro_wf.nf` [backstabbing_murdock] DSL2 - revision: fc5d3ca357
[4e/b890fe] process > get_meta (9)            [100%] 9 of 9, cached: 9 ✔
[9b/7b5d6f] process > combine_frequencies (3) [100%] 3 of 3, cached: 3 ✔
[e3/3983bf] process > find_candidates (3)     [100%] 3 of 3, cached: 3 ✔
[30deg_10deg, [/work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_1.dat, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_2.dat, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_3.dat]]
[10deg_10deg, [/work/e3/3983bf3b27049a2fbd281786dab0ed/cand_1.dat, /work/e3/3983bf3b27049a2fbd281786dab0ed/cand_2.dat]]
[30deg_10deg, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_1.dat]
[30deg_10deg, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_2.dat]
[30deg_10deg, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_3.dat]
[10deg_10deg, /work/e3/3983bf3b27049a2fbd281786dab0ed/cand_1.dat]
[10deg_10deg, /work/e3/3983bf3b27049a2fbd281786dab0ed/cand_2.dat]

Now we are in a position to join the two channels together with cross using the pointing as the key:

workflow {
    // create metadata
    get_meta( all_obs )
    // collect all the files that have the same pointing
    same_pointing = get_meta.out.groupTuple( by: 1 )
    // Combine the frequencies so you have a single file with all frequencies
    combine_frequencies( same_pointing )
    // Look for periodic signals with an fft
    find_candidates( combine_frequencies.out )
    // transpose will "flatten" the cands so they have the format [ key, cand_file ]
    flattened_cands = find_candidates.out.transpose()
    flattened_cands.view()
    combine_frequencies.out.view()
    // For each candidate file pair it with observation file
    cand_obs_pairs = combine_frequencies.out.cross(flattened_cands)
    cand_obs_pairs.view()
}

The output can get mixed up since the processes don’t execute in serial, so below I have reordered the outputs and added some comments (##) to show the different channels.

$ nextflow run astro_wf.nf -resume
N E X T F L O W  ~  version 22.10.1
Launching `astro_wf.nf` [festering_minsky] DSL2 - revision: 213dbb16a9
[4e/b890fe] process > get_meta (9)            [100%] 9 of 9, cached: 9 ✔
[9b/7b5d6f] process > combine_frequencies (1) [100%] 3 of 3, cached: 3 ✔
[e3/3983bf] process > find_candidates (2)     [100%] 3 of 3, cached: 3 ✔
## These are from flattened_cands = find_candidates.out.transpose()
[30deg_10deg, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_1.dat]
[30deg_10deg, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_2.dat]
[10deg_10deg, /work/e3/3983bf3b27049a2fbd281786dab0ed/cand_2.dat]
[30deg_10deg, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_3.dat]
[10deg_10deg, /work/e3/3983bf3b27049a2fbd281786dab0ed/cand_1.dat]
## These are from combine_frequencies.out
[30deg_10deg, /work/62/9e9decdb609c0cefd36586412f25f9/obs_30_10_20.dat]
[10deg_10deg, /work/9b/7b5d6f6c6e4c139c46bf23330756ff/obs_30_10_20.dat]
[30deg_10deg, /work/62/9e9decdb609c0cefd36586412f25f9/obs_30_10_20.dat]
[10deg_10deg, /work/9b/7b5d6f6c6e4c139c46bf23330756ff/obs_30_10_20.dat]
[20deg_10deg, /work/e4/f2898b75b400ed0fda4af3a53ced16/obs_10_30_20.dat]
## These are cand_obs_pairs = combine_frequencies.out.cross(flattened_cands)
[[30deg_10deg, /work/62/9e9decdb609c0cefd36586412f25f9/obs_30_10_20.dat], [30deg_10deg, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_2.dat]]
[[30deg_10deg, /work/62/9e9decdb609c0cefd36586412f25f9/obs_30_10_20.dat], [30deg_10deg, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_3.dat]]
[[10deg_10deg, /work/9b/7b5d6f6c6e4c139c46bf23330756ff/obs_30_10_20.dat], [10deg_10deg, /work/e3/3983bf3b27049a2fbd281786dab0ed/cand_1.dat]]
[[10deg_10deg, /work/9b/7b5d6f6c6e4c139c46bf23330756ff/obs_30_10_20.dat], [10deg_10deg, /work/e3/3983bf3b27049a2fbd281786dab0ed/cand_2.dat]]
[[30deg_10deg, /work/62/9e9decdb609c0cefd36586412f25f9/obs_30_10_20.dat], [30deg_10deg, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_1.dat]]

This is almost what we want. Note that we have a duplicated key (pointing) in the output from the cross operator. We could make a compilcated input for our new process, but it’s better to just manipulate the channel to remove the key and simplify the tuple.

The relevant part of our workflow is now:

    // For each candidate file pair it with observation file
    cand_obs_pairs = combine_frequencies.out.cross(flattened_cands)
        //reformat to remove redundant key
        .map{ [ it[0][0], it[0][1], it[1][1] ] }
        // [ pointing, obs_file, candidate_file ]
    cand_obs_pairs.view()

And when we run we get the following:

$ nextflow run astro_wf.nf -resume
N E X T F L O W  ~  version 22.10.1
Launching `astro_wf.nf` [naughty_ramanujan] DSL2 - revision: a493dd4e99
[4e/b890fe] process > get_meta (9)            [100%] 9 of 9, cached: 9 ✔
[e4/f2898b] process > combine_frequencies (1) [100%] 3 of 3, cached: 3 ✔
[e3/3983bf] process > find_candidates (2)     [100%] 3 of 3, cached: 3 ✔
## These are from flattened_cands = find_candidates.out.transpose()
[30deg_10deg, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_1.dat]
[30deg_10deg, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_2.dat]
[30deg_10deg, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_3.dat]
[10deg_10deg, /work/e3/3983bf3b27049a2fbd281786dab0ed/cand_1.dat]
[10deg_10deg, /work/e3/3983bf3b27049a2fbd281786dab0ed/cand_2.dat]
## These are from combine_frequencies.out
[20deg_10deg, /work/e4/f2898b75b400ed0fda4af3a53ced16/obs_10_30_20.dat]
[10deg_10deg, /work/9b/7b5d6f6c6e4c139c46bf23330756ff/obs_30_10_20.dat]
[30deg_10deg, /work/62/9e9decdb609c0cefd36586412f25f9/obs_30_10_20.dat]
## These are cand_obs_pairs = combine_frequencies.out.cross(flattened_cands).map{ [ it[0][0], it[0][1], it[1][1] ] }
[30deg_10deg, /work/62/9e9decdb609c0cefd36586412f25f9/obs_30_10_20.dat, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_1.dat]
[30deg_10deg, /work/62/9e9decdb609c0cefd36586412f25f9/obs_30_10_20.dat, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_2.dat]
[30deg_10deg, /work/62/9e9decdb609c0cefd36586412f25f9/obs_30_10_20.dat, /work/0a/039acf28cb01c737de9cd2f75c1cc8/cand_3.dat]
[10deg_10deg, /work/9b/7b5d6f6c6e4c139c46bf23330756ff/obs_30_10_20.dat, /work/e3/3983bf3b27049a2fbd281786dab0ed/cand_1.dat]
[10deg_10deg, /work/9b/7b5d6f6c6e4c139c46bf23330756ff/obs_30_10_20.dat, /work/e3/3983bf3b27049a2fbd281786dab0ed/cand_2.dat]

We now make another dummy process to work on the candidates. The resulting .dat file will normally contain some information about the candidate but for now it’s an empty file. Note that even though we can predict the name of the outfile, we can use *dat to capture whatever is generated. By default NextFlow will not copy files that are inputs (eg cand_1.dat or obs_30_10_20.dat) to the output so our glob approach is safe here. If we wanted the inputs to be included there is an option for that (includeInputs: true).

process fold_cands {
    // Fold the candidates on the given period and measure properties
    // for example: SNR, DM, p, pdot, intensity
    input:
    tuple (val(point), path(obs), path(cand))

    output:
    tuple (val(point), path("*dat"))

    script:
    """
    touch ${point}_${cand.baseName}.dat
    """
}

Now we can update our workflow to run this process and view the output channel.

workflow {
    // create metadata
    get_meta( all_obs )
    // collect all the files that have the same pointing
    same_pointing = get_meta.out.groupTuple( by: 1 )
    // Combine the frequencies so you have a single file with all frequencies
    combine_frequencies( same_pointing )
    // Look for periodic signals with an fft
    find_candidates( combine_frequencies.out )
    //find_candidates.out.view()
    // transpose will "flatten" the cands so they have the format [ key, cand_file ]
    flattened_cands = find_candidates.out.transpose()
    // For each candidate file pair it with observation file
    cand_obs_pairs = combine_frequencies.out.cross(flattened_cands)
        //reformat to remove redundant key
        .map{ [ it[0][0], it[0][1], it[1][1] ] }
        // [ pointing, obs_file, candidate_file ]
    // Process the candidate
    fold_cands( cand_obs_pairs )
    fold_cands.out.view()
}

Our output is looking a little more tidy this time.

$ nextflow run astro_wf.nf -resume
N E X T F L O W  ~  version 22.10.1
Launching `astro_wf.nf` [stupefied_watson] DSL2 - revision: 762707769d
executor >  local (5)
[4e/b890fe] process > get_meta (9)            [100%] 9 of 9, cached: 9 ✔
[62/9e9dec] process > combine_frequencies (1) [100%] 3 of 3, cached: 3 ✔
[0a/039acf] process > find_candidates (3)     [100%] 3 of 3, cached: 3 ✔
[b0/0f1f30] process > fold_cands (2)          [100%] 5 of 5 ✔
[30deg_10deg, /work/21/f0710dda186ddfe6212becc1197413/30deg_10deg_cand_2.dat]
[30deg_10deg, /work/c0/5c1866812c6be0f5ddd212b25e122b/30deg_10deg_cand_1.dat]
[30deg_10deg, /work/65/7124a1e4bacaf3256f5f0a1d9022f3/30deg_10deg_cand_3.dat]
[10deg_10deg, /work/94/314719324a38d621893efbe315ac2a/10deg_10deg_cand_1.dat]
[10deg_10deg, /work/b0/0f1f3005842f76d8c1a19e6c0fd004/10deg_10deg_cand_2.dat]

Step 6: Run some ML on the folded candidates

The output channel from the fold_cands is now nice and simple and it contains all the information we need to run our ML process. Again, we are using a bash script instead of the actual ML program, so we’ll use a shell. Note also that we now have two outputs that we are tracking, that they are now directories, and that both are optional. Furthermore, this is the last process in our workflow so we add a new directive to the process called publishDir which tells NextFlow that we want to copy the outputs from this process to a different directory (not in the work directory).

process ML_thing {
    // apply a machine learning algorithm to take the folded data and predict
    // real (positve) or fake (negative) candidates
    publishDir "cands/", mode: 'copy'

    input:
        tuple(val(point), path(candidates))

    output:
        path("positive/*"), optional: true
        path("negative/*"), optional: true

    shell:
    '''
    mkdir positive
    mkdir negative
    for f in !{candidates}; do
        if [ $(( $RANDOM % 2 )) == 0 ]; then
            mv $f positive/
        else
            mv $f negative/
        fi
    done
    '''
}

We can tack this onto the end of the workflow and run!

workflow {
    // create metadata
    get_meta( all_obs )
    // collect all the files that have the same pointing
    same_pointing = get_meta.out.groupTuple( by: 1 )
    // Combine the frequencies so you have a single file with all frequencies
    combine_frequencies( same_pointing )
    // Look for periodic signals with an fft
    find_candidates( combine_frequencies.out )
    //find_candidates.out.view()
    // transpose will "flatten" the cands so they have the format [ key, cand_file ]
    flattened_cands = find_candidates.out.transpose()
    // For each candidate file pair it with observation file
    cand_obs_pairs = combine_frequencies.out.cross(flattened_cands)
        //reformat to remove redundant key
        .map{ [ it[0][0], it[0][1], it[1][1] ] }
        // [ pointing, obs_file, candidate_file ]
    // Process the candidate
    fold_cands( cand_obs_pairs )
    // // Put the candidates through ML
    ML_thing( fold_cands.out )
}
$ nextflow run astro_wf.nf -resume
N E X T F L O W  ~  version 22.10.1
Launching `astro_wf.nf` [mad_swartz] DSL2 - revision: 2c5bab1faf
executor >  local (5)
[4e/b890fe] process > get_meta (9)            [100%] 9 of 9, cached: 9 ✔
[e4/f2898b] process > combine_frequencies (3) [100%] 3 of 3, cached: 3 ✔
[e3/3983bf] process > find_candidates (3)     [100%] 3 of 3, cached: 3 ✔
[c0/5c1866] process > fold_cands (3)          [100%] 5 of 5, cached: 5 ✔
[61/0371b5] process > ML_thing (5)            [100%] 5 of 5 ✔

We have removed all the .view() commands so the output here is much nicer to read. If we want to see our results we look in the direcory cands

$ tree cands
cands
├── negative
│   ├── 10deg_10deg_cand_1.dat
│   ├── 10deg_10deg_cand_2.dat
│   ├── 20deg_10deg_cand_1.dat
│   ├── 20deg_10deg_cand_2.dat
│   ├── 20deg_10deg_cand_3.dat
│   ├── 30deg_10deg_cand_1.dat
│   ├── 30deg_10deg_cand_2.dat
│   └── 30deg_10deg_cand_3.dat
└── positive
    ├── 10deg_10deg_cand_1.dat
    ├── 10deg_10deg_cand_3.dat
    ├── 20deg_10deg_cand_1.dat
    ├── 20deg_10deg_cand_2.dat
    ├── 30deg_10deg_cand_1.dat
    ├── 30deg_10deg_cand_2.dat
    └── 30deg_10deg_cand_3.dat

2 directories, 15 files

Viewing workflow meta-data

Create a .config file

This will create a bunch of useful analysis for your pipeline run when it completes. See next lesson for more about configuration files.

nextflow.config

// turn on all the juicy logging
trace.enabled = true
timeline.enabled = true
report.enabled = true
dag {
    enabled = true
    file='dag.png'
}

If we then rerun our workflow without the -resume option, we’ll get the following files in our directory:

The first thing we can look at is the dag.png file which is a directed analytic graph of our workflow. dag The graph isn’t super fancy, but you can see the processes as ellipses, the channels as linking lines, some with names, and circles on the channels where we have done some channel manipulation.

We can also review the report and timeline.

Key Points

  • get your channels right before working on the process

  • use -resume while developing to reduce execution time

  • report and timeline can be used to tune the execution once your workflow actually works