Source code for segmentation_multiprocessing

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
The `SEGMENTATION` module enables automatic segmentation of DICOM or NIfTI images. Currently, TotalSegmentator is the only supported method for segmentation. The module will submit a new job task for each patient using either **SGE** or **SLURM** job schedulers.

Options
-------

The `SEGMENTATION` module can be configured with the following options:

- **verbose**: Enable or disable verbose mode to display detailed process information.
- **timer**: Enable a timer to record the module's execution time.
- **inputFolder**: Path to the folder containing the input images.
- **log**: Path to save a log file with detailed information about the segmentation process.
- **new_log_file**: Create a new log file, overwriting any existing file with the same name.
- **skip**: Path to a file listing subfolders inside `inputFolder` to exclude from processing.
- **skip-segmented-data**: If set to True, the module will skip data that has already been segmented (default: False).
- **multiprocessing**: Specify the number of CPU cores to use for parallel processing.
- **method**: Specify the segmentation method (currently, only "TotalSegmentator" is supported).
- **segmentation-list**: Path to a file listing specific volumes to segment.
- **image_type**: Specify whether the input images are in "DICOM" or "NIFTI" format:
  - For **DICOM**, the module generates an RTSTRUCT file (`RTSTRUCT.dcm`) containing segmentations.
  - For **NIFTI**, the module creates separate segmentations for each structure listed in `segmentation-list`, named as `Mask_<structure_name>.nii.gz`.
- **image_filename**: Name of the image file to segment when working with NIfTI images. For DICOM images, the module looks for images in the "DCM" folder.
- **job_scheduler**: Select the job scheduler for batch processing ("SGE" or "SLURM").

Example Usage
-------------

The following example demonstrates how to use the `SEGMENTATION` module:

.. code-block:: bash

    SEGMENTATION:
    {
        inputFolder: /path/to/NIFTI_folder
        method: TotalSegmentator
        image_type: dicom
        segmentation-list: /path/to/img2radiomics/v0.8.4/list_totalSegmentator_full.txt
        log: /path/to/logs/segmentation.log
    }

In this example:

- **inputFolder**: Specifies the folder containing images to segment.
- **method**: Selects "TotalSegmentator" for autosegmentation.
- **image_type**: Specifies DICOM format, producing an RTSTRUCT file with segmentations.
- **segmentation-list**: Points to a list of structures for segmentation.
- **log**: Specifies a path for the log file.

