#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
The `SPATIAL_RESAMPLING` module resamples NIfTI images to a specified resolution, providing options for interpolation and voxel size. It uses the Convert3D (c3d) program from ITK-SNAP (http://www.itksnap.org) or SimpleITK for spatial transformations, depending on configuration.
Options
-------
The `SPATIAL_RESAMPLING` module accepts the following options:
- **verbose**: Enable verbose output for more detailed process information (default: False).
- **timer**: Enable a timer to measure execution time (default: False).
- **inputFolder**: Path to the folder containing input NIfTI files.
- **outputFolder**: Path to save the resampled output files.
- **outputFolderSuffix**: Adds a suffix to the `inputFolder` name to create an output folder.
- **with-segmentation**: Specify if folders contain segmentation files.
- **all-data-with-segmentation**: Specify if all folders contain segmentation files (default: True).
- **log**: Path to a log file for saving detailed information about the resampling process.
- **new_log_file**: Create a new log file; if a file with the same name exists, it will be overwritten.
- **skip**: Path to a file listing subfolders to exclude from resampling.
- **include**: Path to a file listing subfolders to include in resampling (all subfolders included by default).
- **multiprocessing**: Specify the number of CPU cores to use for parallel processing.
- **image_interpolation**: Interpolation method for images. Options: `NearestNeighbor`, `Linear`, `Cubic`, `Sinc`, `Gaussian`, `B-Spline` (default: Linear).
- **mask_interpolation**: Interpolation method for masks/segmentation files. Options: `NearestNeighbor`, `Linear`, `Cubic`, `Sinc`, `Gaussian`, `B-Spline` (default: NearestNeighbor).
- **use_c3d**: Use the c3d program for resampling instead of SimpleITK.
- **voxel_size**: Specify the x, y, and z dimensions of the voxels in mm³. Only isotropic resampling is supported (default: 1).
- **suffix_name**: Add a suffix to resampled image and segmentation files (default: "111").
Example Usage
-------------
The following example demonstrates how to use the `SPATIAL_RESAMPLING` module to resample a folder of NIfTI files to 1mm³ isotropic voxel size:
.. code-block:: bash
SPATIAL_RESAMPLING:
{
inputFolder: /path/to/NIFTI_folder
outputFolderSuffix: 111
voxel_size: 1 # 1mm³
image_interpolation: Linear
mask_interpolation: NearestNeighbor
suffix_name: 111
log: /path/to/logs/spatial_resampling.log
}
In this example:
- **inputFolder**: Specifies the folder containing NIfTI files to be resampled.
- **outputFolderSuffix**: Appends the suffix "111" to the name of `inputFolder` to create the output directory.
- **voxel_size**: Sets the voxel size for resampling to 1mm³.
- **log**: Saves a log file at the specified location, recording details of the resampling process.
"""
# Resample NIfTI images and masks to a specific voxel dimension using SimpleITK or c3d
# C3D (Convert3D) is a part of ITK-SNAP and can be downloaded here: http://www.itksnap.org/pmwiki/pmwiki.php?n=Convert3D.Convert3D
#
# Usage: NiftiResampling_multiprocessing.py -i <inputFolder> -o <outputFolder> [options]
# Options:
# -v, --verbose Enable verbose output (default: False)
# --timer Measure execution time (default: False)
# -i, --inputFolder Path to the input folder containing NIfTI images
# -o, --outputFolder Path to the folder where resampled NIfTI images will be saved
# -e Suffix to add to the name of output images and masks (default: "111")
# -s, --size Size of the resampling voxel in mm³ (default: 1mm)
# -I, --interpolation Interpolation method for the image (default: Linear)
# -M, --mask_interpolation Interpolation method for masks (default: NearestNeighbor)
# -j, --n_jobs Number of simultaneous jobs (default: 1)
# --skip Path to a file with filenames to skip during processing
# --include Path to a file with filenames to include (default: include all)
# --no-segmentation Indicate that input folder does not contain segmentation files
# --all-segmentation Specify if all folders contain segmentation files (default: False)
# --log Path to a log file for saving output details
# --new_log Overwrite an existing log file if it exists
# --use_c3d Use c3d program for resampling instead of SimpleITK (default: False)
#
# Help: NiftiResampling_multiprocessing.py -h
import sys, getopt, os
import glob
import subprocess
import shlex
import multiprocessing
import nibabel as nib
import numpy as np
import SimpleITK as sitk
from tqdm import tqdm
from datetime import datetime
from utils import hprint_msg_box
from utils import hprint
from utils import format_list_multiline
[docs]
def main(argv):
inpath = ''
outpath = ''
verbose = False
n_jobs=1 #nb of CPUs
size=1.0 #resampling size in mm
interpolation = "Linear"
mask_interpolation = "NearestNeighbor"
suffix = "111"
path_to_c3d= "c3d"
skip_file_name=''
skip_files=[]
include_file_name=''
include_files=[]
log = ''
new_log = False
NoSegmentation = False
AllSegmentation = False
use_c3d = False
try:
opts, args = getopt.getopt(argv, "hvi:o:j:s:I:M:e:S:",["log=","new_log","inputFolder=","outputFolder=","verbose","help","n_jobs=","size=","interpolation=","mask_interpolation=","skip=","include=","no-segmentation","all-segmentation","use_c3d"])
except getopt.GetoptError:
print('Usage: NiftiResampling_multiprocessing.py -i <inputFolder> -o <outputFolder> [-v] [-e <suffix>] [-s <size>] [-I <interpolation>] [-M <mask_interpolation>] [-j <numJobs>] [--skip <skipFile>] [--include <includeFile>] [--no-segmentation] [--all-segmentation] [--log <logFile>]')
sys.exit(2)
for opt,arg in opts:
if opt in ("-h", "--help"):
print("NAME")
print("\tNiftiResampling_multiprocessing.py\n")
print("SYNOPSIS")
print("\tNiftiResampling_multiprocessing.py [-h|--help][-v|--verbose][-i|--inputFolder <inputfolder>][-o|--outputFolder <outfolder>][-e <suffix name for output imgage and mask>][-s|--size <int>][-I|--interpolation <interpolation method for the image>][-M|--mask_interpolation <interpolation method for the mask>][-j|--n_jobs <number of simultaneous jobs>]\n")
print("DESRIPTION")
print("\tResamples NIfTI images and masks to a specific voxel dimension using c3d\n")
print("OPTIONS")
print("\t -h, --help: print this help page")
print("\t -v, --verbose: False by default")
print("\t -i, --inputFolder: input folder with NIfTI images")
print("\t -o, --outputFolder: output folder to save resampled NIfTI images")
print("\t -e: suffix to use in the name for output images and masks (default \"111\")")
print("\t -s, --size: size of the resampling voxel in mm (default 1mm)")
print("\t -I, --interpolation: interpolation method for the image (default Linear)")
print("\t -M, --mask_interpolation: interpolation method for the mask (default NearestNeighbor)")
print("\t -S, --skip: path to file with filenames to skip when processing resampling")
print("\t --include: path to file with filenames to include (all files included by default)")
print("\t, --no-segmentation: input folder does not contain segmentations")
print("\t, --all-segmentation: input folder contains segmentations for all data (default False)")
print("\t --log: redirect stdout to a log file")
print("\t --new_log: overwrite previous log file")
print("\t -j, --n_jobs: number of simultaneous jobs (default:1)")
sys.exit()
elif opt in ("-i", "--inputFolder"):
inpath = arg
elif opt in ("-o", "--outputFolder"):
outpath = arg
elif opt in ("-v", "--verbose"):
verbose = True
elif opt in ("-j", "--n_jobs"):
n_jobs= int(arg)
elif opt in ("-s", "--size"):
size= float(arg)
elif opt in ("-I", "--interpolation"):
interpolation= arg
elif opt in ("-M", "--mask_interpolation"):
mask_interpolation= arg
elif opt in ("-e"):
suffix= arg
elif opt in ("--log"):
log= arg
elif opt in ("--new_log"):
new_log= True
elif opt in ("-S","--skip"):
skip_file_name= arg
elif opt in ("--include"):
include_file_name= arg
elif opt in ("--no-segmentation"):
NoSegmentation = True
elif opt in ("--all-segmentation"):
AllSegmentation = True
elif opt in ("--use_c3d"):
use_c3d = True
if log != '':
if new_log:
f = open(log,'w+')
else:
f = open(log,'a+')
sys.stdout = f
if skip_file_name != '':
try:
file= open(skip_file_name, 'r')
skip_files = file.read().splitlines()
except:
print("\033[31mERROR! Unable to read the skip file\033[0m")
if include_file_name != '':
try:
file= open(include_file_name, 'r')
include_files = file.read().splitlines()
except:
print("\033[31mERROR! Unable to read the include file\033[0m",flush=True)
if verbose:
msg = (
f"Input path: {inpath}\n"
f"Output path: {outpath}\n"
f"n_jobs: {n_jobs}\n"
f"Skip file: {skip_file_name}\n"
f"Files to skip: {format_list_multiline(skip_files,5)}\n"
f"Include file: {include_file_name}\n"
f"Files to include: {format_list_multiline(include_files,5)}\n"
f"Suffix to use in the name of generated files: {suffix}\n"
f"Interpolation method for the image: {interpolation}\n"
f"Interpolation method for the mask: {mask_interpolation}\n"
f"No segmentation: {NoSegmentation}\n"
f"All data segmented: {AllSegmentation}\n"
)
if use_c3d:
msg += f"Spatial Resampling with c3d program\n"
else:
msg +=f"Spatial Resampling with sitk library\n"
msg += (
f"Log: {log}\n"
f"Overwrite previous log file: {str(new_log)}\n"
f"Verbose: {verbose}\n"
)
hprint_msg_box(msg=msg, indent=2, title=f"SPATIAL_RESAMPLING {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
if inpath == '' or outpath == '':
print("\033[31mERROR! Input and output folders need to be specified\033[0m", flush=True)
sys.exit()
elif inpath == outpath:
print("\033[31mERROR! Input and output paths must be different\033[0m", flush=True)
sys.exit()
else:
if n_jobs == 1:
for patient in tqdm(glob.glob(inpath+"/*"),
ncols=100,
desc="NIFTI Spatial resampling",
bar_format="{l_bar}{bar} [time left: {remaining}, time spent: {elapsed}]",
colour="yellow"):
process_patient(patient,outpath,size, interpolation, mask_interpolation, suffix, path_to_c3d, skip_files,include_files, verbose,log,NoSegmentation,AllSegmentation,use_c3d)
else:
with multiprocessing.Pool(n_jobs) as pool:
tqdm(pool.starmap(process_patient,
[(patient,outpath,size, interpolation, mask_interpolation, suffix, path_to_c3d, skip_files,include_files,verbose,log,NoSegmentation,AllSegmentation,use_c3d) for patient in glob.glob(inpath+"/*")]),
ncols=100,
desc="NIFTI Spatial resampling",
bar_format="{l_bar}{bar} [time left: {remaining}, time spent: {elapsed}]",
colour="yellow")
[docs]
def process_patient(patient,outpath,size, interpolation, mask_interpolation, suffix, path_to_c3d,skip_files,include_files, verbose,log,NoSegmentation,AllSegmentation,use_c3d):
if log != '':
f = open(log,'a+')
sys.stdout = f
patientID=os.path.basename(patient)
if len(include_files) > 0: #if file to include are specify
if patientID not in include_files: #if patient is to be excluded
if verbose:
print("\n"+patientID+" ("+patient+") is not in the list of patients to include",flush=True)
return
if len(skip_files) > 0: #if there are files to skip
if patientID in skip_files:
if verbose:
print("\nskip "+patientID+" ("+patient+")",flush=True)
return
if verbose:
hprint(f"Processing {patientID}", patient)
#make directory for patient in output folder
if not os.path.exists(os.path.join(outpath,patientID)):
os.makedirs(os.path.join(outpath,patientID))
for patient_subdirectory in glob.glob(patient+"/*"):
subdirectory=os.path.basename(patient_subdirectory)
if verbose:
print(patientID+": "+subdirectory,flush=True)
#make directory for subfolder
if not os.path.exists(os.path.join(outpath,patientID,subdirectory)):
os.makedirs(os.path.join(outpath,patientID,subdirectory))
nifti_files = glob.glob(patient_subdirectory + "/*nii.gz")
if not nifti_files:
print(f"\033[31mERROR! Empty directory for {patientID} {subdirectory}\033[0m ",flush=True)
else:
for nifti_file in glob.glob(patient_subdirectory+"/*nii.gz"):
if not is_mask(nifti_file): # Skip if it's identified as a mask
if verbose:
hprint(f"Resampling Image {patientID} {subdirectory}", nifti_file)
input_img= os.path.join(patient_subdirectory,nifti_file)
output_img=os.path.join(outpath,patientID,subdirectory,os.path.splitext(os.path.splitext(os.path.basename(nifti_file))[0])[0]+"_"+suffix+".nii.gz")
if use_c3d:
arg_img="-interpolation "+interpolation+" -resample-mm "+str(float(size))+"x"+str(float(size))+"x"+str(float(size))+"mm"
arg_msk="-interpolation "+mask_interpolation+" -resample-mm "+str(float(size))+"x"+str(float(size))+"x"+str(float(size))+"mm"
cmd = path_to_c3d+" "+input_img+ " " + arg_img + " -o " + output_img
try:
subprocess.run(shlex.split(cmd))
except Exception as e:
print("\033[31mERROR!\033[0m "+ cmd,flush=True)
print(e)
else:
try:
resample_image_sitk(input_img, output_img, size, interpolation,cast_type="float32")
except:
print("\033[31mERROR! Spatial resampling with sitk failed\033[0m", flush=True)
print(f"\033[31mSkipping image for {patientID}{subdirectory}\033[0m",flush=True)
if verbose:
print(patientID+": "+subdirectory+" masks",flush=True)
if not NoSegmentation: #if there is mask to resample
for nifti_file in glob.glob(patient_subdirectory+"/*nii.gz"):
if len(glob.glob(os.path.join(patient_subdirectory, "*nii.gz"))) > 1: #there is more than 1 nifti file
if is_mask(nifti_file): #Nifti identified as a mask
if verbose:
hprint(f"Resampling Mask {patientID} {subdirectory}", nifti_file)
input_msk= os.path.join(patient_subdirectory,nifti_file)
mask_name=os.path.splitext(os.path.splitext(os.path.basename(nifti_file))[0])[0]+"_"+suffix+".nii.gz"
output_msk=os.path.join(outpath,patientID,subdirectory,mask_name)
if use_c3d:
cmd = path_to_c3d+" "+ input_msk + " " + arg_msk + " -o " + output_msk
try:
subprocess.run(shlex.split(cmd))
except Exception as e:
print("\033[31mERROR!\033[0m "+cmd,flush=True)
print(e)
else:
try:
resample_image_sitk(input_msk, output_msk, size, mask_interpolation, cast_type="int8")
except:
print("\033[31mERROR! Spatial resampling with sitk failed\033[0m", flush=True)
print(f"\033[31mSkipping image for {patientID}{subdirectory}\033[0m",flush=True)
else:
if AllSegmentation: #all data need to be segemented
print("\033[31mERROR!: No segmentation found in the current subdirectory\033[0m",flush=True)
return
else:
print("\033[33mWARNING!: No segmentation found for data \033[0m"+patient_subdirectory,flush=True)
[docs]
def is_mask(nifti_file, verbose=False):
"""
Function to determine if a NIfTI file is a mask or an image based on the filename and the unique voxel intensities.
The function first checks the filename for common keywords (e.g., 'img', 'msk').
If verbose is True, it will print whether the file is identified as a mask or an image.
"""
# Check the filename for common patterns
filename = os.path.basename(nifti_file).lower() # Convert to lowercase to avoid case sensitivity
if 'img' in filename or 'image' in filename:
if verbose:
print(f"{nifti_file}: Identified as image based on filename.")
return False # Identified as an image
if 'msk' in filename or 'mask' in filename:
if verbose:
print(f"{nifti_file}: Identified as mask based on filename.")
return True # Identified as a mask
# If the filename doesn't give clear clues, check the voxel intensities
img = nib.load(nifti_file)
data = img.get_fdata()
unique_values = np.unique(data)
# Assume masks have few unique values, typically less than 10
if len(unique_values) < 100:
if verbose:
print(f"{nifti_file}: Identified as mask based on unique values (fewer than 100 unique values).")
return True # Consider it a mask
else:
if verbose:
print(f"{nifti_file}: Identified as image based on unique values (more than 100 unique values).")
return False # Consider it an image
[docs]
def resample_image_sitk(input_path, output_path, size, interpolation_type, cast_type=None):
# Load the image using SimpleITK
image = sitk.ReadImage(input_path)
# Optionally cast the image to the specified type
if cast_type:
if cast_type.lower() == "float32":
image = sitk.Cast(image, sitk.sitkFloat32)
elif cast_type.lower() == "int8":
image = sitk.Cast(image, sitk.sitkInt8)
elif cast_type.lower() == "int16":
image = sitk.Cast(image, sitk.sitkInt16)
elif cast_type.lower() == "uint8":
image = sitk.Cast(image, sitk.sitkUInt8)
elif cast_type.lower() == "uint16":
image = sitk.Cast(image, sitk.sitkUInt16)
else:
raise ValueError(f"\033[31mERROR! Unsupported cast type: {cast_type}\033[0m")
# Define the new spacing and calculate the new size
original_size = image.GetSize()
original_spacing = image.GetSpacing()
new_spacing = [float(size)] * 3
new_size = [
int(round(original_size[0] * (original_spacing[0] / new_spacing[0]))),
int(round(original_size[1] * (original_spacing[1] / new_spacing[1]))),
int(round(original_size[2] * (original_spacing[2] / new_spacing[2])))
]
# Set the resampler
resampler = sitk.ResampleImageFilter()
resampler.SetSize(new_size)
resampler.SetOutputSpacing(new_spacing)
resampler.SetOutputOrigin(image.GetOrigin())
resampler.SetOutputDirection(image.GetDirection())
# Choose interpolation type
if interpolation_type.lower() == "nearestneighbor":
resampler.SetInterpolator(sitk.sitkNearestNeighbor)
elif interpolation_type.lower() == "linear":
resampler.SetInterpolator(sitk.sitkLinear)
elif interpolation_type.lower() == "cubic":
resampler.SetInterpolator(sitk.sitkBSpline)
elif interpolation_type.lower() == "sinc":
resampler.SetInterpolator(sitk.sitkHammingWindowedSinc)
elif interpolation_type.lower() == "gaussian":
resampler.SetInterpolator(sitk.sitkGaussian)
elif interpolation_type.lower() == "b-spline":
resampler.SetInterpolator(sitk.sitkBSpline)
else:
raise ValueError(f"\033[31mERROR! Unsupported interpolation type: {interpolation_type}033[0m")
# Execute the resampling
resampled_image = resampler.Execute(image)
# Save the resampled image
sitk.WriteImage(resampled_image, output_path)
if __name__ == "__main__":
main(sys.argv[1:])