Source code for caelus.io.caelusdict

# -*- coding: utf-8 -*-
# pylint: disable=protected-access, import-outside-toplevel

"""\
Caelus/OpenFOAM Dictionary Implementation
-----------------------------------------
"""

import re

import six

from ..config import cmlenv
from ..utils import osutils, struct
from . import dtypes
from .printer import DictPrinter


[docs] class CaelusDict(struct.Struct): """Caelus Input File Dictionary""" def __str__(self): strbuf = six.StringIO() pprint = DictPrinter(strbuf) pprint(self) return strbuf.getvalue() def _foam_load_include(self, fname, env=None): """Load an include file with given name""" # Prevent circular imports from .dictfile import DictFile fname = fname.strip('"') fname = osutils.abspath(fname) out = DictFile.load(filename=fname).data tmp = out._foam_expand_includes(env) return tmp def _foam_load_etc_include(self, fname, env=None): """Load an `includeEtc` directive""" cenv = env or cmlenv.cml_get_version() efile = cenv.etc_file(fname.strip('"')) return self._foam_load_include(efile) def _foam_expand_includes(self, env=None): """Expand all macros/include directives""" def has_includes(din): """Check whether a dictionary has include directive""" return any( isinstance(dval, dtypes.Directive) and "#include" in dval.directive for dval in din.values() ) def _update(din, dout): if has_includes(din): dout.update(din._foam_expand_includes(env)) else: for k, v in din.items(): if isinstance(v, CaelusDict): _update(v, dout.setdefault(k, CaelusDict())) else: dout[k] = v out = self.__class__() cenv = env or cmlenv.cml_get_version() for k, val in self.items(): if isinstance(val, CaelusDict): dout = out.setdefault(k, CaelusDict()) _update(val, dout) elif ( isinstance(val, dtypes.Directive) and val.directive == "#includeEtc" ): out.update(self._foam_load_etc_include(val.value, cenv)) elif ( isinstance(val, dtypes.Directive) and val.directive == "#include" ): out.update(self._foam_load_include(val.value, cenv)) elif ( isinstance(val, dtypes.Directive) and val.directive == "#includeIfPresent" and osutils.path_exists(val.value.strip('"')) ): out.update(self._foam_load_include(val.value, cenv)) else: out[k] = val return out def _process_removes(self): """Process ``#remove`` directives""" keys = list(self.keys()) removes = [] for k in keys: val = self[k] if isinstance(val, dtypes.Directive) and ( val.directive == "#remove" ): keyre = re.compile(val.value.strip('"')) removes.append(keyre) self.pop(k) keys = list(self.keys()) for k in keys: for krexp in removes: if krexp.match(k) and k in self: self.pop(k) return self def _foam_expand_macros(self, root=None): """Expand all dictionary macros""" _root = root or [] keys = list(self.keys()) # Perform substitutions recursively for ii, k in enumerate(keys): val = self[k] if isinstance(val, dtypes.MacroSubstitution): kk = val.value.strip('"${}') found = False krem = keys[ii:] for dd in reversed(_root): if kk not in dd: continue sdict = dd[kk] for kkk, vvv in sdict.items(): if kkk not in krem: self[kkk] = vvv found = True break if not found: # raise ValueError(f"Cannot substitute {val.value}") print(f"Key not found: {val.value}") else: self.pop(k) elif isinstance(val, CaelusDict): val._foam_expand_macros(_root + [self]) self._process_removes() return self