Source code for caelus.post.funcobj.sampling

# -*- coding: utf-8 -*-
# pylint: disable=too-few-public-methods

"""\
Sets and surfaces sampling
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This module implements the python interface to OpenFOAM's ``sets`` and
``surfaces`` sampling objects.

.. currentmodule:: caelus.post.funcobj.sampling
.. autosummary::
   :nosignatures:

   SampledSets
   SampledSurfaces
   SampledSet
   SampledSurface

The different classes are illustrated using this example functionObject entry
in ``motorBike`` tutorial example::

   cuttingPlane
   {
       type            surfaces;
       libs            (sampling);
       writeControl    writeTime;

       surfaceFormat   vtk;
       fields          ( p U );

       interpolationScheme cellPoint;

       surfaces
       {
           yNormal
           {
               type            cuttingPlane;
               planeType       pointAndNormal;
               pointAndNormalDict
               {
                   point   (0 0 0);
                   normal  (0 1 0);
               }
               interpolate     true;
           }
       }
   }

When the above object is accessed via
:class:`~caelus.post.funcobj.functions.PostProcessing` class, the
``cuttingPlane`` object is represented by :class:`SampledSurfaces`, and the
``yNormal`` object is represented by a :class:`SampledSurface` instance. A similar
relationship exists between :class:`SampledSets` and :class:`SampledSet`.

"""

import abc
import itertools
from pathlib import Path

import pandas as pd

from ...utils.vtk_helpers import pyvista
from .funcobj import DictMeta, FunctionObject


class SampledData(metaclass=DictMeta):
    """Common state shared by every individual sampling entry."""

    def __init__(self, name, fobj_dict, parent):
        """Record the entry name, its input dictionary, and its owner.

        Args:
            name (str): User-defined name for this sampling instance
            fobj_dict (CaelusDict): Input parameter dictionary
            parent (Sampling): Parent collection instance
        """
        #: User-defined name for this sampling instance
        self.name = name

        #: Input dictionary containing data for this instance
        self.data = fobj_dict

        #: The parent sets/surfaces group instance
        self.parent = parent

    @property
    def fields(self):
        """Field names for this entry, falling back to the parent's list."""
        # The parent's list is looked up unconditionally, exactly as it would
        # be when passed directly as the ``get`` default argument.
        inherited = self.parent.fields
        return self.data.get("fields", inherited)

    def __repr__(self):
        return "<{}: {}>".format(self.__class__.__name__, self.name)
