#!/usr/bin/env python
# -*- coding: utf-8 -*-
# dose.py
"""Routines to access and modify DICOM RT Dose."""
# Copyright (c) 2009-2016 Aditya Panchal
# Copyright (c) 2019-2020 Dan Cutright
# This file is part of dicompyler-core, released under a BSD license.
# See the file license.txt included with this distribution, also
# available at https://github.com/dicompyler/dicompyler-core/
#
# This code was adapted from dicom_dose_sum.py from DVH Analytics:
# https://github.com/cutright/DVH-Analytics/
from copy import deepcopy
import numpy as np
from dicompylercore import dicomparser
from pydicom.uid import generate_uid
from pydicom.datadict import dictionary_VR, keyword_dict
from dicompylercore.config import (
dicompyler_uid_prefix_rtdose,
mpl_available,
scipy_available,
)
from datetime import datetime
from pydicom.sequence import Sequence
from pydicom.dataset import Dataset
from warnings import warn
if scipy_available:
from scipy.ndimage import map_coordinates
[docs]class DoseGrid:
"""Class that stores DICOM-RT dose grids, performs addition/scaling."""
def __init__(
self,
rt_dose,
order=1,
mode="constant",
cval=0.0,
):
""" Initialization of a DoseGrid from a DICOM-RT Dose file or dataset.
Parameters
----------
rt_dose : pydicom Dataset or filename
DICOM RT Dose used to determine the structure dose grid data.
order : int, optional
The order of the spline interpolation (if needed), default is 1.
The order has to be in the range 0-5.
0: the nearest grid point, 1: trilinear, 2 to 5: spline
See scipy.ndimage.map_coordinates documentation for more details
mode : 'constant' or 'nearest', optional
The mode parameter determines how the other dose grid is extended
beyond its boundaries. Default is ``'constant'``. Behavior for
these values is as follows:
``'constant'`` (k k k k | a b c d | k k k k)
The other dose grid is extended by filling all values beyond
the edge with the same constant value, defined by the cval
parameter.
``'nearest'`` (a a a a | a b c d | d d d d)
The input is extended by replicating the last pixel.
Additional modes are available, see scipy.ndimage.map_coordinates
documentation for more details.
cval : scalar, optional
Value to fill past edges of input if mode is ‘constant’.
Default is 0.0.
"""
self.ds = dicomparser.DicomParser(rt_dose).ds
self.interp_param = {"order": order, "mode": mode, "cval": cval}
self.summation_type = None
self.sop_class_uid = getattr(self.ds, 'SOPClassUID', '')
self.sop_instance_uid = getattr(self.ds, 'SOPInstanceUID', '')
self.other_sop_class_uid = None
self.other_sop_instance_uid = None
if self.ds.Modality == "RTDOSE":
self.x_axis = (
np.arange(self.ds.Columns) * self.ds.PixelSpacing[0]
+ self.ds.ImagePositionPatient[0]
)
self.y_axis = (
np.arange(self.ds.Rows) * self.ds.PixelSpacing[1]
+ self.ds.ImagePositionPatient[1]
)
self.z_axis = (
np.array(self.ds.GridFrameOffsetVector)
+ self.ds.ImagePositionPatient[2]
)
# x and z are swapped in the pixel_array
pixel_array = self.ds.pixel_array * self.ds.DoseGridScaling
self.dose_grid = np.swapaxes(pixel_array, 0, 2)
else:
raise AttributeError(
"The DoseGrid class requires an RTDOSE file or dataset. "
"%s was detected" % self.ds.Modality
)
####################################################
# Basic properties
####################################################
@property
def shape(self):
"""Get the x, y, z dimensions of the dose grid"""
return (
self.ds.Columns,
self.ds.Rows,
len(self.ds.GridFrameOffsetVector),
)
@property
def axes(self):
"""Get the x, y, z axes of the dose grid (in mm)"""
return [self.x_axis, self.y_axis, self.z_axis]
@property
def scale(self):
"""Get the dose grid resolution (xyz)"""
diffs = np.diff(self.ds.GridFrameOffsetVector)
if not np.all(np.isclose(diffs, [diffs[0]]*len(diffs))):
raise NotImplementedError(
"Non-uniform GridFrameOffsetVector detected. Interpolated "
"summation of non-uniform dose-grid scales is not supported."
)
return np.array(
[
self.ds.PixelSpacing[0],
self.ds.PixelSpacing[1],
self.ds.GridFrameOffsetVector[1]
- self.ds.GridFrameOffsetVector[0],
]
)
@property
def offset(self):
"""Get the coordinates of the dose grid origin (mm)"""
return np.array(self.ds.ImagePositionPatient, dtype="float")
@property
def max_boundary_dose(self):
"""Get the max boundary dose"""
return max_boundary_value(self.dose_grid)
@property
def max_boundary_relative_dose(self):
return self.max_boundary_dose / np.max(self.dose_grid)
####################################################
# Tools
####################################################
def __add__(self, other):
"""Overload + operator to sum this dose grid with the other dose grid
Parameters
----------
other : DoseGrid
Another DoseGrid object.
"""
new = deepcopy(self)
new.add(other)
return new
def __mul__(self, factor):
"""Overload * operator to scale this dose grid by the provided factor
Parameters
----------
factor : int, float
Scale the dose grid by this value.
"""
new = deepcopy(self)
new.multiply(factor)
return new
def __rmul__(self, factor):
return self.__mul__(factor)
[docs] def multiply(self, factor):
"""
Scale the dose grid.
Parameters
----------
factor : int, float
Multiply the dose grid by this factor.
"""
if factor < 0:
raise NotImplementedError("Negative doses are not supported.")
self.dose_grid *= factor
self.dose_grid_post_processing()
[docs] def dose_grid_post_processing(self, other=None):
"""Set the pixel data and store UIDs from other DoseGrid"""
self.set_pixel_data()
if hasattr(self.ds, "DVHSequence"):
del self.ds.DVHSequence
if other is not None:
self.other_sop_class_uid = other.sop_class_uid
self.other_sop_instance_uid = other.sop_instance_uid
[docs] def is_coincident(self, other):
"""Check dose grid spatial coincidence.
Parameters
----------
other : DoseGrid
Another DoseGrid object.
"""
return (
self.ds.PixelSpacing == other.ds.PixelSpacing
and self.ds.ImagePositionPatient == other.ds.ImagePositionPatient
and self.ds.pixel_array.shape == other.ds.pixel_array.shape
and self.ds.GridFrameOffsetVector == other.ds.GridFrameOffsetVector
)
[docs] def set_pixel_data(self):
"""Update the PixelData with the current dose_grid"""
self.ds.BitsAllocated = 32
self.ds.BitsStored = 32
self.ds.HighBit = 31
self.ds.DoseGridScaling = (
np.max(self.dose_grid) / np.iinfo(np.uint32).max
)
pixel_data = (
np.swapaxes(self.dose_grid, 0, 2) / self.ds.DoseGridScaling
)
self.ds.PixelData = np.uint32(pixel_data).tobytes()
[docs] def save_dcm(self, file_path):
"""Save the pydicom.FileDataset to file"""
self.update_dicom_tags()
self.ds.save_as(file_path)
[docs] def get_ijk_points(self, other_axes):
"""Convert axes from another DoseGrid into ijk of this DoseGrid.
Parameters
----------
other_axes : list
The x, y, and z axis arrays.
Returns
-------
np.vstack
Array of other_axes in this ijk space.
"""
ijk_axes = [
(np.array(axis) - self.offset[a]) / self.scale[a]
for a, axis in enumerate(other_axes)
]
j, i, k = np.meshgrid(ijk_axes[1], ijk_axes[0], ijk_axes[2])
return np.vstack((i.ravel(), j.ravel(), k.ravel()))
####################################################
# Dose Summation
####################################################
[docs] def add(self, other, force=False):
"""
Add another dose grid to this dose grid, with interpolation if needed
Parameters
----------
other : DoseGrid
Another DoseGrid object.
force : bool
Set to True to ignore differences in DoseSummationType, DoseType,
DoseUnits, ImageOrientationPatient
"""
attrs = [
"DoseSummationType",
"DoseType",
"DoseUnits",
"ImageOrientationPatient",
]
attr_check = [
validate_attr_equality(self.ds, other.ds, attr) for attr in attrs
]
if not force and not all(attr_check):
mismatches = [
attr for i, attr in enumerate(attrs) if attr_check[i]
]
raise NotImplementedError(
"Dose summation of dose grids with these mismatched "
"attributes is not recommended: %s. Use "
"DoseGrid.add(other, force=True) to ignore"
% ",".join(mismatches)
)
if self.is_coincident(other):
self._direct_sum(other)
else:
if not scipy_available:
raise ImportError(
"scipy must be installed to perform interpolated dose sum."
)
self._interp_sum(other)
def _direct_sum(self, other):
"""Directly sum two coincident dose grids
Parameters
----------
other: DoseGrid
Another DoseGrid object.
"""
self.dose_grid += other.dose_grid
self.summation_type = "DIRECT"
self.dose_grid_post_processing(other)
def _interp_sum(self, other):
"""
Interpolate the other dose grid to this dose grid's axes,
then perform direct summation
Parameters
----------
other: DoseGrid
Another DoseGrid object.
"""
self.dose_grid += self.interp_entire_grid(other)
self.summation_type = "INTERPOLATED"
self.dose_grid_post_processing(other)
[docs] def interp_entire_grid(self, other):
"""
Interpolate the other dose grid to this dose grid's axes in one
operation
Parameters
----------
other: DoseGrid
Another DoseGrid object.
Returns
-------
np.array
The other dose grid interpolated to this dose grid's axes
"""
return map_coordinates(
input=other.dose_grid,
coordinates=other.get_ijk_points(self.axes),
**self.interp_param
).reshape(self.shape)
[docs] def show(self, z=None):
"""Show the dose grid using Matplotlib if present.
Parameters
----------
z : float, optional
slice position to display initially, by default None
"""
if not mpl_available:
raise ImportError(
"Matplotlib could not be loaded. Install and try again.")
return self
import matplotlib.pyplot as plt
from matplotlib.widgets import Slider
# Extract the list of planes (z) from the dose grid
planes = (
np.array(self.ds.GridFrameOffsetVector)
* self.ds.ImageOrientationPatient[0]
* self.ds.ImageOrientationPatient[4]
) + self.ds.ImagePositionPatient[2]
# Set up the plot
fig = plt.figure()
ax = fig.add_subplot(111)
rtdose = dicomparser.DicomParser(self.ds)
# Get the middle slice if the z is not provided
z = planes[planes.size // 2] if z is None else z
zplane = rtdose.GetDoseGrid(z) * self.ds.DoseGridScaling
# Flag to invert slider min/max if GFOV is decreasing (i.e. FFS)
reverse = planes[0] > planes[-1]
im = ax.imshow(zplane, cmap="jet",)
# Create a slider to change the (z)
axslice = fig.add_axes([0.34, 0.01, 0.50, 0.02])
slider = Slider(
ax=axslice,
label="Slice Position (mm):",
valmin=planes[-1] if reverse else planes[0],
valmax=planes[0] if reverse else planes[-1],
valinit=z,
valstep=np.diff(planes)[0],
)
def updateslice(z):
"""Update the data to show on the plot."""
im.set_data(rtdose.GetDoseGrid(z) * self.ds.DoseGridScaling)
plt.draw()
slider.on_changed(updateslice)
plt.show()
return self
[docs]def set_dicom_tag_value(ds, tag, value):
"""Set or update a DICOM tag value in the pydicom dataset.
Parameters
----------
ds : pydicom Dataset
The pydicom dataset for the tag to be added/updated to.
tag : str, int or tuple
DICOM tag or keyword to be added.
value : any
New value for the tag's element.
"""
try:
ds[tag].value = value
except KeyError:
if tag in keyword_dict: # Keyword provided rather than int or tuple
tag = keyword_dict[tag]
ds.add_new(tag, dictionary_VR(tag), value)
[docs]def add_dicom_sequence(ds, seq_keyword, data_set_dict):
"""Add a sequence to a data set.
Parameters
----------
ds : pydicom Dataset
The pydicom dataset for the sequence to be added to.
seq_keyword : str
The DICOM keyword for the sequence.
data_set_dict : dict
Dictionary of tags and values for the sequence element.
"""
seq_ds = Dataset()
for tag, value in data_set_dict.items():
set_dicom_tag_value(seq_ds, tag, value)
if hasattr(ds, seq_keyword):
getattr(ds, seq_keyword).append(seq_ds)
else:
setattr(ds, seq_keyword, Sequence([seq_ds]))
[docs]def validate_attr_equality(obj_1, obj_2, attr):
"""Assess the equality of the provided attr between two objects.
Send warning if unequal.
Parameters
----------
obj_1 : object
Any object with an `attr` attribute that is comparable by !=
obj_2 : object
Any object with an `attr` attribute that is comparable by !=
attr : str
The attribute to be compared between obj_1 and obj_2
"""
val_1 = getattr(obj_1, attr)
val_2 = getattr(obj_2, attr)
if val_1 != val_2:
warn("Different %s values detected:\n%s\n%s" % (attr, val_1, val_2))
return False
return True
[docs]def max_boundary_value(arr):
"""Get the greatest value on the boundary of a 3D numpy array
Parameters
----------
arr : numpy.array
Any 3-dimensional array-like object
Returns
-------
float
Maximum value along any side of the input array
"""
return np.max(
[
np.max([np.max(arr[i, :, :]) for i in [0, -1]]),
np.max([np.max(arr[:, j, :]) for j in [0, -1]]),
np.max([np.max(arr[:, :, k]) for k in [0, -1]]),
]
)