Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Seqpro refactor #148

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 91 additions & 71 deletions sequence_processing_pipeline/ConvertJob.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,32 @@
from os.path import join, exists
from sequence_processing_pipeline.Job import Job
from sequence_processing_pipeline.PipelineError import (PipelineError,
JobFailedError)
import logging
import re
from jinja2 import BaseLoader, Environment, TemplateNotFound
import pathlib
from os.path import join, exists, getmtime


# taken from https://jinja.palletsprojects.com/en/3.0.x/api/#jinja2.BaseLoader
class KISSLoader(BaseLoader):
    """Minimal jinja2 template loader rooted relative to this package.

    For projects that use sequence_processing_pipeline as a dependency,
    templates are resolved against this package's install location (the
    directory containing this file), rather than the consuming project's
    root path.
    """

    def __init__(self, path):
        # pin the path for loader to the location sequence_processing_pipeline
        # (the location of this file), along w/the relative path to the
        # templates directory.
        self.path = join(pathlib.Path(__file__).parent.resolve(), path)

    def get_source(self, environment, template):
        """Return (source, filename, uptodate) per the jinja2 BaseLoader API.

        :param environment: The jinja2 Environment (unused here).
        :param template: Template filename relative to the loader's root.
        :return: Tuple of (template text, absolute path, uptodate callable).
        :raises TemplateNotFound: If the template file does not exist.
        """
        path = join(self.path, template)
        if not exists(path):
            raise TemplateNotFound(template)
        mtime = getmtime(path)
        # read explicitly as UTF-8 rather than the platform default encoding,
        # so rendered job scripts are identical across systems.
        with open(path, encoding='utf-8') as f:
            source = f.read()
        # jinja2 calls the lambda to detect on-disk changes for auto-reload.
        return source, path, lambda: mtime == getmtime(path)


logging.basicConfig(level=logging.DEBUG)