class SampledSet(SampledData):
    """A concrete ``set`` instance.

    Currently only ``raw``, ``vtk``, and ``csv`` formats are supported for
    ``setFormat`` if the user intends to load the dataset through this class.

    Example:
        >>> post = PostProcessing()        # Access post-processing instance
        >>> sets = post['sampledSets1']    # Get the sets group
        >>> probes = sets['probe1']        # Access line probe data
        >>> df = probes()                  # Get dataframe for latest time
        >>> df1 = probes('10')             # Get dataframe for different time
    """

    _dict_properties = [
        (
            'type',
            None,
            (
                'uniform',
                'face',
                'midPoint',
                'midPointAndFace',
                'cloud',
                'patchCloud',
                'patchSeed',
                'polyLine',
                'triSurfaceMeshPointSet',
            ),
        ),
        ('axis', None, "x y z xyz distance".split()),
        ('points', None),
    ]

    def __init__(self, name, fobj_dict, parent):
        """Initialize data from input dictionary.

        Args:
            name (str): User-defined name for this sampling instance
            fobj_dict (CaelusDict): Input parameter dictionary
            parent (Sampling): Parent collection instance
        """
        super().__init__(name, fobj_dict, parent)
        # Holds the most recently loaded dataframe, keyed by its time name.
        self._cache = {}

    @property
    def num_coord_cols(self):
        """Return the number of expected columns for coordinates.

        If the ``axis`` is ``xyz`` then returns 3, else returns 1 for all
        other axis options.
        """
        return 3 if self.axis == "xyz" else 1

    @property
    def coord_cols(self):
        """Return names of the coordinates column.

        If ``axis == "xyz"`` then returns a list of 3 columns, else returns a
        one-element list with the column name defined by axis.
        """
        return "x y z".split() if self.axis == "xyz" else [self.axis]

    def _file_fmt(self):
        """Return a shell wildcard glob expression for files.

        The wildcard glob is based on the user-defined name for this set
        instance and the ``setFormat`` option.

        Raises:
            RuntimeError: If ``setFormat`` has no known file extension.
        """
        ext_map = dict(
            raw=".xy",
            vtk=".vtk",
            csv=".csv",
        )
        outfmt = self.parent.setFormat
        if outfmt not in ext_map:
            raise RuntimeError(f"{outfmt} not yet supported")
        return f"{self.name}*{ext_map[outfmt]}"

    def _extract_field_names(self, fname):
        """Extract field names from a file name.

        The leading ``<name>_`` prefix is stripped from the file stem and the
        remainder is split on underscores.

        Args:
            fname (Path): Path of the data file.

        Returns:
            list: Field names encoded in the file name.
        """
        # NOTE(review): a field whose own name contains "_" would be split
        # into several pieces here - confirm whether such fields are sampled.
        skip = len(self.name) + 1
        filtered = fname.stem[skip:]
        return filtered.split("_")

    def _process_field_names(self, fields, ncols):
        """Return names for components.

        Args:
            fields (list): List of field names in the file. A single name
                given as a plain string is treated as a one-element list.
            ncols (int): Number of columns seen per field

        Return:
            list: A list of column names for a data file.
        """
        # A bare string would otherwise be iterated character by character.
        if isinstance(fields, str):
            fields = [fields]
        # ``range`` only accepts integers; tolerate integral floats.
        ncols = int(ncols)
        if ncols == 3:
            suffixes = "x y z".split()
        else:
            suffixes = range(ncols)
        return [f"{x}_{y}" for x, y in itertools.product(fields, suffixes)]

    def _load_raw_file(self, fname):
        """Load a raw format file.

        Loads files of the format ``<name>_<field>.xy``.

        Args:
            fname (Path): Path of the file to read.

        Returns:
            pd.DataFrame: Pandas dataframe with the dataset.
        """
        coords = self.coord_cols
        fields = self._extract_field_names(fname)
        df = pd.read_table(fname, delimiter=" ", index_col=False, header=None)
        # Integer division keeps the per-field component count an ``int`` so
        # that it can be used with ``range`` for tensors and other non-vector
        # fields.
        ncols = (len(df.columns) - len(coords)) // len(fields)
        fnames = (
            fields if ncols == 1 else self._process_field_names(fields, ncols)
        )
        df.columns = coords + fnames
        return df

    def _load_csv_file(self, fname):
        """Load a CSV format file.

        Assumes the file starts with a header row naming every column -
        TODO confirm against the writer that produced the file.

        Args:
            fname (Path): Path of the file to read.

        Returns:
            pd.DataFrame: Pandas dataframe with the dataset.
        """
        return pd.read_csv(fname)

    def _load_vtk_file(self, fname):
        """Load a legacy VTK file and return data.

        Loads files of the format ``<name>_<field>.vtk``.

        Args:
            fname (Path): Path of the file to read.

        Returns:
            pd.DataFrame: Pandas dataframe with the dataset.
        """
        mesh = pyvista().read(fname)
        df = pd.DataFrame(mesh.points, columns="x y z".split())
        for k in mesh.point_data.keys():
            val = mesh.point_data[k]
            if val.ndim > 1:
                # Pass the array name as a one-element list so that component
                # names are built from the whole name, not from its letters.
                fnames = self._process_field_names([k], val.shape[-1])
                for idx, col_name in enumerate(fnames):
                    df[col_name] = val[..., idx]
            else:
                df[k] = val
        return df

    def __call__(self, time=None):
        """Load data for this set at a given time.

        Args:
            time: Name of the time directory to load. When ``None`` (or an
                empty string) the parent's ``latest_time`` is used. Any other
                value, including ``0``, is converted with ``str``.

        Returns:
            pd.DataFrame: Columns from all matching files, with duplicated
            column names removed.

        Raises:
            FileNotFoundError: If the time directory does not exist.
            RuntimeError: If the output format is unsupported or no data
                files were found for this set.
        """
        reader_map = dict(
            raw=self._load_raw_file,
            vtk=self._load_vtk_file,
            csv=self._load_csv_file,
        )
        # Test against None explicitly: ``0`` is a legitimate time name.
        if time is None or time == "":
            dtime = self.parent.latest_time
        else:
            dtime = str(time)
        if dtime in self._cache:
            return self._cache[dtime]

        dpath = Path(self.parent.root) / dtime
        if not dpath.exists():
            raise FileNotFoundError(f"No data found: {dpath}")
        # Sorted so that the column order does not depend on directory order.
        flist = sorted(dpath.glob(self._file_fmt()))
        file_fmt = self.parent.setFormat
        file_reader = reader_map.get(file_fmt)
        if file_reader is None:
            raise RuntimeError(f"{file_fmt} not yet supported")
        frames = [file_reader(ff) for ff in flist]
        if not frames:
            raise RuntimeError(f"Error loading data for {self.name}")
        df = pd.concat(frames, axis=1)
        # Coordinate columns repeat in every file; keep the first occurrence.
        df1 = df.loc[:, ~df.columns.duplicated()]
        # Key by the same string used for the lookup above. ``Path.stem``
        # would truncate a time such as "0.5" to "0". Only the latest result
        # is retained.
        self._cache = {dtime: df1}
        return df1
