import logging
import math
import os
from copy import deepcopy
from typing import Dict, Literal, Tuple, Union
import numpy as np
import fdsreader.utils.fortran_data as fdtype
from fdsreader import settings
from fdsreader.fds_classes import Mesh
from fdsreader.utils import Quantity
_HANDLED_FUNCTIONS = {np.mean: (lambda pl: pl.mean)}
[docs]
def implements(np_function):
"""Decorator to register an __array_function__ implementation for Smoke3Ds."""
def decorator(func):
_HANDLED_FUNCTIONS[np_function] = func
return func
return decorator
[docs]
class SubSmoke3D:
"""Part of a smoke3d output for a single mesh.
:ivar mesh: The mesh containing the data.
:ivar upper_bounds: Numpy ndarray containing the maxmimum data value for each timestep.
:ivar times: Numpy ndarray containing all time steps for which data has been written out.
"""
def __init__(self, file_path: str, mesh: Mesh, upper_bounds: np.ndarray, times: np.ndarray):
self.mesh = mesh
self.upper_bounds = upper_bounds
self.times = times
self._file_path = file_path # Path to the binary data file
@property
def data(self) -> np.ndarray:
"""Method to lazy load the Smoke3D data of a single mesh."""
if not hasattr(self, "_data"):
with open(self._file_path, "rb") as infile:
dtype_header = fdtype.new((("i", 8),))
dtype_nchars = fdtype.new((("i", 2),))
header = fdtype.read(infile, dtype_header, 1)[0][0]
nx, ny, nz = int(header[3]), int(header[5]), int(header[7])
data_shape = (nx + 1, ny + 1, nz + 1)
self._data = np.empty((self.times.size,) + data_shape, dtype=np.float32)
for t in range(self.times.size):
fdtype.read(infile, fdtype.FLOAT, 1) # Skip time value
nchars_out = int(fdtype.read(infile, dtype_nchars, 1)[0][0][1])
if nchars_out > 0:
dtype_data = fdtype.new((("u", nchars_out),))
rle_data = fdtype.read(infile, dtype_data, 1)[0][0]
decoded_data = np.empty((nx + 1) * (ny + 1) * (nz + 1))
# Decode run-length-encoded data (see "RLE" subroutine in smvv.f90)
i = 0
mark = np.uint8(255)
out_pos = 0
while i < nchars_out:
if rle_data[i] == mark:
value = rle_data[i + 1]
repeats = rle_data[i + 2]
i += 3
else:
value = rle_data[i]
repeats = 1
i += 1
decoded_data[out_pos : out_pos + repeats] = value
out_pos += repeats
self._data[t, :, :, :] = decoded_data.reshape(data_shape, order="F")
return self._data
[docs]
def clear_cache(self):
"""Remove all data from the internal cache that has been loaded so far to free memory."""
if hasattr(self, "_data"):
del self._data
[docs]
class Smoke3D(np.lib.mixins.NDArrayOperatorsMixin):
"""Smoke3D file data container including metadata. Consists of multiple subsmokes, one for each
mesh.
:ivar times: Numpy ndarray containing all time steps for which data has been written out.
:ivar quantity: :class:`Quantity` object containing information about the recorded quantity and its unit.
"""
def __init__(self, root_path: str, times: np.ndarray, quantity: Quantity):
self._root_path = root_path
self.times = times
self.quantity = quantity
# List of all subsmokes this Smoke3D consists of (one per mesh).
self._subsmokes: Dict[str, SubSmoke3D] = dict()
def _add_subsmoke(self, filename: str, mesh: Mesh, upper_bounds: np.ndarray) -> SubSmoke3D:
subsmoke = SubSmoke3D(os.path.join(self._root_path, filename), mesh, upper_bounds, self.times)
self._subsmokes[mesh.id] = subsmoke
# If lazy loading has been disabled by the user, load the data instantaneously instead
if not settings.LAZY_LOAD:
_ = subsmoke.data
return subsmoke
[docs]
def to_global(
self, masked: bool = False, fill: float = 0, return_coordinates: bool = False
) -> Union[np.ndarray, Tuple[np.ndarray, Dict[Literal["x", "y", "z"], np.ndarray]]]:
"""Creates a global numpy ndarray from all subsmokes.
:param masked: Whether to apply the obstruction mask to the data or not.
:param fill: The fill value to use for masked entries. Only used when masked=True.
:param return_coordinates: If true, return the matching coordinate for each value on the generated grid.
"""
if len(self._subsmokes) == 0:
if return_coordinates:
return np.array([]), {d: np.array([]) for d in ("x", "y", "z")}
else:
return np.array([])
coord_min = {"x": math.inf, "y": math.inf, "z": math.inf}
coord_max = {"x": -math.inf, "y": -math.inf, "z": -math.inf}
for dim in ("x", "y", "z"):
for subsmoke in self._subsmokes.values():
co = subsmoke.mesh.coordinates[dim]
coord_min[dim] = min(co[0], coord_min[dim])
coord_max[dim] = max(co[-1], coord_max[dim])
# The global grid will use the finest mesh as base and duplicate values of the coarser
# meshes. Therefore, we first find the finest mesh and calculate the step size in each
# dimension.
step_sizes_min = {
"x": coord_max["x"] - coord_min["x"],
"y": coord_max["y"] - coord_min["y"],
"z": coord_max["z"] - coord_min["z"],
}
step_sizes_max = {"x": 0, "y": 0, "z": 0}
steps = dict()
global_max = {"x": -math.inf, "y": -math.inf, "z": -math.inf}
for dim in ("x", "y", "z"):
for subsmoke in self._subsmokes.values():
step_size = subsmoke.mesh.coordinates[dim][1] - subsmoke.mesh.coordinates[dim][0]
step_sizes_min[dim] = min(step_size, step_sizes_min[dim])
step_sizes_max[dim] = max(step_size, step_sizes_max[dim])
global_max[dim] = max(subsmoke.mesh.coordinates[dim][-1], global_max[dim])
for dim in ("x", "y", "z"):
if step_sizes_min[dim] == 0:
step_sizes_min[dim] = math.inf
steps[dim] = 1
else:
steps[dim] = (
max(int(round((coord_max[dim] - coord_min[dim]) / step_sizes_min[dim])), 1) + 1
) # + step_sizes_max[dim] / step_sizes_min[dim]
grid = np.full((self.n_t, steps["x"], steps["y"], steps["z"]), np.nan)
for subsmoke in self._subsmokes.values():
subsmoke_data = subsmoke.data.copy()
if masked:
mask = subsmoke.mesh.get_obstruction_mask(self.times)
start_idx = {
dim: int(round((subsmoke.mesh.coordinates[dim][0] - coord_min[dim]) / step_sizes_min[dim]))
for dim in ("x", "y", "z")
}
end_idx = {
dim: int(round((subsmoke.mesh.coordinates[dim][-1] - coord_min[dim]) / step_sizes_min[dim]))
for dim in ("x", "y", "z")
}
temp_data = dict()
temp_mask = dict()
for axis in range(3):
dim = ("x", "y", "z")[axis]
# Temporarily save border points to add them back to the array again later
if np.isclose(subsmoke.mesh.coordinates[dim][-1], global_max[dim]):
temp_data_slices = [slice(s) for s in subsmoke_data.shape]
end_idx[dim] += 1
temp_data_slices[axis + 1] = slice(subsmoke_data.shape[axis + 1] - 1, None)
temp_data[dim] = subsmoke_data[tuple(temp_data_slices)]
if masked:
temp_mask[dim] = mask[tuple(temp_data_slices)]
# We ignore border points unless they are actually on the border of the simulation space as all
# other border points actually appear twice, as the subslices overlap. This only
# applies for face_centered slices, as cell_centered slices will not overlap.
reduced_shape_slices = (slice(subsmoke.data.shape[0]),) + tuple(
slice(1, None) for s in subsmoke.data.shape[1:]
)
subsmoke_data = subsmoke_data[reduced_shape_slices]
if masked:
mask = mask[reduced_shape_slices]
n_repeat = max(
int(
round(
(subsmoke.mesh.coordinates[dim][1] - subsmoke.mesh.coordinates[dim][0])
/ step_sizes_min[dim]
)
),
1,
)
if n_repeat > 1:
subsmoke_data = np.repeat(subsmoke_data, n_repeat, axis=axis + 1)
if masked:
mask = np.repeat(mask, n_repeat, axis=axis + 1)
for axis in range(3):
dim = ("x", "y", "z")[axis]
# Add border points back again if needed
if np.isclose(subsmoke.mesh.coordinates[dim][-1], global_max[dim]):
temp_data_slices = [slice(s) for s in subsmoke_data.shape]
temp_data_slices[axis + 1] = slice(None)
subsmoke_data = np.concatenate(
(subsmoke_data, temp_data[dim][tuple(temp_data_slices)]), axis=axis + 1
)
if masked:
mask = np.concatenate((mask, temp_mask[dim][tuple(temp_data_slices)]), axis=axis + 1)
# If the slice should be masked, we set all cells at which an obstruction is in the
# simulation space to the fill value set by the user
if masked:
subsmoke_data = np.where(mask, subsmoke_data, fill)
grid[:, start_idx["x"] : end_idx["x"], start_idx["y"] : end_idx["y"], start_idx["z"] : end_idx["z"]] = (
subsmoke_data.reshape(
(
self.n_t,
end_idx["x"] - start_idx["x"],
end_idx["y"] - start_idx["y"],
end_idx["z"] - start_idx["z"],
)
)
)
if return_coordinates:
coordinates = dict()
for dim_index, dim in enumerate(("x", "y", "z")):
coordinates[dim] = np.linspace(coord_min[dim], coord_max[dim], grid.shape[dim_index + 1])
if return_coordinates:
return grid, coordinates
else:
return grid
def __getitem__(self, mesh: Mesh):
"""Returns the :class:`SubSmoke` that contains data for the given mesh."""
return self.get_subsmoke(mesh)
@property
def n_t(self) -> int:
"""Get the number of timesteps for which data was output."""
return len(self.times)
[docs]
def get_subsmoke(self, mesh: Mesh):
"""Returns the :class:`SubSmoke` that contains data for the given mesh."""
return self._subsmokes[mesh.id]
@property
def subsmokes(self):
"""Returns a list with one SubSmoke3D object per mesh."""
return list(self._subsmokes.values())
@property
def vmax(self):
"""Maximum value of all data at any time."""
curr_max = max(np.max(subsmoke3d.upper_bounds) for subsmoke3d in self._subsmokes.values())
if curr_max == np.float32(-1e33):
return max(np.max(subsmoke3d.data) for subsmoke3d in self._subsmokes.values())
return curr_max
[docs]
@implements(np.mean)
def mean(self) -> np.ndarray:
"""Calculates the mean value of all Smoke3D data for this quantity."""
return np.sum([np.mean(subsmoke.data) for subsmoke in self._subsmokes.values()]) / len(self._subsmokes)
[docs]
@implements(np.std)
def std(self) -> np.ndarray:
"""Calculates the standard deviation of all Smoke3D data for this quantity."""
mean = self.mean
sum = np.sum([np.sum(np.power(subsmoke.data - mean, 2)) for subsmoke in self._subsmokes.values()])
N = np.sum([subsmoke.data.size for subsmoke in self._subsmokes.values()])
return np.sqrt(sum / N)
[docs]
def clear_cache(self):
"""Remove all data from the internal cache that has been loaded so far to free memory."""
for subsmoke in self._subsmokes.values():
subsmoke.clear_cache()
def __array__(self):
"""Method that will be called by numpy when trying to convert the object to a numpy ndarray."""
raise UserWarning(
"Smoke3Ds can not be converted to numpy arrays, but they support all typical numpy"
" operations such as np.multiply. If a 'global' array containing all subsmokes is"
" required, please use the 'to_global' method and use the returned numpy-array explicitly."
)
def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
"""Method that will be called by numpy when using a ufunction with a Smoke3D as input.
:returns: A new smoke3d on which the ufunc has been applied.
"""
if method != "__call__":
logging.warning(
"The %s method has been used which is not explicitly implemented. Correctness of"
" results is not guaranteed. If you require this feature to be implemented please"
" submit an issue on Github where you explain your use case.",
method,
)
input_list = list(inputs)
for i, inp in enumerate(inputs):
if isinstance(inp, self.__class__):
del input_list[i]
new_smoke3d = deepcopy(self)
for subsmoke in self._subsmokes.values():
subsmoke._data = ufunc(subsmoke.data, input_list[0], **kwargs)
return new_smoke3d
def __array_function__(self, func, types, args, kwargs):
"""Method that will be called by numpy when using an array function with a Slice as input.
:returns: The output of the array function.
"""
if func not in _HANDLED_FUNCTIONS:
return NotImplemented
# Note: this allows subclasses that don't override __array_function__ to handle Smoke3Ds.
if not all(issubclass(t, self.__class__) for t in types):
return NotImplemented
return _HANDLED_FUNCTIONS[func](*args, **kwargs)
# __array_function__ implementations
# ...