diff --git a/.gitignore b/.gitignore old mode 100644 new mode 100755 diff --git a/Makefile b/Makefile old mode 100644 new mode 100755 diff --git a/README.md b/README.md old mode 100644 new mode 100755 diff --git a/apps/__init__.py b/apps/__init__.py old mode 100644 new mode 100755 diff --git a/apps/access/__init__.py b/apps/access/__init__.py old mode 100644 new mode 100755 index f4aa9a8..269bd28 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -5,76 +5,162 @@ from pathlib import Path import shutil import requests - +import re +import json FLAG_TO_APPS = { "dmpmanifest": ("access_manifest", "manifest"), + "bams": ("access legacy", "bam_qc"), "msi": ("access legacy MSI", "microsatellite_instability"), "cnv": ("access legacy CNV", "copy_number_variants"), "sv": ("access legacy SV", "structural_variants"), "snv": ("access legacy SNV", "small_variants"), - "bams": ("access legacy", "bam_qc"), - "nucleo": ("access nucleo", "bam_qc"), + "bams_xs2":("access v2 nucleo", "bam_qc"), + "qc_xs2":("access v2 nucleo qc", "quality_control"), + "qc_agg_xs2":("access v2 nucleo qc agg", "quality_control_aggregate"), + "snv_xs2":("access v2 legacy SNV", "small_variants"), + "msi_xs2":("access v2 legacy MSI", "microsatellite_instability"), + "cnv_xs2":("access v2 legacy CNV", "copy_number_variants"), + "sv_xs2":("access v2 legacy SV", "structural_variants"), + "dmpmanifest_ch": ("cmo_manifest", "manifest"), + "bams_ch": ("cmo-ch nucleo", "bams"), + "qc_ch": ("CMO-CH QC", "quality_control"), + "qc_agg_ch": ("CMO-CH QC Agg", "quality_control_aggregate"), + "chipvar_ch": ("CMO-CH Chip-Var","chipvar"), + "dmpmanifest_heme": ("cmo_manifest", "manifest"), + "bams_heme": ("heme nucleo", "bams"), + "qc_heme": ("heme nucleo qc", "quality_control"), + "qc_agg_heme": ("heme nucleo qc agg", "quality_control_aggregate"), + "chipvar_heme": ("Heme Chip-Var","chipvar") } +REQUEST_PATIENT_APPS = ['access v2 nucleo qc agg', 'cmo_manifest', "CMO-CH QC Agg", "heme nucleo qc agg",'JUNO PIPELINE: access v2 nucleo qc agg', 'JUNO PIPELINE: cmo_manifest', "JUNO PIPELINE: CMO-CH QC Agg", "JUNO PIPELINE: heme nucleo qc agg"] def access_commands(arguments, config): - print('Running ACCESS') - - request_ids, sample_id, apps, show_all_runs = get_arguments(arguments) - for request in request_ids: - tags = '{"cmoSampleIds":"%s"}' % sample_id if sample_id else '{"igoRequestId":"%s"}' % request - if arguments.get('link'): - for (app, app_version) in apps: - (app_name, directory) = FLAG_TO_APPS[app] - operator_run = get_operator_run(app_name, app_version, tags, config, show_all_runs) - if operator_run: - if arguments.get('--single-dir'): - if app == "bams": - link_bams_to_single_dir(operator_run, app, request, sample_id, arguments, config, show_all_runs) - else: - print("Apps other than bams not supported at this time") - else: - link_app(operator_run, directory, request, sample_id, arguments, config, show_all_runs) - - if arguments.get('link-patient'): - for (app, app_version) in apps: - (app_name, directory) = FLAG_TO_APPS[app] - operator_run = get_operator_run(app_name, app_version, tags, config, show_all_runs) - if operator_run: - if(app == "bams"): - link_bams_by_patient_id(operator_run, "bams", request, sample_id, arguments, config, show_all_runs) + print('Running Linking') + request_ids, sample_id, apps, uncompleted_runs, app_prefix = get_arguments(arguments) + if arguments.get('--all-requests') and not arguments.get('--request-ids'): + request = None + # request = request_ids[0] + link_runs(apps, arguments, request, sample_id, config, uncompleted_runs, app_prefix) + elif arguments.get('--request-ids') and not arguments.get('--all-requests'): + for request in request_ids: + link_runs(apps, arguments, request, sample_id, config, uncompleted_runs, app_prefix) + else: + raise ValueError("Must provide either --all-requests or --request-ids") + + +def link_runs(apps, arguments, request, sample_id, config, uncompleted_runs, app_prefix=''): + if arguments.get('link'): + for (app, app_version) in apps: + (app_name, directory) = FLAG_TO_APPS[app] + app_name = f'{app_prefix} {app_name}' + all_requests = arguments.get("--all-requests") or False + tags = build_params(sample_id, request, app_name,all_requests, use_json_tags=False) + operator_runs = get_operator_run(app_name, arguments, app_version, tags, config, uncompleted_runs) + if operator_runs: + if arguments.get('--single-dir'): + if app == "bams": + link_bams_to_single_dir(operator_runs, app, request, sample_id, arguments, config, uncompleted_runs) else: - link_single_sample_workflows_by_patient_id(operator_run, directory, request, sample_id, arguments, - config, show_all_runs) - -def get_operator_run(app_name, app_version=None, tags=None, config=None, show_all_runs=False): - latest_operator_run = { - "tags": tags, + print("Apps other than bams not supported at this time") + else: + link_app(operator_runs, directory, request, sample_id, arguments, config, uncompleted_runs) + + if arguments.get('link-patient'): + for (app, app_version) in apps: + (app_name, directory) = FLAG_TO_APPS[app] + if app_prefix: + app_name = f'{app_prefix} {app_name}' + all_requests = arguments.get("--all-requests") or False + tags = build_params(sample_id, request, app_name,all_requests, use_json_tags=False) + operator_runs = get_operator_run(app_name, arguments, app_version, tags, config, uncompleted_runs) + # operator_runs = operator_runs[0:2] + if operator_runs: + if(app in ["bams", "bams_xs2", "bams_ch", "bams_heme"]): + link_bams_by_patient_id(operator_runs, "bams", request, sample_id, arguments, config, uncompleted_runs) + else: + link_single_sample_workflows_by_patient_id(operator_runs, directory, request, sample_id, arguments, + config, uncompleted_runs) + +def build_params(sample_id, request, app, all_requests, use_json_tags): + params = { "status": "COMPLETED", - "page_size": 1, - "app_name": app_name + "page_size": "1", + "app_name": app, } - if show_all_runs: + if all_requests: + return params + + tags = {} + if sample_id: + tags["cmoSampleIds"] = sample_id + if request: + tags["igoRequestId"] = request + + if tags: + params["tags"] = json.dumps(tags) + + return params + + + + return params +def get_operator_run(app_name, arguments, app_version=None, tags=None, config=None, uncompleted_runs=False): + should_delete = arguments.get("--delete") or False + all_runs = arguments.get("--all-runs") or False + all_requests = arguments.get("--all-requests") or False + + latest_operator_run = tags + # operator_look_ahead(latest_operator_run, config, tags) + if uncompleted_runs: latest_operator_run.pop("status") if app_version: latest_operator_run["app_version"] = app_version + response = requests.get(urljoin(config['beagle_endpoint'], config['api']['operator-runs']), headers={'Authorization': 'Bearer %s' % config['token']}, params=latest_operator_run) - - latest_runs = response.json()["results"] + try: + latest_runs = response.json()["results"] + except: + breakpoint() if not latest_runs: if "igoRequestId" in tags: new_tag = tags.replace("igoRequestId", "requestId") - return get_operator_run(app_name, app_version, tags=new_tag, config=config) + return get_operator_run(app_name, arguments, app_version, tags=new_tag, config=config) else: print("There are no completed operator runs for this request in the following app: %s:%s" % (str(app_name), str(app_version)), file=sys.stderr) return None + if (all_runs and should_delete) or all_requests: + next_response = response.json()['next'] + while next_response: + response = requests.get(next_response, headers={'Authorization': 'Bearer %s' % config['token']}) + next_response = response.json()['next'] + latest_runs.extend(response.json()["results"]) + return latest_runs + +def operator_look_ahead(latest_operator_run, config, tags): + threshold = float(.90) + complete = float(1.0) + try: + request_id = json.loads(tags)['igoRequestId'] + except: + request_id = json.loads(tags)['requestId'] + latest_operator_run_look_ahead = latest_operator_run.copy() + latest_operator_run_look_ahead.pop("status") + response_look_ahead = requests.get(urljoin(config['beagle_endpoint'], config['api']['operator-runs']), + headers={'Authorization': 'Bearer %s' % config['token']}, + params=latest_operator_run) + if response_look_ahead.json()["results"]: + total_r = response_look_ahead.json()["results"][0]["num_total_runs"] + completed_r = response_look_ahead.json()["results"][0]["num_completed_runs"] + percent_c = completed_r / total_r + if (percent_c >= threshold) and (percent_c < complete): + print(f"Warning there is a more recent operator run for request {request_id} that is incomplete, but with {percent_c} of runs completed. This may be the operator run you need for analysis. Consult the request's operator run history and consider using the --all-runs flag if appropriate.") - return latest_runs[0] def open_request_file(request_ids_file): try: @@ -94,7 +180,12 @@ def get_arguments(arguments): request_ids_file = arguments.get('--request-ids-file') sample_id = arguments.get('--sample-id') app_tags = arguments.get('--apps') - show_all_runs = arguments.get('--all-runs') or False + uncompleted_runs = arguments.get('--uncompleted-runs') or False + all_runs = arguments.get('--all-runs') + delete = arguments.get('--delete') or False + app_prefix = arguments.get('--app-prefix') or None + if all_runs and delete is False: + raise ValueError("The --all-runs flag must be used with the --delete flag to avoid accidental linking of multiple runs.") if request_ids_file: request_ids = open_request_file(request_ids_file) apps = [] # [(tag, version), ...] @@ -105,23 +196,36 @@ def get_arguments(arguments): else: apps.append((r[0], None)) - return request_ids, sample_id, apps, show_all_runs - - -def get_runs(operator_run_id, config, show_all_runs): - run_params = { - "operator_run": operator_run_id, - "page_size": 1000, - "status": "COMPLETED" - } - - if show_all_runs: - run_params.pop("status") - - response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run']), - headers={'Authorization': 'Bearer %s' % config['token']}, params=run_params) + return request_ids, sample_id, apps, uncompleted_runs, app_prefix + + +def get_runs(operator_run_id, config, uncompleted_runs, all_requests=False): + response_rslts = [] + seen_request_ids = {} + request_skips = set(["11674_L", "11674_P", "11674_K", "11674_Q_custom"]) + for oprn in operator_run_id: + run_params = { + "operator_run": oprn['id'], + "page_size": 1000, + "status": "COMPLETED" + } + + if uncompleted_runs: + run_params.pop("status") + + response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run']), + headers={'Authorization': 'Bearer %s' % config['token']}, params=run_params) + + if all_requests: + request_id = response.json()["results"][0]['request_id'] + date = response.json()["results"][0]['created_date'] + if (request_id in seen_request_ids and seen_request_ids.get(request_id) > date) or (request_id in request_skips): + continue + else: + seen_request_ids[request_id] = date + response_rslts = response_rslts + response.json()["results"] - return response.json()["results"] + return response_rslts def get_run_by_id(run_id, config): response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run'] + run_id), @@ -133,67 +237,97 @@ def get_run_by_id(run_id, config): def get_files_by_run_id(run_id, config): response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run'] + run_id), headers={'Authorization': 'Bearer %s' % config['token']}) - return response.json()["outputs"] def get_file_path(file): return file["location"][7:] -def link_app(operator_run, directory, request_id, sample_id, arguments, config, show_all_runs): - version = arguments.get("--dir-version") or operator_run["app_version"] +def link_app(operator_runs, directory, request_id, sample_id, arguments, config, uncompleted_runs): + version = arguments.get("--dir-version") should_delete = arguments.get("--delete") or False - - path = Path("./") - path_without_version = path / ("Project_" + request_id) / directory - path = path_without_version / version - path.mkdir(parents=True, exist_ok=True, mode=0o755) - - runs = get_runs(operator_run["id"], config, show_all_runs) + all_requests = arguments.get("--all-requests") or False + runs = get_runs(operator_runs, config, uncompleted_runs, all_requests) + if not runs: return - files = [] # (sample_id, /path/to/file) for run_meta in runs: + if all_requests: + request_id = run_meta["request_id"] + if request_id_prev != request_id: + try: + os.symlink(path.absolute(), path_without_version / "current") + except: + print(f'Could not create current folder for: {path.absolute()}') + request_id_prev = request_id + path = Path("./") + if request_id is None: + pass + else: + path_without_version = path / ("Project_" + request_id) / directory + path = path_without_version / version + + path.mkdir(parents=True, exist_ok=True, mode=0o755) run = get_run_by_id(run_meta["id"], config) if should_delete: try: os.unlink(path / run["id"]) print((path / run["id"]).absolute(), file=sys.stdout) + if len(os.listdir(path)) == 0: + print("no remaining runs in version, deleting directory {} ".format(path), file=sys.stderr) + print(f"deleting {path}") + shutil.rmtree(path) except Exception as e: print("could not delete symlink: {} ".format(path / run["id"]), file=sys.stderr) else: try: + if os.path.islink(path / run["id"]) and not os.path.exists(path / run["id"]): + run_path = path / run["id"] + print(f"Removing broken symlink for: {run_path}") + os.remove(path / run["id"]) os.symlink(run["output_directory"], path / run["id"]) print((path / run["id"]).absolute(), file=sys.stdout) except Exception as e: + breakpoint() print("could not create symlink from '{}' to '{}'".format(run["output_directory"], path / run["id"]), file=sys.stderr) - - try: + try: os.unlink(path_without_version / "current") - except: + except: pass - if not should_delete: os.symlink(path.absolute(), path_without_version / "current") return "Completed" -def link_single_sample_workflows_by_patient_id(operator_run, directory, request_id, sample_id, arguments, config, show_all_runs): - version = arguments.get("--dir-version") or operator_run["app_version"] +def link_single_sample_workflows_by_patient_id(operator_runs, directory, request_id, sample_id, arguments, config, uncompleted_runs): + version = arguments.get("--dir-version") or operator_runs["app_version"] should_delete = arguments.get("--delete") or False - + all_requests = arguments.get("--all-requests") or False path = Path("./") / directory - - runs = get_runs(operator_run["id"], config, show_all_runs) + runs = get_runs(operator_runs, config, uncompleted_runs, all_requests) + seen_request_ids = set() if not runs: return for run_meta in runs: run = get_run_by_id(run_meta["id"], config) - if operator_run['app_name'] == 'cmo_manifest': - sample_path = path / request_id + sample_key = None + + if (operator_runs[0]['app_name'] in REQUEST_PATIENT_APPS) and (run_meta["request_id"] not in seen_request_ids): + sample_path = path / run_meta["request_id"] + seen_request_ids.add(sample_path) else: - sample_id = run["tags"]["cmoSampleIds"][0] if isinstance(run["tags"]["cmoSampleIds"], list) else run["tags"]["cmoSampleIds"] + if "cmoSampleIds" in run["tags"].keys(): + sample_key = "cmoSampleIds" + sample_id = run["tags"][sample_key][0] if isinstance(run["tags"][sample_key], list) else run["tags"][sample_key] + elif "cmoSampleName" in run["tags"].keys(): + sample_key = "cmoSampleName" + sample_id = run["tags"][sample_key][0] if isinstance(run["tags"][sample_key], list) else run["tags"][sample_key] + elif "cmoSampleId" in run["tags"].keys(): + sample_key = "cmoSampleId" + sample_id = run["tags"][sample_key][0] if isinstance(run["tags"][sample_key], list) else run["tags"][sample_key] + else: + raise LookupError(f'Operator run {run["id"]} is missing Sample Meta Data') a, b, _ = sample_id.split("-", 2) patient_id = "-".join([a, b]) sample_path = path / patient_id / sample_id @@ -204,13 +338,22 @@ def link_single_sample_workflows_by_patient_id(operator_run, directory, request_ try: os.unlink(sample_version_path) print(sample_version_path.absolute(), file=sys.stdout) + if len(os.listdir(sample_path)) == 0: + print("no remaining sample versions, deleting sample directory {} ".format(sample_path), file=sys.stderr) + print(f"deleting {sample_path}") + shutil.rmtree(sample_path) except Exception as e: print("could not delete symlink: {} ".format(sample_version_path), file=sys.stderr) else: try: + if os.path.islink(sample_version_path) and not os.path.exists(sample_version_path): + print(f"Removing broken symlink for: {sample_version_path}") + os.remove(sample_version_path) os.symlink(run["output_directory"], sample_version_path) print(sample_version_path.absolute(), file=sys.stdout) + except Exception as e: + breakpoint() print("could not create symlink from '{}' to '{}'".format(sample_version_path.absolute(), run["output_directory"]), file=sys.stderr) try: @@ -223,22 +366,20 @@ def link_single_sample_workflows_by_patient_id(operator_run, directory, request_ return "Completed" -def link_bams_to_single_dir(operator_run, directory, request_id, sample_id, arguments, config, show_all_runs): - version = arguments.get("--dir-version") or operator_run["app_version"] - +def link_bams_to_single_dir(operator_runs, directory, request_id, sample_id, arguments, config, uncompleted_runs): + version = arguments.get("--dir-version") + all_requests = arguments.get("--all-requests") or False path = Path("./") / directory / ("Project_" + request_id) - runs = get_runs(operator_run["id"], config, show_all_runs) + runs = get_runs(operator_runs, config, uncompleted_runs, all_requests) if not runs: return files = [] # (sample_id, /path/to/file) - for run in runs: for file_group in get_files_by_run_id(run["id"], config): files = files + find_files_by_sample(file_group["value"], sample_id=sample_id) - accepted_file_types = ['.bam', '.bai'] for (sample_id, file) in files: file_path = get_file_path(file) @@ -257,11 +398,16 @@ def link_bams_to_single_dir(operator_run, directory, request_id, sample_id, argu sample_path = path sample_version_path = sample_path / version sample_version_path.mkdir(parents=True, exist_ok=True, mode=0o755) + full_path = sample_version_path / file_name try: + if os.path.islink(full_path) and not os.path.exists(full_path): + print(f"Removing broken symlink for: {full_path}") + os.remove(full_path) os.symlink(file_path, sample_version_path / file_name) print((sample_version_path / file_name).absolute(), file=sys.stdout) except Exception as e: + breakpoint() print("Could not create symlink from '{}' to '{}'".format(sample_version_path / file_name, file_path), file=sys.stderr) continue @@ -277,24 +423,29 @@ def link_bams_to_single_dir(operator_run, directory, request_id, sample_id, argu return "Completed" -def link_bams_by_patient_id(operator_run, directory, request_id, sample_id, arguments, config, show_all_runs): - version = arguments.get("--dir-version") or operator_run["app_version"] +def link_bams_by_patient_id(operator_runs, directory, request_id, sample_id, arguments, config, uncompleted_runs): + version = arguments.get("--dir-version") should_delete = arguments.get("--delete") or False + all_requests = arguments.get("--all-requests") or False path = Path("./") / directory - - runs = get_runs(operator_run["id"], config, show_all_runs) + runs = get_runs(operator_runs, config, uncompleted_runs, all_requests) if not runs: return + + add_bai=False + if len(runs) == 1: + manual_name="Run Access Legacy Fastq to Bam (file outputs) - Manual" + if manual_name in runs[0]["name"]: + add_bai=True files = [] # (sample_id, /path/to/file) - for run in runs: for file_group in get_files_by_run_id(run["id"], config): files = files + find_files_by_sample(file_group["value"], sample_id=sample_id) - accepted_file_types = ['.bam', '.bai'] + seen_paths = set() for (sample_id, file) in files: file_path = get_file_path(file) _, file_ext = os.path.splitext(file_path) @@ -311,20 +462,50 @@ def link_bams_by_patient_id(operator_run, directory, request_id, sample_id, argu sample_path = path / patient_id / sample_id sample_version_path = sample_path / version + full_path = sample_version_path / file_name + file_name_index = file_name.replace(".bam", ".bai") + full_path_index = sample_version_path / file_name_index + + if (sample_version_path / file_name) in seen_paths: + breakpoint() + # 11674_L and 11674_K are old runs with bad sample names + print(f"Skipping duplicate file {file_path} for sample {sample_id}", file=sys.stderr) + continue + sample_version_path.mkdir(parents=True, exist_ok=True, mode=0o755) if should_delete: try: shutil.rmtree(sample_version_path) print(sample_version_path.absolute(), file=sys.stdout) + # delete if there are no other versions remaining + # if len(os.listdir(sample_path)) == 1: + # if os.path.islink(sample_path / os.listdir(sample_path)[0]) and not os.path.exists(sample_path / os.listdir(sample_path)[0]): + # print("no remaining sample versions, deleting sample directory {} ".format(sample_path), file=sys.stderr) + # shutil.rmtree(sample_path) + if len(os.listdir(sample_path)) == 0: + print("no remaining sample versions, deleting sample directory {} ".format(sample_path), file=sys.stderr) + print(f"deleting {sample_path}") + shutil.rmtree(sample_path) except Exception as e: print("could not delete folder: {} ".format(sample_version_path), file=sys.stderr) else: try: - os.symlink(file_path, sample_version_path / file_name) - print((sample_version_path / file_name).absolute(), file=sys.stdout) + if os.path.islink(full_path) and not os.path.exists(full_path): + breakpoint() + print(f"Removing broken symlink for: {full_path}") + os.remove(full_path) + if add_bai: + os.remove(full_path_index) + os.symlink(file_path, full_path) + print((full_path).absolute(), file=sys.stdout) + seen_paths.add(full_path) + if add_bai: + os.symlink(file_path, full_path_index) + print((full_path_index).absolute(), file=sys.stdout) except Exception as e: - print("Could not create symlink from '{}' to '{}'".format(sample_version_path / file_name, file_path), file=sys.stderr) + breakpoint() + print("Could not create symlink from '{}' to '{}'".format(full_path, file_path), file=sys.stderr) continue try: @@ -339,7 +520,12 @@ def link_bams_by_patient_id(operator_run, directory, request_id, sample_id, argu pass return "Completed" - +def is_run_manual(run, request_id): + pattern = r"/data1/core006/access/production/runs/Project_.*/bam_qc" + match = re.match(pattern, run["output_directory"]) + if match: + return True + return False def find_files_by_sample(file_group, sample_id = None): def traverse(file_group): files = [] diff --git a/apps/cleaning/__init__.py b/apps/cleaning/__init__.py old mode 100644 new mode 100755 diff --git a/apps/cmoch/__init__.py b/apps/cmoch/__init__.py old mode 100644 new mode 100755 index 26b672f..9cbaa02 --- a/apps/cmoch/__init__.py +++ b/apps/cmoch/__init__.py @@ -10,7 +10,8 @@ "dmpmanifest": ("cmo_manifest", "manifest"), "bams": ("cmo-ch nucleo", "bams"), "qc": ("CMO-CH QC", "quality_control"), - "qc_agg": ("CMO-CH QC Agg", "quality_control_aggregate") + "qc_agg": ("CMO-CH QC Agg", "quality_control_aggregate"), + "chipvar": ("CMO-CH Chip-Var","chipvar") } diff --git a/apps/heme/__init__.py b/apps/heme/__init__.py new file mode 100755 index 0000000..1caef87 --- /dev/null +++ b/apps/heme/__init__.py @@ -0,0 +1,397 @@ +import os +import sys +from collections import defaultdict +from urllib.parse import urljoin +from pathlib import Path +import shutil +import requests + +FLAG_TO_APPS = { + "dmpmanifest": ("cmo_manifest", "manifest"), + "bams": ("heme nucleo", "bams"), + "qc": ("heme nucleo qc", "quality_control"), + "qc_agg": ("heme nucleo qc agg", "quality_control_aggregate"), + "chipvar": ("Heme Chip-Var","chipvar") +} + + +def heme_commands(arguments, config): + print('Running HEME') + + request_ids, sample_id, apps, show_all_runs = get_arguments(arguments) + for request in request_ids: + tags = '{"cmoSampleId":"%s"}' % sample_id if sample_id else '{"igoRequestId":"%s"}' % request + if arguments.get('link'): + for (app, app_version) in apps: + (app_name, directory) = FLAG_TO_APPS[app] + operator_run = get_operator_run( + app_name, app_version, tags, config, show_all_runs) + if operator_run: + if arguments.get('--single-dir'): + if app == "bams": + link_bams_to_single_dir( + operator_run, app, request, sample_id, arguments, config, show_all_runs) + else: + print("Apps other than bams not supported at this time") + else: + link_app(operator_run, directory, request, + sample_id, arguments, config, show_all_runs) + + if arguments.get('link-patient'): + for (app, app_version) in apps: + (app_name, directory) = FLAG_TO_APPS[app] + operator_run = get_operator_run( + app_name, app_version, tags, config, show_all_runs) + if operator_run: + if(app == "bams"): + link_bams_by_patient_id( + operator_run, "bams", request, sample_id, arguments, config, show_all_runs) + else: + link_single_sample_workflows_by_patient_id(operator_run, directory, request, sample_id, arguments, + config, show_all_runs) + + +def get_operator_run(app_name, app_version=None, tags=None, config=None, show_all_runs=False): + latest_operator_run = { + "tags": tags, + "status": "COMPLETED", + "page_size": 1, + "app_name": app_name + } + + if show_all_runs: + latest_operator_run.pop("status") + + if app_version: + latest_operator_run["app_version"] = app_version + + response = requests.get(urljoin(config['beagle_endpoint'], config['api']['operator-runs']), + headers={ + 'Authorization': 'Bearer %s' % config['token']}, + params=latest_operator_run) + + latest_runs = response.json()["results"] + if not latest_runs: + print("There are no completed operator runs for this request in the following app: %s:%s" % + (str(app_name), str(app_version)), file=sys.stderr) + return None + + return latest_runs[0] + +def open_request_file(request_ids_file): + try: + with open(request_ids_file,'r') as file: + request_ids = [] + for line in file: + # Remove leading and trailing whitespaces and split the line by comma + request = line.strip(',\n') + # Append the list of items to the result list + request_ids.append(request) + except FileNotFoundError: + raise FileNotFoundError('Cannot find filename') + return request_ids + +def get_arguments(arguments): + request_ids = arguments.get('--request-ids') + request_ids_file = arguments.get('--request-ids-file') + sample_id = arguments.get('--sample-id') + app_tags = arguments.get('--apps') + show_all_runs = arguments.get('--all-runs') or False + if request_ids_file: + request_ids = open_request_file(request_ids_file) + + apps = [] # [(tag, version), ...] + for app in app_tags: + r = app.split(":") + if len(r) > 1: + apps.append((r[0], r[1])) + else: + apps.append((r[0], None)) + + return request_ids, sample_id, apps, show_all_runs + + +def get_runs(operator_run_id, config, show_all_runs): + run_params = { + "operator_run": operator_run_id, + "page_size": 1000, + "status": "COMPLETED" + } + + if show_all_runs: + run_params.pop("status") + + response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run']), + headers={'Authorization': 'Bearer %s' % config['token']}, params=run_params) + + return response.json()["results"] + + +def get_run_by_id(run_id, config): + response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run'] + run_id), + headers={'Authorization': 'Bearer %s' % config['token']}) + + return response.json() + + +def get_files_by_run_id(run_id, config): + response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run'] + run_id), + headers={'Authorization': 'Bearer %s' % config['token']}) + + return response.json()["outputs"] + + +def get_file_path(file): + return file["location"][7:] + + +def link_app(operator_run, directory, request_id, sample_id, arguments, config, show_all_runs): + + version = arguments.get("--dir-version") or operator_run["app_version"] + should_delete = arguments.get("--delete") or False + + path = Path("./") + path_without_version = path / ("Project_" + request_id) / directory + path = path_without_version / version + path.mkdir(parents=True, exist_ok=True, mode=0o755) + + runs = get_runs(operator_run["id"], config, show_all_runs) + if not runs: + return + + files = [] # (sample_id, /path/to/file) + for run_meta in runs: + run = get_run_by_id(run_meta["id"], config) + if should_delete: + try: + os.unlink(path / run["id"]) + print((path / run["id"]).absolute(), file=sys.stdout) + except Exception as e: + print("could not delete symlink: {} ".format( + path / run["id"]), file=sys.stderr) + else: + try: + os.symlink(run["output_directory"], path / run["id"]) + print((path / run["id"]).absolute(), file=sys.stdout) + except Exception as e: + print("could not create symlink from '{}' to '{}'".format( + run["output_directory"], path / run["id"]), file=sys.stderr) + + try: + os.unlink(path_without_version / "current") + except: + pass + + if not should_delete: + os.symlink(path.absolute(), path_without_version / "current") + return "Completed" + + +def link_single_sample_workflows_by_patient_id(operator_run, directory, request_id, sample_id, arguments, config, show_all_runs): + version = arguments.get("--dir-version") or operator_run["app_version"] + should_delete = arguments.get("--delete") or False + + path = Path("./") / directory + + runs = get_runs(operator_run["id"], config, show_all_runs) + if not runs: + return + + for run_meta in runs: + run = get_run_by_id(run_meta["id"], config) + + if operator_run['app_name'] in ['heme nucleo qc agg', 'cmo_manifest'] : + sample_path = path / request_id + else: + if "cmoSampleIds" in run["tags"].keys(): + cmo_id = "cmoSampleIds" + else: + cmo_id = "cmoSampleId" + sample_id = run["tags"][cmo_id][0] if isinstance( + run["tags"][cmo_id], list) else run["tags"][cmo_id] + a, b, _ = sample_id.split("-", 2) + patient_id = "-".join([a, b]) + sample_path = path / patient_id / sample_id + sample_path.mkdir(parents=True, exist_ok=True, mode=0o755) + sample_version_path = sample_path / version + if should_delete: + try: + os.unlink(sample_version_path) + print(sample_version_path.absolute(), file=sys.stdout) + except Exception as e: + print("could not delete symlink: {} ".format( + sample_version_path), file=sys.stderr) + else: + try: + os.symlink(run["output_directory"], sample_version_path) + print(sample_version_path.absolute(), file=sys.stdout) + except Exception as e: + print("could not create symlink from '{}' to '{}'".format( + sample_version_path.absolute(), run["output_directory"]), file=sys.stderr) + + try: + os.unlink(sample_path / "current") + except: + pass + + if not should_delete: + os.symlink(sample_version_path.absolute(), sample_path / "current") + + return "Completed" + + +def link_bams_to_single_dir(operator_run, directory, request_id, sample_id, arguments, config, show_all_runs): + version = arguments.get("--dir-version") or operator_run["app_version"] + + path = Path("./") / directory / ("Project_" + request_id) + + runs = get_runs(operator_run["id"], config, show_all_runs) + + if not runs: + return + + files = [] # (sample_id, /path/to/file) + + for run in runs: + for file_group in get_files_by_run_id(run["id"], config): + files = files + \ + find_files_by_sample(file_group["value"], sample_id=sample_id) + + accepted_file_types = ['.bam', '.bai'] + for (sample_id, file) in files: + file_path = get_file_path(file) + _, file_ext = os.path.splitext(file_path) + + if file_ext not in accepted_file_types: + continue + + file_name = os.path.basename(file_path) + + sample_id, _ = file_name.split("_", 1) + a, b, _ = sample_id.split("-", 2) + patient_id = "-".join([a, b]) + + sample_path = path + sample_version_path = sample_path / version + sample_version_path.mkdir(parents=True, exist_ok=True, mode=0o755) + + try: + os.symlink(file_path, sample_version_path / file_name) + print((sample_version_path / file_name).absolute(), file=sys.stdout) + except Exception as e: + print("Could not create symlink from '{}' to '{}'".format( + sample_version_path / file_name, file_path), file=sys.stderr) + continue + + try: + os.unlink(sample_path / "current") + except Exception as e: + pass + + try: + os.symlink(sample_version_path.absolute(), sample_path / "current") + except Exception as e: + pass + + return "Completed" + + +def link_bams_by_patient_id(operator_run, directory, request_id, sample_id, arguments, config, show_all_runs): + version = arguments.get("--dir-version") or operator_run["app_version"] + should_delete = arguments.get("--delete") or False + + path = Path("./") / directory + + runs = get_runs(operator_run["id"], config, show_all_runs) + + if not runs: + return + + files = [] # (sample_id, /path/to/file) + + for run in runs: + for file_group in get_files_by_run_id(run["id"], config): + files = files + \ + find_files_by_sample(file_group["value"], sample_id=sample_id) + + accepted_file_types = ['.bam', '.bai'] + for (sample_id, file) in files: + file_path = get_file_path(file) + _, file_ext = os.path.splitext(file_path) + + if file_ext not in accepted_file_types: + continue + + file_name = os.path.basename(file_path) + + sample_id, _ = file_name.split("_", 1) + a, b, _ = sample_id.split("-", 2) + patient_id = "-".join([a, b]) + + sample_path = path / patient_id / sample_id + sample_version_path = sample_path / version + sample_version_path.mkdir(parents=True, exist_ok=True, mode=0o755) + + if should_delete: + try: + shutil.rmtree(sample_version_path) + print(sample_version_path.absolute(), file=sys.stdout) + except Exception as e: + print("could not delete folder: {} ".format( + sample_version_path), file=sys.stderr) + else: + try: + os.symlink(file_path, sample_version_path / file_name) + print((sample_version_path / file_name).absolute(), file=sys.stdout) + except Exception as e: + print("Could not create symlink from '{}' to '{}'".format( + sample_version_path / file_name, file_path), file=sys.stderr) + continue + + try: + os.unlink(sample_path / "current") + except Exception as e: + pass + + if not should_delete: + try: + os.symlink(sample_version_path.absolute(), + sample_path / "current") + except Exception as e: + pass + + return "Completed" + + +def find_files_by_sample(file_group, sample_id=None): + def traverse(file_group): + files = [] + if not file_group: + return [] + if type(file_group) == list: + if len(file_group) > 1: + return traverse(file_group[0]) + traverse(file_group[1:]) + elif file_group: + return traverse(file_group[0]) + elif "file" in file_group: + try: + file_sample_id = file_group["sampleId"] + if "File" == file_group["file"]["class"] and (not sample_id or + file_sample_id == + sample_id): + return [(file_sample_id, file_group["file"])] + [(file_sample_id, + f) for f in file_group["file"]["secondaryFiles"]] + except Exception as e: + print(e, file=sys.stderr) + elif "class" in file_group: + if file_group["class"] == "Directory": + return find_files_by_sample(file_group["listing"], sample_id=file_group["basename"]) + # TODO pull patient id here + elif file_group["class"] == "File": + secondary_files = [(sample_id, f) for f in file_group["secondaryFiles"] + ] if "secondaryFiles" in file_group else [] + return [(sample_id, file_group)] + secondary_files + + return [] + + return traverse(file_group) diff --git a/apps/lims.py b/apps/lims.py old mode 100644 new mode 100755 diff --git a/beaglecli b/beaglecli index 14c07e5..03ee196 100755 --- a/beaglecli +++ b/beaglecli @@ -30,11 +30,12 @@ import csv from apps.access import access_commands from apps.cmoch import cmoch_commands +from apps.heme import heme_commands from apps.lims import lims_commands from apps.cleaning import clean_json_comands -BEAGLE_ENDPOINT = os.environ.get('BEAGLE_ENDPOINT', 'http://voyager:5007') +BEAGLE_ENDPOINT = os.environ.get('BEAGLE_ENDPOINT', 'http://isvvoyagerprod01.mskcc.org:5007') BEAGLE_USER = os.environ.get('BEAGLE_USER', '') BEAGLE_PW = os.environ.get('BEAGLE_PW', '') @@ -94,10 +95,12 @@ Usage: beaglecli tempo-mpgen beaglecli tempo-mpgen override --normals= --tumors= beaglecli lims metadata [--request-id=] - beaglecli access link [--single-dir] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] - beaglecli access link-patient [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] - beaglecli cmoch link [--single-dir] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] - beaglecli cmoch link-patient [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] + beaglecli access link [--single-dir] [--app-prefix=] [--single-dir] [--all-requests] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] [--uncompleted-runs] + beaglecli access link-patient [--all-requests] [--app-prefix=] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] [--uncompleted-runs] + beaglecli cmoch link [--single-dir] [--all-requests] [--app-prefix=] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] + beaglecli cmoch link-patient [--all-requests] [--all-runs] [--app-prefix=] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] + beaglecli heme link [--single-dir] [--all-requests] [--app-prefix=] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] + beaglecli heme link-patient [--all-runs] [--all-requests] [--app-prefix=] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] beaglecli --version Options: @@ -242,7 +245,13 @@ def command(arguments, config): 'api': API })) if arguments.get('cmoch'): - return (cmoch_commands(arguments, { + return (access_commands(arguments, { + 'token': config.token, + 'beagle_endpoint': BEAGLE_ENDPOINT, + 'api': API + })) + if arguments.get('heme'): + return (access_commands(arguments, { 'token': config.token, 'beagle_endpoint': BEAGLE_ENDPOINT, 'api': API diff --git a/requirements.txt b/requirements.txt old mode 100644 new mode 100755 diff --git a/scripts/README.md b/scripts/README.md old mode 100644 new mode 100755 diff --git a/scripts/bin/access_beagle_endpoint.py b/scripts/bin/access_beagle_endpoint.py old mode 100644 new mode 100755 index 30a32b0..423eefd --- a/scripts/bin/access_beagle_endpoint.py +++ b/scripts/bin/access_beagle_endpoint.py @@ -19,29 +19,8 @@ def run_url(self, url): req = requests.get(url, auth=self.auth, verify=False) return req.json() - def patch_url(self, url, body): - headers = {'content-type': 'application/json'} - req = requests.patch(url, data=json.dumps(body), auth=self.auth, headers=headers, verify=False) - print(f"Status {req.status_code}") - return req - - def post_url(self, url, body): - headers = {'content-type': 'application/json'} - req = requests.post(url, data=json.dumps(body), auth=self.auth, headers=headers, verify=False) - print(f"Status {req.status_code}") - return req - - # had to build url weird because the requests docs were busted and I kept running into issues - def get_etl_jobs_by_request(self, request_id): - url = "%s/v0/etl/jobs/?page_size=1000&request_id=%s" % (self.API, request_id) - return self.run_url(url) - - def get_etl_job(self, run_id): - url = "%s/v0/etl/jobs/%s" % (self.API, run_id) - return self.run_url(url) - def get_file_ids(self, request_id): - url = "%s/v0/fs/files/?page_size=1000&metadata=igoRequestId:%s" % (self.API, request_id) + url = "%s/v0/fs/files/?page_size=1000&metadata=igoRequestId:%s" % (self.API, request_id) data = self.run_url(url) file_ids = list() for result in data['results']: diff --git a/scripts/deregister_jobs_and_files.py b/scripts/deregister_jobs_and_files.py index 5c112fb..94fb79f 100755 --- a/scripts/deregister_jobs_and_files.py +++ b/scripts/deregister_jobs_and_files.py @@ -12,83 +12,19 @@ BEAGLE = beagle_api.AccessBeagleEndpoint() -def get_jobs(request_id): - """ - From request_id, get all related run data - """ - data = BEAGLE.get_etl_jobs_by_request(request_id)['results'] - - ids_fetch_samples = set() - - for job in data: - run_type = job['run'] - if run_type == 'beagle_etl.jobs.lims_etl_jobs.fetch_samples': - ids_fetch_samples.add(job['id']) - - return ids_fetch_samples - -def get_children_from_job(run_id): - """ - Retrieves children from main job - """ - data = BEAGLE.get_etl_job(run_id) - return data['children'] - -def get_run_type(run_id): - """ - We're assuming we get just one record here - """ - data = BEAGLE.get_etl_job(run_id) - if 'run' in data: - return data['run'] - return None - - -def get_file_id_from_run_id(run_id): - """ - Get the file id from the run id that imported it - """ - data = BEAGLE.get_etl_job(run_id) - file_path = data['args']['filepath'] - return BEAGLE.get_file_id_by_path(file_path) - if __name__ == "__main__": REQUEST_ID = sys.argv[1] OUTSCRIPT = sys.argv[2] - print("Retrieving root etl job IDs for %s" % REQUEST_ID) - FETCH_SAMPLE_JOBS = get_jobs(REQUEST_ID) - #print(FETCH_SAMPLE_JOBS) - + print("Searching beagle db for igoRequestId %s" % REQUEST_ID) files_to_deregister = set(BEAGLE.get_file_ids(REQUEST_ID)) - runs_to_deregister = set() - - number_of_fetch_jobs = len(FETCH_SAMPLE_JOBS) - print("Compiling child jobs from %i fetched jobs..." % number_of_fetch_jobs) - - for job_id in FETCH_SAMPLE_JOBS: - child_jobs = get_children_from_job(job_id) - for child_job in child_jobs: - run_type = get_run_type(child_job) - if run_type: - if run_type == "beagle_etl.jobs.lims_etl_jobs.create_pooled_normal": - pooled_normal_file_id = get_file_id_from_run_id(child_job) - files_to_deregister.add(pooled_normal_file_id) - runs_to_deregister.add(child_job) - - runs_to_deregister = list(runs_to_deregister.union(set(FETCH_SAMPLE_JOBS))) files_to_deregister = list(files_to_deregister) -# CHILD_JOBS = get_child_jobs(REQUEST_JOB_ID) -# print(CHILD_JOBS) num_files_to_deregister = len(files_to_deregister) - num_runs_to_deregister = len(runs_to_deregister) - print("Got %i files and %i runs to deregister. See beaglecli command output to execute." - % (num_files_to_deregister,num_runs_to_deregister)) - with open(OUTSCRIPT, 'w') as output_file: - for i in runs_to_deregister: - output_file.write("../beaglecli etl delete --job-id=%s\n" % i) + print("Found %i files to deregister; run \n\n\tbash %s\n\nto complete processing." + % (num_files_to_deregister,OUTSCRIPT)) + with open(OUTSCRIPT, 'w') as output_file: for i in files_to_deregister: if i: output_file.write("../beaglecli files delete --file-id=%s\n" % i)