Source code for caelus.post.logs

# -*- coding: utf-8 -*-

"""\
Caelus Log Analyzer
-------------------

This module provides utilities to parse and extract information from solver
outputs (log files) that can be used to monitor and analyze the convergence of
runs. It implements the :class:`SolverLog` class that can be used to access
time histories of residuals for various fields of interest.

Example:
    >>> logs = SolverLog()
    >>> print("Available fields: ", logs.fields)
    >>> ux_residuals = logs.residual("Ux")

The actual extraction of the logs is performed by :class:`LogProcessor`, which
uses regular expressions to match lines of interest and convert them into
tabular files suitable for loading with ``numpy.loadtxt`` or
``pandas.read_table``.
"""

import json
import logging
import os
from collections import OrderedDict
from os.path import join

import numpy as np

from ..utils import osutils
from ..utils.coroutines import coroutine, grep

_lgr = logging.getLogger(__name__)


class LogProcessor(object):
    """Process the log file and extract information for analysis.

    This is a low-level utility that parses log files and extracts
    information using regular expressions. Users should interact with solver
    output through the :class:`SolverLog` class.
    """

    # Regular expressions that match common lines of interest in a CML solver
    # output
    expressions = dict(
        time=r"^Time = (\S+)",
        courant=r"^Courant Number mean: (\S+) max: (\S+)",
        residual=r"(\S+): *Solving for (\S+), Initial residual = (\S+), "
        r"Final residual = (\S+), No Iterations (\S+)",
        bounding=r"^bounding (\S+), min:\s*(\S+)\s+max:\s*(\S+)\s+average:\s*(\S+)",
        continuity=r"time step continuity errors : sum local = (\S+), "
        r"global = (\S+), cumulative = (\S+)",
        exec_time=r"ExecutionTime = (\S+) s ClockTime = (\S+) s",
        convergence=r"(\S+) solution converged in (\S+) iterations",
        completion=r"^End$",
        exiting=r"^.*(CAELUS|OpenFOAM).*exiting",
        fatal_error=r"^.*(CAELUS|OpenFOAM).*FATAL ERROR",
    )

    def __init__(self, logfile, case_dir=None, logs_dir="logs"):
        """
        Args:
            logfile (str): Name of the Caelus log file
            case_dir (path): Path to the case directory (default: cwd)
            logs_dir (path): Relative path to the directory where logs are
                written
        """
        #: Absolute path to the case directory
        self.case_dir = case_dir or os.getcwd()
        #: Absolute path to the directory containing processed logs
        self.logs_dir = osutils.ensure_directory(
            join(self.case_dir, logs_dir)
        )
        #: User-supplied log file (relative to case directory)
        self.logfile = join(self.case_dir, logfile)
        #: Track the latest time that was processed by the utility
        self.time = 0.0
        #: Time as a string (for output)
        self.time_str = "0"
        #: (variable, subIteration) pairs tracking the number of predictor
        #: subIterations for each flow variable
        self.subiter_map = {}
        #: Open file handles for the residual outputs
        self.res_files = OrderedDict()
        #: Open file handles for bounding outputs
        self.bound_files = OrderedDict()
        #: List of user-defined rules to process
        self._user_rules = []
        #: Extra actions for pre-defined expressions
        self._extra_rules = {}
        #: Flag indicating convergence message in logs
        self.converged = False
        #: Flag indicating solver completion in logs (if End is found)
        self.solve_completed = False
        #: Flag indicating whether the solver failed
        self.failed = False
        #: Timestep when the steady state solver converged
        self.converged_time = -1
        #: Flag indicating one timestep
        self._tick = False
        #: Flag indicating user exit (Ctrl+C) in watch mode
        self._user_exit = False

    def __call__(self):
        """Process the log file"""
        pat_builtin = [grep(*x) for x in self._init_builtins()]
        patterns = pat_builtin + self._user_rules
        self._process_file(patterns)
        if self.solve_completed:
            self._save_state()
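
    # A minimal usage sketch for post-processing a completed run; the log
    # file name "solver.log" and case path are assumptions, not something
    # this module mandates:
    #
    #     proc = LogProcessor("solver.log", case_dir="/path/to/case")
    #     proc()                     # parse the log and write logs/*.dat
    #     print(proc.current_state)  # summary dict; written to
    #                                # logs/.logs_state.json once End is seen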

    def watch_file(self, target=None, wait_time=0.1):
        """Process a log file for an in-progress run.

        This method takes one parameter, ``target``, a coroutine that is
        called at the end of every timestep. See
        :class:`~caelus.post.plots.LogWatcher` for an example of using target
        to plot residuals for monitoring the run.

        Args:
            target (coroutine): A consumer acting on the data
            wait_time (seconds): Wait time between checking the log file for
                updates
        """
        import signal
        import time

        def _signal_handler(*args):
            """Handle quit signal for plot watching"""
            self._user_exit = True

        signal.signal(signal.SIGINT, _signal_handler)

        pat_builtin = [grep(*x) for x in self._init_builtins()]
        patterns = pat_builtin + self._user_rules
        with open(self.logfile) as fh:
            while (not self.solve_completed) and (not self._user_exit):
                line = fh.readline()
                if not line:
                    time.sleep(wait_time)
                elif line.strip():
                    for pat in patterns:
                        pat.send(line)
                    if self._tick and target is not None:
                        target.send(self.solve_completed)
                        self._tick = False
        if self.solve_completed:
            self._save_state()
        return self._user_exit
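
    # A sketch of a simple ``target`` coroutine for watch_file; the name
    # "tick_counter" is illustrative only. The coroutine receives the value
    # of ``solve_completed`` at the end of every timestep:
    #
    #     from caelus.utils.coroutines import coroutine
    #
    #     @coroutine
    #     def tick_counter():
    #         nsteps = 0
    #         while True:
    #             done = yield          # value of solve_completed at this point
    #             nsteps += 1
    #             print("Timesteps seen: %d; completed: %s" % (nsteps, done))
    #
    #     proc = LogProcessor("solver.log")
    #     proc.watch_file(target=tick_counter())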

    def add_rule(self, regexp, actions):
        """Add a user-defined rule for processing

        Args:
            regexp (str): A string that can be compiled into a regexp
            actions (func or list): A coroutine (or list of coroutines) that
                can consume matching patterns
        """
        act_list = actions if hasattr(actions, "append") else [actions]
        self._user_rules.append(grep(regexp, act_list))
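
    # A sketch of registering a user-defined rule; both the regular
    # expression and the "minmax_printer" coroutine below are hypothetical:
    #
    #     from caelus.utils.coroutines import coroutine
    #
    #     @coroutine
    #     def minmax_printer():
    #         while True:
    #             rexp = yield          # re match object forwarded by grep()
    #             print("min = %s; max = %s" % (rexp.group(1), rexp.group(2)))
    #
    #     proc = LogProcessor("solver.log")
    #     proc.add_rule(r"min/max\(T\) = (\S+), (\S+)", minmax_printer())
    #     proc()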

    def extend_rule(self, line_type, actions):
        """Extend a pre-defined regexp with extra actions

        The default action for LogProcessor is to output processed lines into
        files. Additional actions on pre-defined lines (e.g., "time") can be
        hooked via this method.

        Args:
            line_type (str): Pre-defined line type
            actions (list): A list of coroutines that receive the matching
                lines
        """
        act_list = actions if hasattr(actions, "append") else [actions]
        if line_type not in self.expressions:
            raise RuntimeError("No pre-defined line type: %s" % line_type)
        self._extra_rules.setdefault(line_type, []).extend(act_list)
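
    # A sketch of hooking an extra action onto the pre-defined "time" lines;
    # the "time_printer" coroutine is illustrative:
    #
    #     @coroutine
    #     def time_printer():
    #         while True:
    #             rexp = yield
    #             print("Processing time = %s" % rexp.group(1))
    #
    #     proc.extend_rule("time", time_printer())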

    def _init_builtins(self):
        """Helper function to initialize builtin patterns"""
        for k, rexp in self.expressions.items():
            func = getattr(self, "%s_processor" % k)()
            yield (rexp, [func] + self._extra_rules.get(k, []))

    def _process_file(self, patterns):
        """Helper function to process logs of a completed run"""
        with open(self.logfile) as fh:
            for line in fh:
                for pat in patterns:
                    pat.send(line)

    @property
    def current_state(self):
        """Return the current state of the logs processor"""
        curr_state = dict(
            logfile=os.path.basename(self.logfile),
            time=self.time,
            converged=self.converged,
            solve_completed=self.solve_completed,
            converged_time=self.converged_time,
            failed=self.failed,
            fields=list(self.res_files.keys()),
            bounding_fields=list(self.bound_files.keys()),
        )
        return curr_state

    def _save_state(self, filename=".logs_state.json"):
        """Save state of the logs for future introspection"""
        curr_state = self.current_state
        with open(join(self.logs_dir, filename), 'w') as fh:
            json.dump(curr_state, fh)
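
    # A sketch of inspecting the saved state without re-parsing the log; the
    # file name matches the default used by _save_state and the path is
    # relative to the case directory:
    #
    #     import json
    #     with open("logs/.logs_state.json") as fh:
    #         state = json.load(fh)
    #     print(state["solve_completed"], state["fields"])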

    @coroutine
    def time_processor(self):
        """Processor for the Time line in log files"""
        while True:
            rexp = yield
            self.time = float(rexp.group(1))
            self.time_str = rexp.group(1)
            # Reset subIteration counters
            for k in self.subiter_map:
                self.subiter_map[k] = 0
            self._tick = False

    @coroutine
    def residual_processor(self):
        """Process a residual line and output data to the relevant file."""

        def get_file(field, solver):
            """Helper method to get the file handle for a field.

            On first invocation, it creates the file with headers. On
            subsequent invocations it just returns the relevant file handle.
            """
            if field not in self.res_files:
                fh = open(join(self.logs_dir, field + ".dat"), 'w')
                fh.write("# Field: %s; Solver: %s\n" % (field, solver))
                fh.write(
                    "Time SubIteration InitialResidual FinalResidual NoIterations\n"
                )
                self.res_files[field] = fh
            return self.res_files[field]

        # end get_file

        try:
            while True:
                rexp = yield
                solver = rexp.group(1)  # e.g., PCG, GAMG, etc.
                field = rexp.group(2)   # Ux, Uy, p, etc.
                icorr = self.subiter_map.get(field, 0) + 1
                self.subiter_map[field] = icorr
                fh = get_file(field, solver)
                fh.write(
                    self.time_str
                    + "\t%d\t" % icorr
                    + "\t".join(rexp.group(i) for i in range(3, 6))
                    + "\n"
                )
        except GeneratorExit:
            for fh in self.res_files.values():
                if not fh.closed:
                    fh.close()

    @coroutine
    def bounding_processor(self):
        """Process the bounding lines"""

        def get_file(field):
            """Helper method to get the file handle for a field.

            On first invocation, it creates the file with headers. On
            subsequent invocations it just returns the relevant file handle.
            """
            if field not in self.bound_files:
                fh = open(
                    join(self.logs_dir, "bounding_" + field + ".dat"), 'w'
                )
                fh.write("# Bounding Field: %s\n" % (field))
                fh.write("Time SubIteration Min Max Average\n")
                self.bound_files[field] = fh
            return self.bound_files[field]

        # end get_file

        try:
            while True:
                rexp = yield
                field = rexp.group(1)
                icorr = self.subiter_map.get(field, 0)
                fh = get_file(field)
                fh.write(
                    self.time_str
                    + "\t%d\t" % icorr
                    + "\t".join(rexp.group(i) for i in range(2, 5))
                    + "\n"
                )
        except GeneratorExit:
            for fh in self.bound_files.values():
                if not fh.closed:
                    fh.close()

    @coroutine
    def continuity_processor(self):
        """Process continuity error lines from log file"""
        with open(join(self.logs_dir, "continuity_errors.dat"), 'w') as fh:
            fh.write(
                "Time SubIteration LocalError GlobalError CumulativeError\n"
            )
            while True:
                rexp = yield
                icorr = self.subiter_map.get('continuity', 0) + 1
                self.subiter_map['continuity'] = icorr
                fh.write(
                    self.time_str
                    + "\t%d\t" % icorr
                    + "\t".join(x for x in rexp.groups())
                    + "\n"
                )

    @coroutine
    def exec_time_processor(self):
        """Process execution/clock time lines"""
        with open(join(self.logs_dir, "clock_time.dat"), 'w') as fh:
            fh.write("Time ExecutionTime ClockTime\n")
            while True:
                rexp = yield
                fh.write(
                    self.time_str
                    + "\t"
                    + "\t".join(x for x in rexp.groups())
                    + "\n"
                )
                self._tick = True

    @coroutine
    def courant_processor(self):
        """Process Courant Number lines"""
        with open(join(self.logs_dir, "courant.dat"), 'w') as fh:
            fh.write("Time CoMean CoMax\n")
            while True:
                rexp = yield
                fh.write(
                    self.time_str
                    + "\t"
                    + "\t".join(x for x in rexp.groups())
                    + "\n"
                )

    @coroutine
    def convergence_processor(self):
        """Process convergence information (steady solvers only)"""
        while True:
            rexp = yield
            self.converged = True
            self.converged_time = int(rexp.group(2))

    @coroutine
    def completion_processor(self):
        """Process End line indicating solver completion"""
        while True:
            _ = yield
            self.solve_completed = True

    @coroutine
    def exiting_processor(self):
        """Process the solver exiting message"""
        while True:
            _ = yield
            self.failed = True
            self.converged = False
            self.solve_completed = False

    @coroutine
    def fatal_error_processor(self):
        """Process CAELUS FATAL ERROR line"""
        while True:
            _ = yield
            self.failed = True
            self.converged = False
            self.solve_completed = False


class SolverLog(object):
    """Caelus solver log file interface.

    :class:`SolverLog` extracts information from solver outputs and allows
    interaction with the log data as ``numpy.ndarray`` or
    ``pandas.DataFrame`` objects.
    """

    def __init__(
        self, case_dir=None, logs_dir="logs", force_reload=False, logfile=None
    ):
        """
        Args:
            case_dir (path): Absolute path to the case directory
            logs_dir (path): Path to the logs directory relative to case_dir
            force_reload (bool): If True, force a reread of the log file even
                if the logs were processed previously.
            logfile (file): Log file to process when reloading is forced

        Raises:
            RuntimeError: An error is raised if no logs directory is
                available and the user has not provided a logfile that can be
                processed on the fly during initialization.
        """
        self.casedir = case_dir or os.getcwd()
        self.logs_dir = os.path.join(self.casedir, logs_dir)
        has_logs = os.path.exists(
            os.path.join(self.logs_dir, ".logs_state.json")
        )
        data = {}
        if not has_logs and logfile is None:
            raise RuntimeError(
                "Cannot find processed logs data. "
                "Provide a valid log file."
            )
        elif has_logs:
            with open(os.path.join(self.logs_dir, ".logs_state.json")) as fh:
                data = json.load(fh)
            if logfile:
                inpfile = os.path.basename(logfile)
                oldfile = data.get("logfile", None)
                # Force a reprocess if the log file has changed
                if (oldfile is None) or (inpfile != os.path.basename(oldfile)):
                    force_reload = True

        self.solve_completed = False
        self.failed = False
        self.fields = []
        self.bounding_fields = []
        if force_reload or not has_logs:
            logs = LogProcessor(logfile, self.casedir, logs_dir)
            logs()
            data = logs.current_state
        for key, val in data.items():
            setattr(self, key, val)
        if not self.solve_completed:
            _lgr.warning("Solve not completed in %s", self.casedir)
        if self.failed:
            _lgr.warning("Detected failed run in %s", self.casedir)

    def residual(self, field, all_cols=False):
        """Return the residual time-history for a field"""
        if field not in self.fields:
            raise KeyError(
                "Invalid field name: %s. Valid fields are: %s"
                % (field, self.fields)
            )
        fname = os.path.join(self.logs_dir, field + ".dat")
        data = np.loadtxt(fname, skiprows=2)
        return data if all_cols else data[:, :3]
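
    # A sketch of loading residuals for plotting; matplotlib is only an
    # illustration here, not a dependency of this module:
    #
    #     logs = SolverLog(case_dir="/path/to/case")
    #     ux = logs.residual("Ux")  # columns: Time, SubIteration, InitialResidual
    #     import matplotlib.pyplot as plt
    #     plt.semilogy(ux[:, 0], ux[:, 2])
    #     plt.show()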

    def bounding_var(self, field):
        """Return the bounding information for a field"""
        if field not in self.bounding_fields:
            raise KeyError(
                "Invalid field name: %s. Valid fields are: %s"
                % (field, self.bounding_fields)
            )
        fname = os.path.join(self.logs_dir, "bounding_" + field + ".dat")
        data = np.loadtxt(fname, skiprows=2)
        return data

    def continuity_errors(self):
        """Return the time history of continuity errors"""
        fname = os.path.join(self.logs_dir, "continuity_errors.dat")
        data = np.loadtxt(fname, skiprows=1)
        return data

    def clock_time(self):
        """Return the clock time table"""
        fname = os.path.join(self.logs_dir, "clock_time.dat")
        data = np.loadtxt(fname, skiprows=1)
        return data