class SampledSurface(SampledData):
    """A concrete ``surface`` instance.

    Data is read from ``<name>.vtp`` files in the time directory. The object
    returned by ``pyvista`` is handed back to the caller unchanged.

    Example:
        >>> post = PostProcessing()         # Access post-processing instance
        >>> surfaces = post['cuttingPlane'] # Get the surfaces group
        >>> plane = surfaces['yNormal']     # Access plane data
        >>> patch = plane()                 # Get mesh for latest time
        >>> patch1 = plane('10')            # Get mesh for different time
    """

    _dict_properties = [
        ('type', None),
    ]

    def __init__(self, name, fobj_dict, parent):
        """Initialize data from input dictionary.

        Args:
            name (str): User-defined name for this sampling instance
            fobj_dict (CaelusDict): Input parameter dictionary
            parent (Sampling): Parent collection instance
        """
        super().__init__(name, fobj_dict, parent)
        # Holds the most recently loaded mesh, keyed by its time name.
        self._cache = {}

    def __call__(self, time=None):
        """Return the surface mesh associated with a given time.

        Args:
            time: Name of the time directory to load. When ``None`` (or an
                empty string) the parent's ``latest_time`` is used. Any other
                value, including ``0``, is converted with ``str``.

        Returns:
            The object returned by ``pyvista().read`` for the surface file.

        Raises:
            FileNotFoundError: If the surface file does not exist.
        """
        # Test against None explicitly: ``0`` is a legitimate time name.
        if time is None or time == "":
            dtime = self.parent.latest_time
        else:
            dtime = str(time)
        if dtime in self._cache:
            return self._cache[dtime]

        dpath = Path(self.parent.root) / dtime
        fname = dpath / (self.name + ".vtp")
        if not fname.exists():
            raise FileNotFoundError(f"Surface output not found: {fname}")
        mesh = pyvista().read(fname)
        # Key by the same string used for the lookup above. ``Path.stem``
        # would truncate a time such as "0.5" to "0". Only the latest result
        # is retained.
        self._cache = {dtime: mesh}
        return mesh
class Sampling(FunctionObject):
    """Shared behaviour of the ``sets`` and ``surfaces`` sampling groups."""

    _funcobj_libs = ["sampling"]

    _dict_properties = [
        ('fields', None),
        (
            'interpolationScheme',
            'cell',
            [
                "cell",
                "cellPoint",
                "cellPointFace",
                "pointMVC",
                "cellPatchConstrained",
            ],
        ),
    ]

    def __init__(self, name, obj_dict, *, casedir=None):
        """Initialize the group with an empty table of sampling entries.

        Args:
            name: Passed through to the base class initializer.
            obj_dict: Passed through to the base class initializer.
            casedir: Passed through to the base class initializer.
        """
        super().__init__(name, obj_dict, casedir=casedir)

        #: Mapping of sampling instances to their names.
        self.samples = dict()

    def keys(self):
        """Return the names of the sampling entries"""
        registry = self.samples
        return registry.keys()

    def __getitem__(self, key):
        """Look up a sampling entry by its user-defined name."""
        registry = self.samples
        return registry[key]
class SampledSets(Sampling):
    """A ``sets`` functionObjects entry.

    This class provides an interface to a group of sampled set instances. The
    instances are of type :class:`SampledSet`.
    """

    _funcobj_type = "sets"

    _dict_properties = [
        ('setFormat', 'raw', "raw gnuplot xmgr jplot vtk ensight csv".split()),
    ]

    def __init__(self, name, obj_dict, *, casedir=None):
        """Create a :class:`SampledSet` for every entry under ``sets``.

        The ``sets`` entry may be either a list of dictionaries or a single
        dictionary mapping set names to their parameters.

        Args:
            name: Passed through to the base class initializer.
            obj_dict: Passed through to the base class initializer.
            casedir: Passed through to the base class initializer.
        """
        super().__init__(name, obj_dict, casedir=casedir)
        sets = self.data.sets
        # Accept both the list-of-dictionaries layout and a plain dictionary;
        # iterating a dictionary directly would yield only its keys.
        entries = sets if isinstance(sets, list) else [sets]
        for entry in entries:
            for k, v in entry.items():
                self.samples[k] = SampledSet(k, v, self)
class SampledSurfaces(Sampling):
    """A ``surfaces`` functionObjects entry.

    Groups the individual sampled surfaces (planes or patches) defined under
    one function object. Each member is a :class:`SampledSurface`.
    """

    _funcobj_type = "surfaces"

    _dict_properties = [
        ('surfaceFormat', 'vtk'),
    ]

    def __init__(self, name, obj_dict, *, casedir=None):
        """Create a :class:`SampledSurface` for every entry under ``surfaces``.

        The ``surfaces`` entry is handled both when it is a list of
        dictionaries and when it is a single dictionary.

        Args:
            name: Passed through to the base class initializer.
            obj_dict: Passed through to the base class initializer.
            casedir: Passed through to the base class initializer.
        """
        super().__init__(name, obj_dict, casedir=casedir)

        surfaces = self.data.surfaces
        # Treat a lone dictionary as a one-element list so a single loop
        # serves both layouts.
        groups = surfaces if isinstance(surfaces, list) else [surfaces]
        for group in groups:
            for label, spec in group.items():
                self.samples[label] = SampledSurface(label, spec, self)