"""

# Automatic segmentation of NIfTI or DICOM images.
#
# This module currently supports TotalSegmentator (use --method='TotalSegmentator').
# Future versions may support additional segmentation methods.
#
# Usage:
#     segmentation_multiprocessing.py -i <inputFolder> [-v] [-S <skipFile>] [--include <includeFile>] [-m <method>] 
#                                      [-f <segmentationFile>] [-I <imageName>] [-t <imageType>] [--log <logFile>] 
#                                      [--job_scheduler <scheduler>] [-j <numJobs>] [--skip-segmented-data]
#
# Options:
#     -h, --help                       Show this help message and exit
#     -v, --verbose                    Enable verbose output (default: False)
#     -i, --input <inputFolder>        Path to the folder containing images to segment
#     -S, --skip <skipFile>            Path to a file with filenames to skip during segmentation
#     --include <includeFile>          Path to a file with filenames to include (all files are included by default)
#     -m, --method <algorithm>         Segmentation algorithm to use (default: TotalSegmentator)
#     -f, --file_list_segmentations    File with the names of the segmentations to perform
#     -I, --img_name <imageName>       Name of the image to segment (default: img.nii.gz)
#     -t, --type <imageType>           Type of images to process (NIfTI or DICOM; default: NIfTI)
#     --skip-segmented-data            Skip segmentation of data that are already segmented (default: False)
#     --log <logFile>                  Redirect stdout to a log file
#     --new_log                        Overwrite previous log file if it exists
#     --job_scheduler <scheduler>      Job scheduler to use for segmentation tasks (SGE or SLURM; default: SGE)
#     -j, --n_jobs <numJobs>           Number of simultaneous jobs (default: 1). If a job scheduler is used, 
#                                      multiprocessing is not typically useful.
# Help:
#     segmentation_multiprocessing.py -h


import sys, getopt, os
from tqdm import tqdm
import glob
import multiprocessing
import subprocess
import time
from datetime import datetime
from utils import hprint_msg_box
from utils import hprint
from utils import eprint
from utils import format_list_multiline

[docs] def main(argv): inputFolder = '' verbose = False skip_file_name='' skip_files=[] include_file_name='' include_files=[] n_jobs=1 log = '' method = 'TotalSegmentator' segmentation_file_name='' segmentation_list = [] img_name = 'img.nii.gz' img_type= 'nifti' job_scheduler='SGE' skipSegmented= False new_log = False try: opts, args = getopt.getopt(argv, "hvi:j:S:m:f:I:t:",["log=","new_log","verbose","help","input=","n_jobs=","skip=","include=","method=","file_list_segmentations=","img_name=","type=","job_scheduler=","skip-segmented-data"]) except getopt.GetoptError: print('Usage: segmentation_multiprocessing.py -i <inputFolder> [-v] [-S <skipFile>] [--include <includeFile>] [-m <method>] [-f <segmentationFile>] [-I <imageName>] [-t <imageType>] [--log <logFile>] [--job_scheduler <scheduler>] [-j <numJobs>] [--skip-segmented-data]', flush=True) sys.exit(2) for opt,arg in opts: if opt in ("-h", "--help"): print("NAME",flush=True) print("\tsegmentation_mutiprocessing.py\n",flush=True) print("SYNOPSIS",flush=True) print("\tsegmentation_multiprocessing.py [-h|--help][-v|--verbose][-i|--input <inputfolder>][-m|--method <algorithm>][-j|--n_jobs <number of simultaneous jobs>]\n", flush=True) print("DESRIPTION",flush=True) print("\tAutomatic segmentation of NIfTI or DICOM images\n", flush=True) print("OPTIONS",flush=True) print("\t -h, --help: print this help page",flush=True) print("\t -v, --verbose: False by default",flush=True) print("\t -i, --input: input folder with images",flush=True) print("\t -S, --skip: path to file with filenames to skip",flush=True) print("\t --include: path to file with filenames to include (all files included by default)",flush=True) print("\t -m, --method: algorithm to use for segmentation (default: TotalSegmentator)", flush=True) print("\t -f, --file_list_segmentations: file with the name of the segmentations to perform",flush=True) print("\t -I, --img_name: name of the image to process (default: img.nii.gz)",flush=True) print("\t -t, --type: type of the images to process (NIfTI or DICOM, default: NIfTI)", flush=True) print("\t --skip-segmented-data: skip data that are already segmented", flush=True) print("\t --log: redirect stdout to a log file", flush=True) print("\t --new_log: overwrite previous log file", flush=True) print("\t --job_scheduler, use SGE or SLURM to schedule jobs (default SGE)",flush=True) print("\t -j, --n_jobs: number of simultaneous jobs (default: 1 - if a job_scheduler is used, multiprocessing is not useful)", flush=True) sys.exit() elif opt in ("-i", "--input"): inputFolder = arg elif opt in ("-v", "--verbose"): verbose = True elif opt in ("-j", "--n_jobs"): n_jobs= int(arg) elif opt in ("-S","--skip"): skip_file_name= arg elif opt in ("--include"): include_file_name= arg elif opt in ("-f","--file_list_segmentations"): segmentation_file_name= arg elif opt in ("-I","--img_name"): img_name= arg elif opt in ("-m","--method"): method= arg elif opt in ("-t","--type"): img_type= arg elif opt in ("--log"): log= arg elif opt in ("--new_log"): new_log= True elif opt in ("--job_scheduler"): job_scheduler = arg elif opt in ("--skip-segmented-data"): skipSegmented = True if log != '': if new_log: f = open(log,'w+') else: f = open(log,'a+') sys.stdout = f if skip_file_name != '': try: file= open(skip_file_name, 'r') skip_files = file.read().splitlines() except: print("\033[31mERROR! Unable to read the skip file\033[0m",flush=True) if include_file_name != '': try: file= open(include_file_name, 'r') include_files = file.read().splitlines() except: print("\033[31mERROR! Unable to read the include file\033[0m",flush=True) if segmentation_file_name != '': try: file= open(segmentation_file_name, 'r') segmentation_list = file.read().splitlines() except: print("\033[31mERROR! Unable to read the segmentation list file\033[0m",flush=True) else: print("Please use -f or --file_list_segmentations to provide a file containing the name of the segmentations to perform",flush=True) if verbose: msg = ( f"Input folder: {inputFolder}\n" f"n_jobs: {n_jobs}\n" f"Skip file: {skip_file_name}\n" f"Files to skip: {format_list_multiline(skip_files,5)}\n" f"Include file: {include_file_name}\n" f"Files to include: {format_list_multiline(include_files,5)}\n" f"Method: {method}\n" f"File with list of segmentation: {segmentation_file_name}\n" f"Structures to segment: {format_list_multiline(segmentation_list,5)}\n" f"Image type: {img_type}\n" f"Skip segmented data: {skipSegmented}\n" f"Job scheduler: {job_scheduler}\n" f"Log: {log}\n" f"Overwrite previous log file: {str(new_log)}\n" f"Verbose: {verbose}\n" ) hprint_msg_box(msg=msg, indent=2, title=f"SEGMENTATION {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") if n_jobs == 1: for patient in tqdm(glob.glob(inputFolder+"/*"), ncols=100, desc="Perform image segmentation", bar_format="{l_bar}{bar} [time left: {remaining}, time spent: {elapsed}]", colour="yellow"): image_segmentation(inputFolder,patient,img_name,img_type,method,segmentation_list,skip_files,include_files,skipSegmented, verbose,log,job_scheduler) else: with multiprocessing.Pool(n_jobs) as pool: tqdm(pool.starmap(image_segmentation, [(inputFolder,patient,img_name,img_type,method,segmentation_list,skip_files,include_files,skipSegmented, verbose,log,job_scheduler) for patient in glob.glob(inputFolder+"/*")]), ncols=100, desc="Perform image segmentation", bar_format="{l_bar}{bar} [time left: {remaining}, time spent: {elapsed}]", colour="yellow") while True: #Wait jobs for segmentation are completed before exiting if os.path.exists(os.path.join(inputFolder,'job_ids.txt')): try: with open(os.path.join(inputFolder,'job_ids.txt'), 'r') as file: if not file.read(): if verbose: print("Segmentations completed", flush=True) break #exit loop when all job_id are completed and removed from job-ids.txt else: if verbose: print("Segmentation jobs are currently pending/running",flush=True) except: print("\033[33mWARNING! Unable to open the file 'job_ids.txt'. Please check your data to verify if segmentation was performed correctly.\033[0m", flush=True) else: print("\033[33mWARNING! The file 'job_ids.txt' was not found. Data were probably not segmented correctly.\033[0m", flush=True) break remove_finished_jobs(inputFolder,job_scheduler) time.sleep(60) #Remove job_ids.txt after completed the segmentations try: os.remove(os.path.join(inputFolder,'job_ids.txt')) except: print("\033[33mWARNING! The file 'job_ids.txt' was not removed\033[0m", flush=True) #add prefix Mask_ at the begining of the name of all segmentation print("Add prefix to segmentation names",flush=True) for patient in tqdm(glob.glob(inputFolder+"/*"), ncols=100, desc="Rename segmentations", bar_format="{l_bar}{bar} [time left: {remaining}, time spent: {elapsed}]", colour="yellow"): add_prefix(patient,segmentation_list,img_type)
[docs] def image_segmentation(path,patient,img_name, img_type, method,segmentation_list,skip_files,include_files,skipSegmented, verbose,log,job_scheduler): if log != '': f = open(log,'a+') sys.stdout = f ListSegmentationsString = " ".join(str(item) for item in segmentation_list) patientID=os.path.basename(patient) if len(include_files) > 0: #if file to include are specify if patientID not in include_files: #if patient is to be excluded if verbose: print("\n" + patientID + " (" + patient + ") is not in the list of patients to include", flush=True) return if len(skip_files) > 0: #if there are files to skip if patientID in skip_files: if verbose: print("\nskip "+patientID+" ("+patient+")",flush=True) return if verbose: hprint(f"Processing {patientID}", patient) for patient_subdirectory in glob.glob(patient+"/*"): subdirectory=os.path.basename(patient_subdirectory) if verbose: print(patientID+": "+subdirectory,flush=True) if not skipSegmented or (img_type.lower()=='dicom' and not os.path.exists(os.path.join(patient_subdirectory, "RTSTRUCT.dcm"))) or (len(glob.glob(os.path.join(patient_subdirectory, "*nii.gz"))) < 2) : #run the segmentation if there is no rules to skip segmented data or data or check if data are not already segmented (RTSTRUCT file or more than one nifti file) if method == 'TotalSegmentator': if job_scheduler in ('SGE','Sge','sge'): try: cmd = "qsub -terse SGE/qsub_TotalSegmentator_GPU.sh " + img_type +" "+os.path.join(patient_subdirectory,img_name) +" " + patient_subdirectory + " " + '"' + ListSegmentationsString + '"' job_id=subprocess.check_output(cmd,shell=True) print("JOB_ID=",str(job_id.decode().strip()),flush=True) with open(os.path.join(path,"job_ids.txt"),'a+') as f: f.write(str(job_id.decode().strip())+'\n') except: print("\033[31mERROR! Total Segmentator did not run properly\033[0m",flush=True) print(f"\033[31mSkipping image {patientID} {subdirectory}\033[0m",flush=True) eprint("Skipping "+patientID+" "+subdirectory+" (ERROR TotalSegmentator)") continue elif job_scheduler in ('SLURM','Slurm','slurm'): try: cmd = "sbatch SLURM/sbatch_TotalSegmentator_GPU.sh " + img_type +" "+os.path.join(patient_subdirectory,img_name) +" " + patient_subdirectory + " " + '"' + ListSegmentationsString + '"' job_id=subprocess.check_output(cmd,shell=True) print('JOB_ID=',str(job_id.decode().strip().split()[-1]),flush=True) with open(os.path.join(path,"job_ids.txt"),'a+') as f: f.write(str(job_id.decode().strip().split()[-1])+'\n') except: print("No GPU available, trying with CPU.") try: cmd = "sbatch SLURM/sbatch_TotalSegmentator_CPU.sh " + img_type +" "+os.path.join(patient_subdirectory,img_name) +" " + patient_subdirectory + " " + '"' + ListSegmentationsString + '"' job_id=subprocess.check_output(cmd,shell=True) with open(os.path.join(path,"job_ids.txt"),'a+') as f: f.write(str(job_id.decode().strip().split()[-1])+'\n') f.close() except: print("\033[31mERROR! Total Segmentator did not run properly\033[0m", flush=True) print(f"\033[31mSkipping image {patientID} {subdirectory}\033[0m",flush=True) eprint("Skipping "+patientID+" "+subdirectory+" (ERROR TotalSegmentator)") continue elif job_scheduler in ('NONE','None','none'): try: cmd = "NoJobScheduler/TotalSegmentator_GPU.sh " + img_type +" "+os.path.join(patient_subdirectory,img_name) +" " + patient_subdirectory + " " + '"' + ListSegmentationsString + '"' job_id=subprocess.Popen(cmd, shell=True) print('JOB_ID=',str(job_id.pid),flush=True) with open(os.path.join(path,"job_ids.txt"),'a+') as f: f.write(str(job_id.pid)+'\n') f.close() except: print("No GPU available, trying with CPU.") try: cmd = "NoJobScheduler/TotalSegmentator_CPU.sh " + img_type +" "+os.path.join(patient_subdirectory,img_name) +" " + patient_subdirectory + " " + '"' + ListSegmentationsString + '"' job_id=subprocess.Popen(cmd, shell=True) with open(os.path.join(path,"job_ids.txt"),'a+') as f: f.write(str(job_id.pid)+'\n') f.close() except: print("\033[31mERROR! Total Segmentator did not run properly\033[0m", flush=True) print(f"\033[31mSkipping image {patientID} {subdirectory}\033[0m",flush=True) eprint("Skipping "+patientID+" "+subdirectory+" (ERROR TotalSegmentator)") continue else: print("\033[31mERROR! The job scheduler", job_scheduler, "is not available\033[0m", flush=True) else: print("\033[31mERROR! Segmentation algorithm", method, "is not available\033[0m", flush=True) else: print("\033[33mWARNING!: Skip segmentation for "+patient_subdirectory,"\033[0m",flush=True)
[docs] def add_prefix(patient,segmentation_list,img_type): #add prefix Mask_ at the begining of the name of all segmentation segmentation_list_lower = [name.lower() for name in segmentation_list] for patient_subdirectory in glob.glob(patient+"/*"): if img_type == 'dicom': os.rename(os.path.join(patient_subdirectory,'segmentations.dcm'),os.path.join(patient_subdirectory,'RTSTRUCT.dcm')) else: #nifti for filename in os.listdir(patient_subdirectory): filename_without_extension_lower= filename.split('.')[0].lower() if filename_without_extension_lower in segmentation_list_lower: new_filename = f'Mask_{filename}' os.rename(os.path.join(patient_subdirectory,filename),os.path.join(patient_subdirectory,new_filename))
[docs] def remove_finished_jobs(path,job_scheduler): if job_scheduler in ('SGE','Sge','sge'): current_job_ids=subprocess.check_output("qstat | awk '{print $1}'",shell=True) elif job_scheduler in ('SLURM','Slurm','slurm'): current_job_ids=subprocess.check_output("squeue -u $(whoami) | awk '{print $1}'",shell=True) elif job_scheduler in ('NONE','None','none'): current_job_ids=subprocess.check_output("pgrep -u $(whoami)", shell=True) else: print("\033[31mERROR! The job scheduler ",job_scheduler," is not available\033[0m",flush=True) current_job_ids='' list_current_job_ids=current_job_ids.decode().split('\n') #add [2:] at the end to remove to header lines try: with open(os.path.join(path,'job_ids.txt'),'r') as file, open(os.path.join(path,'.tmp_job_ids.txt'),'w') as tmp: for line in file: try: id_value = str(line.rstrip()) if not id_value in list_current_job_ids: continue #not copy this line in tmp file tmp.write(line) except: print("\033[31mERROR! Check the job_ids.txt file\033[0m", flush=True) os.rename(os.path.join(path,'.tmp_job_ids.txt'),os.path.join(path,'job_ids.txt')) except: sys.exit()
if __name__ == "__main__": main(sys.argv[1:])