class ConvertJob(Job):
Expand Down Expand Up @@ -39,12 +62,23 @@ def __init__(self, run_dir, output_path, sample_sheet_path, queue_name,
self.node_count = node_count
self.nprocs = nprocs
self.wall_time_limit = wall_time_limit

# TODO: This value is currently a string e.g.: '1gb' or '10gb' read
# in from the configuration json file. However this param should be
# changed to process_mem_in_gb or similar and the string changed to
# a numerical value.
self.pmem = pmem
self.bcl_tool = bcl_tool_path
self.qiita_job_id = qiita_job_id
# path to the job script that will be generated for this Job.
self.job_script_path = join(self.output_path, f"{self.job_name}.sh")
self.suffix = 'fastq.gz'

# for projects that use sequence_processing_pipeline as a dependency,
# jinja_env must be set to sequence_processing_pipeline's root path,
# rather than the project's root path.
self.jinja_env = Environment(loader=KISSLoader('templates'))

tmp = False
for executable_name in ['bcl2fastq', 'bcl-convert']:
if executable_name in self.bcl_tool:
Expand All @@ -63,76 +97,62 @@ def __init__(self, run_dir, output_path, sample_sheet_path, queue_name,
self._generate_job_script()

def _generate_job_script(self):
"""
Generate a Torque job script for processing supplied root_directory.
:return: The path to the newly-created job-script.
"""
lines = []

lines.append("#!/bin/bash")
lines.append(f"#SBATCH --job-name {self.qiita_job_id}_{self.job_name}")
lines.append(f"#SBATCH -p {self.queue_name}")
lines.append(f'#SBATCH -N {self.node_count}')
lines.append(f'#SBATCH -n {self.nprocs}')
lines.append("#SBATCH --time %d" % self.wall_time_limit)

# send an email to the list of users defined below when a job starts,
# terminates, or aborts. This is used to confirm that the package's
# own reporting mechanism is reporting correctly.
lines.append("#SBATCH --mail-type=ALL")

# list of users to be contacted independently of this package's
# notification system, when a job starts, terminates, or gets aborted.
lines.append("#SBATCH --mail-user [email protected]")

lines.append(f"#SBATCH --mem-per-cpu {self.pmem}")

lines.append("set -x")
lines.append('date')
lines.append('hostname')
lines.append(f'cd {self.root_dir}')

if self.modules_to_load:
lines.append("module load " + ' '.join(self.modules_to_load))

# Assume that the bcl-convert tool is named 'bcl-convert' and choose
# accordingly.
if 'bcl-convert' in self.bcl_tool:
lines.append(('%s '
'--sample-sheet "%s" '
'--output-directory %s '
'--bcl-input-directory . '
'--bcl-num-decompression-threads 16 '
'--bcl-num-conversion-threads 16 '
'--bcl-num-compression-threads 16 '
'--bcl-num-parallel-tiles 16 '
'--bcl-sampleproject-subdirectories true '
'--force') % (self.bcl_tool,
self.sample_sheet_path,
self.output_path))

# equivalent cp for bcl-conversion (see below) needed.
else:
lines.append(('%s '
'--sample-sheet "%s" '
'--minimum-trimmed-read-length 1 '
'--mask-short-adapter-reads 1 '
'-R . '
'-o %s '
'--loading-threads 16 '
'--processing-threads 16 '
'--writing-threads 16 '
'--create-fastq-for-index-reads '
'--ignore-missing-positions ') %
(self.bcl_tool,
self.sample_sheet_path,
self.output_path))

with open(self.job_script_path, 'w') as f:
for line in lines:
# remove long spaces in some lines.
line = re.sub(r'\s+', ' ', line)
f.write(f"{line}\n")
# bypass generating job script for a force-fail job, since it is
# not needed.
if self.force_job_fail:
return None

template = self.jinja_env.get_template("convert_job.sh")

job_name = f'{self.qiita_job_id}_{self.job_name}'

with open(self.job_script_path, mode="w", encoding="utf-8") as f:
if 'bcl-convert' in self.bcl_tool:
cmd_line = (f'{self.bcl_tool} '
f'--sample-sheet "{self.sample_sheet_path}" '
f'--output-directory {self.output_path} '
'--bcl-input-directory . '
'--bcl-num-decompression-threads 16 '
'--bcl-num-conversion-threads 16 '
'--bcl-num-compression-threads 16 '
'--bcl-num-parallel-tiles 16 '
'--bcl-sampleproject-subdirectories true '
'--force')
# equivalent cp for bcl-conversion (see below) needed.
else:
cmd_line = (f'{self.bcl_tool} '
f'--sample-sheet "{self.sample_sheet_path}" '
'--minimum-trimmed-read-length 1 '
'--mask-short-adapter-reads 1 '
'-R . '
f'-o {self.output_path} '
'--loading-threads 16 '
'--processing-threads 16 '
'--writing-threads 16 '
'--create-fastq-for-index-reads '
'--ignore-missing-positions ')

params = {'job_name': job_name,
'queue_name': self.queue_name,
'node_count': self.node_count,
'nprocs': self.nprocs,
'wall_time_limit': self.wall_time_limit,
'mem_per_cpu': self.pmem,
'run_dir': self.root_dir,
'sheet_path': self.sample_sheet_path,
'cmd_line': cmd_line}

# generate a string of linux system modules to load before
# processing begins.
if self.modules_to_load:
# if {{modules_to_load}} is defined, not empty and not false,
# then the line "module load <modules to load>" will be
# added to the template.
params['modules_to_load'] = ' '.join(self.modules_to_load)

f.write(template.render(**params))

return self.job_script_path

def run(self, callback=None):
"""
Expand Down
3 changes: 1 addition & 2 deletions sequence_processing_pipeline/NuQCJob.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from jinja2 import BaseLoader, TemplateNotFound
from jinja2 import BaseLoader, Environment, TemplateNotFound
from metapool import load_sample_sheet
from os import stat, makedirs, rename
from os.path import join, basename, dirname, exists, abspath, getmtime
Expand All @@ -10,7 +10,6 @@
import logging
from sequence_processing_pipeline.Commands import split_similar_size_bins
from sequence_processing_pipeline.util import iter_paired_files
from jinja2 import Environment
import glob
import re
from sys import executable
Expand Down
17 changes: 17 additions & 0 deletions sequence_processing_pipeline/templates/convert_job.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash
{# Slurm batch-script template rendered by ConvertJob._generate_job_script().
   All placeholders are supplied via its `params` dict. The trailing `-` in
   these comment tags strips the following newline so the rendered output is
   byte-identical to a template without comments. -#}
#SBATCH -J {{job_name}}
#SBATCH -p {{queue_name}}
#SBATCH -N {{node_count}}
#SBATCH -n {{nprocs}}
#SBATCH --time {{wall_time_limit}}
#SBATCH --mail-type=ALL
#SBATCH --mail-user [email protected]
#SBATCH --mem-per-cpu {{mem_per_cpu}}
set -x
date
hostname
cd {{run_dir}}
{# emit a `module load` line only when the caller supplied modules_to_load -#}
{% if modules_to_load %}
module load {{modules_to_load}}
{% endif %}
{{cmd_line}}
21 changes: 12 additions & 9 deletions sequence_processing_pipeline/tests/test_ConvertJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,7 @@ def tearDown(self):
rmtree(self.good_output_path)

def test_creation(self):
self.maxDiff = None
run_dir = self.base_path('211021_A00000_0000_SAMPLE')
inv_input_directory = self.base_path('inv_input_directory')
qiita_id = 'abcdabcdabcdabcdabcdabcdabcdabcd'
Expand All @@ -934,13 +935,15 @@ def test_creation(self):
'ConvertJob.sh')) as f:
obs = ''.join(f.readlines())

# ssp should be just the value of the self.path() partial function by
# itself. For readability, SCRIPT_EXP addresses the '/' separator.
# Hence, the trailing '/' is redundant and should be removed here.
self.assertEqual(obs,
SCRIPT_EXP.format(ssp=self.base_path('').rstrip('/'),
gop=self.good_output_path,
run_dir=run_dir))
# substitute variables in expected output with the run-time values
# that we expect.
exp = SCRIPT_EXP.replace("{run_dir}", run_dir).\
replace("{gop}", self.good_output_path).\
replace("{ssp}/", self.base_path(''))

# remove trailing whitespace from the ends of each parameter, since
# it's not important.
self.assertEqual(obs.rstrip(), exp.rstrip())

def test_error_msg_from_logs(self):
run_dir = self.base_path('211021_A00000_0000_SAMPLE')
Expand Down Expand Up @@ -998,7 +1001,7 @@ def test_parse_sample_sheet(self):

SCRIPT_EXP = ''.join([
'#!/bin/bash\n',
'#SBATCH --job-name abcdabcdabcdabcdabcdabcdabcdabcd_ConvertJob\n',
'#SBATCH -J abcdabcdabcdabcdabcdabcdabcdabcd_ConvertJob\n',
'#SBATCH -p qiita\n',
'#SBATCH -N 1\n',
'#SBATCH -n 16\n',
Expand All @@ -1009,7 +1012,7 @@ def test_parse_sample_sheet(self):
'set -x\n',
'date\n',
'hostname\n',
'cd {run_dir}\n',
'cd {run_dir}\n\n',
'tests/bin/bcl-convert --sample-sheet "{ssp}/good-sample-sheet.csv" '
'--output-directory {gop}/ConvertJob '
'--bcl-input-directory . '
Expand Down
Loading