# Source code for edges_analysis._workflow

"""Functions for dealing with CLI workflow and progressfiles.

The progress file is simply a YAML file consisting purely of a list of steps in the
analysis, as defined by a ``workflow.yaml``. Each `step` in the progress file has the
format::

    name: <name of step>
    function: <function that was run at this step>
    params:
        param1: value
        param2: value
        ...
    write: <<FORMAT_STRING>>.gsh5
    filemap:
    - inputs:
      - input_file1.gsh5
      outputs:
      - output_file1.gsh5
    - inputs:
      - input_file2.gsh5
      outputs:
      - output_file2.gsh5

The ``filemap`` is thus a list of dicts, where each dict has two keys: ``inputs`` and
``outputs``. Each of these is a list of files. Each element in the top-level list of
``filemap`` is an "object" that gets processed together through a processing function.
For example, if the ``filemap`` has two elements, each with one input and one output,
then the processing function will be called twice, once for each element in the list,
and each time a single file will be passed in, and a single file will be passed out.
The most common other case is that the ``filemap`` has one element, with multiple
inputs and a single output -- this happens for example with LST-binning, where we
take multiple files and combine them into one.

The format of the progressfile is thus exactly the same as the user-defined workflow
YAML, except with the addition of this ``filemap``, which is updated as the processing
occurs, and keeps track of the status of the processing.

Some things to note about the progressfile and its filemap. First, if a step has no
``"write"`` parameter, then it won't write any files to disk (the objects it creates
will simply be passed on to the next step in-memory). In this case, the ``filemap`` will
be empty -- no inputs OR outputs. Secondly, the way the CLI uses this progressfile is
as follows:

The first time the CLI is run for a given workflow, a new progressfile is created. The
inputs specified on the command line (via ``-i``) are added as inputs to the ``convert`` step
in the progressfile. The CLI then runs through the steps in the workflow. On each step,
it checks out the files it might need to read for that step using the
:meth:`ProgressFile.get_files_to_read_for_step()` method. This method checks the current
step as it exists in the progressfile for any inputs, and ALSO the outputs of the
previous step, which are assumed to be required for the current step. It will return
only those files that do not already appear as an input to this or a subsequent step
of the progressfile with all of their associated outputs existing on disk (which would
mean they have already been processed). It further trims the list by removing any files that
have already been read in previous steps (since sometimes a step writes back to its
own file). As it processes each step, if the step has a ``"write"`` parameter, it will
write the output files to disk, and add them to the ``filemap`` for the step (and
write to the progressfile).

The next time the CLI is run for the workflow (when a progressfile exists already), it
will first check the progressfile against the workflow to ensure that the workflow
hasn't changed since the last run. If it has, it will delete all the files that were
generated by that modified step and all subsequent steps (both on disk and in the
progressfile). Any steps that have been *appended* to the workflow will simply be added
on to the progressfile. It will then run the workflow again, starting from the latest
step possible (i.e. the step after the last step that was run successfully AND output
files).

The user can also add more raw input files on the second run-through, which will by
default be appended to the raw files from the original step. In this case, the CLI will
first make sure it deletes any files that were generated by *combining* files down
the pipeline, since these will change with the added files. However, it will *not*
delete files that are simply generated from a single input, as these should still be
valid. To figure out which files it needs to read at each step, it will check the
files it needs using the :meth:`ProgressFile.get_files_to_read_for_step()` method, as
described above. Since files that have already been processed will have valid outputs
at subsequent steps, these will be ignored until they are required down the pipe.
"""

from __future__ import annotations

import attrs
import yaml
from copy import deepcopy
from frozendict import frozendict
from jinja2 import Template
from logging import getLogger
from pathlib import Path
from typing import Any, Iterable, Tuple, Union

from .gsdata import GSDATA_PROCESSORS

try:
    from typing import Self
except ImportError:
    from typing_extensions import Self

Pathy = Union[str, Path]


logger = getLogger(__name__)


def _setpath_converter(value: Pathy | Iterable[Pathy]) -> frozenset[Path]:
    """Convert path-like values into a frozenset of :class:`Path`.

    Used as the attrs converter for :class:`FileMapEntry` fields. A frozenset is
    returned (rather than a set) so that entries stay hashable.

    A single ``str``/``Path`` is treated as one path; previously it would have
    been iterated character-by-character.
    """
    if isinstance(value, (str, Path)):
        value = [value]
    return frozenset(map(Path, value))


@attrs.define(hash=True)
class FileMapEntry:
    """A single entry in a filemap, mapping a group of inputs to their outputs.

    Both fields are converted to frozensets of :class:`Path`, so entries are
    hashable and can be collected in a set (see :class:`FileMap`).
    """

    inputs: frozenset[Path] = attrs.field(converter=_setpath_converter)
    outputs: frozenset[Path] = attrs.field(converter=_setpath_converter)

    def asdict(self) -> dict[str, list[str]]:
        """Get the filemap entry as a dictionary of string paths.

        Useful for serializing to YAML. The paths are sorted because the
        iteration order of a frozenset of paths depends on string hashing,
        which is randomized between interpreter runs. Sorting keeps the
        written progressfile stable.
        """
        return {
            "inputs": sorted(map(str, self.inputs)),
            "outputs": sorted(map(str, self.outputs)),
        }


_FileMapType = Tuple[Iterable[Pathy], Iterable[Pathy]]


def _filemap_converter(
    value: FileMap | Iterable[FileMapEntry | dict | _FileMapType],
) -> set[FileMapEntry]:
    """Normalize any supported filemap specification into a set of entries.

    Accepted forms:

    * a :class:`FileMap`: a *copy* of its entry set is returned. This prevents
      the new owner from sharing, and later mutating, the same set object as
      the source filemap.
    * an iterable whose elements are :class:`FileMapEntry` objects, dicts with
      ``inputs`` and ``outputs`` keys (as read back from YAML), or
      ``(inputs, outputs)`` pairs.
    """
    if isinstance(value, FileMap):
        return set(value.maps)

    out: set[FileMapEntry] = set()
    for v in value:
        if isinstance(v, FileMapEntry):
            out.add(v)
        elif isinstance(v, dict):
            out.add(FileMapEntry(**v))
        else:
            out.add(FileMapEntry(inputs=v[0], outputs=v[1]))
    return out


@attrs.define
class FileMap:
    """An object representing the full file-map for a processing step.

    Essentially just a set of :class:`FileMapEntry` objects. ``maps`` accepts
    anything ``_filemap_converter`` understands: a ``FileMap``, or an iterable
    of entries, ``{"inputs": ..., "outputs": ...}`` dicts or
    ``(inputs, outputs)`` pairs.
    """

    maps: set[FileMapEntry] = attrs.field(factory=set, converter=_filemap_converter)

    def __iter__(self):
        """Iterate over the entries of the filemap."""
        return iter(self.maps)

    def __len__(self):
        """Get the number of entries in the filemap."""
        return len(self.maps)

    def __bool__(self):
        """Return True if the filemap contains at least one entry."""
        return bool(self.maps)

    def add(self, maps: Iterable[FileMapEntry | dict | _FileMapType]):
        """Add entries to the filemap.

        Identical entries are stored only once, because the filemap is a set.
        """
        self.maps.update(_filemap_converter(maps))

    def remove(self, fl: Pathy):
        """Remove every entry that has ``fl`` among its inputs."""
        fl = Path(fl)
        # Mutate in place rather than re-assigning ``self.maps``. Re-assigning
        # would trigger attrs' on-setattr converter and replace the set object.
        self.maps.difference_update({fmap for fmap in self.maps if fl in fmap.inputs})

    def as_yamlable(self) -> list[dict[str, list[str]]]:
        """Get the filemap as a YAML-serializable list of dicts.

        Entries are emitted in a sorted order, rather than set iteration order,
        so the serialized progressfile is reproducible between runs.
        """
        ordered = sorted(
            self.maps,
            key=lambda m: (sorted(map(str, m.inputs)), sorted(map(str, m.outputs))),
        )
        return [m.asdict() for m in ordered]

    def clear(self):
        """Remove all entries from the filemap."""
        self.maps.clear()


class WorkflowProgressError(RuntimeError):
    """Signals that a workflow disagrees with the progress recorded in a progressfile."""


def _params_converter(value) -> frozendict:
    """Convert step parameters to an immutable mapping.

    ``None`` becomes an empty mapping. An empty ``params:`` key in YAML yields
    ``None``, which would otherwise break ``dict(params)`` when the
    progressfile is written.
    """
    return frozendict(value or {})


@attrs.define()
class WorkflowStep:
    """A single step in a workflow.

    Attributes
    ----------
    function
        Lower-cased name of the processing function. Must be ``"convert"`` or a
        key of ``GSDATA_PROCESSORS``.
    name
        Lower-cased name of the step; defaults to ``function``.
    params
        Keyword arguments passed to the processing function (stored as a
        ``frozendict``).
    filemap
        The inputs/outputs this step has processed so far.
    write
        :meth:`str.format` template for output filenames, or None if the step
        does not write files to disk.
    """

    function: str = attrs.field(converter=str.lower)
    name: str = attrs.field(converter=str.lower)
    params: frozendict = attrs.field(factory=frozendict, converter=_params_converter)
    filemap: FileMap = attrs.field(factory=FileMap, converter=FileMap)
    write: str | None = attrs.field(default=None)

    @name.default
    def _default_name(self):
        # ``function`` is declared before ``name``, so it has already been set
        # (and lower-cased) when this default is evaluated.
        return self.function

    @function.validator
    def _validate_function(self, attribute, value):
        if value != "convert" and value not in GSDATA_PROCESSORS:
            raise ValueError(
                f"Unknown function {value}. Available: {GSDATA_PROCESSORS.keys()}"
            )

    def get_all_outputs(self) -> set[Path]:
        """Get the set of all output files recorded in this step's filemap."""
        return set().union(*(fmap.outputs for fmap in self.filemap))

    def get_all_inputs(self) -> set[Path]:
        """Get the set of all input files recorded in this step's filemap."""
        return set().union(*(fmap.inputs for fmap in self.filemap))

    def asdict(self, files: bool = True) -> dict[str, Any]:
        """Get the step's attributes as a (shallow) dictionary.

        If ``files`` is False, the ``filemap`` key is omitted.
        """
        d = attrs.asdict(self, recurse=False)
        if not files:
            d.pop("filemap")
        return d

    def compat(self, other: WorkflowStep) -> bool:
        """Check whether two steps have the same definition.

        All attributes except ``filemap`` are compared.
        """
        return self.asdict(files=False) == other.asdict(files=False)

    @property
    def _function(self) -> callable:
        # NOTE: raises KeyError for the special "convert" function, which the
        # validator accepts without requiring it in GSDATA_PROCESSORS.
        return GSDATA_PROCESSORS[self.function]

    @property
    def kind(self) -> str:
        """The ``kind`` attribute of this step's processing function."""
        return self._function.kind

    def __call__(self, *data):
        """Call the step's processing function on ``data`` with ``params``."""
        return self._function(*data, **self.params)

    def get_output_path(self, outdir: Path, infile: Path) -> Path | None:
        """Get the output path for the step, given the file it was computed from.

        ``write`` is filled in with :meth:`str.format` using the fields
        ``prev_stem`` (stem of ``infile``), ``prev_dir`` (parent directory of
        ``infile``), ``fncname`` (this step's function) and every key of
        ``params``. A relative result is taken relative to ``outdir``.

        Returns None if the step does not write to disk.
        """
        if self.write is None:
            return None

        fname = Path(
            self.write.format(
                prev_stem=infile.stem,
                prev_dir=infile.parent,
                fncname=self.function,
                **self.params,
            )
        )
        return fname if fname.is_absolute() else Path(outdir) / fname

    def has_input(self, fl: Pathy) -> bool:
        """Check whether ``fl`` is an input anywhere in this step's filemap."""
        return Path(fl) in self.get_all_inputs()

    def has_output(self, fl: Pathy) -> bool:
        """Check whether ``fl`` is an output anywhere in this step's filemap."""
        return Path(fl) in self.get_all_outputs()

    def get_outputs_for_input(self, fl: Pathy) -> set[Path]:
        """Get the outputs of every filemap entry that has ``fl`` as an input."""
        fl = Path(fl)
        outs = set()
        for fmap in self.filemap:
            if fl in fmap.inputs:
                outs.update(fmap.outputs)
        return outs

    def add_to_filemap(self, maps: Iterable[FileMapEntry | dict | _FileMapType]):
        """Add entries to this step's filemap."""
        self.filemap.add(maps)

    def remove_from_filemap(self, fl: Pathy):
        """Remove every filemap entry that has ``fl`` among its inputs."""
        self.filemap.remove(fl)


@attrs.define()
class Workflow:
    """An ordered sequence of uniquely-named :class:`WorkflowStep` objects.

    Steps can be accessed either by integer position or by step name.
    """

    steps: list[WorkflowStep] = attrs.field(factory=list)

    @steps.validator
    def _validate_steps(self, attribute, value):
        """Ensure every step is a WorkflowStep and that step names are unique."""
        for step in value:
            if not isinstance(step, WorkflowStep):
                raise TypeError(
                    f"Workflow steps must be of type WorkflowStep, got {type(step)}"
                )

        all_names = [step.name for step in value]
        for name in all_names:
            if all_names.count(name) > 1:
                raise ValueError(
                    f"Duplicate step name {name}. "
                    "Please give one of the steps an explicit 'name'."
                )

    @classmethod
    def read(cls, workflow: Pathy) -> Self:
        """Read a workflow from a YAML file.

        The file is first parsed as plain YAML to extract the optional top-level
        ``globals`` mapping. The raw text is then rendered as a Jinja2 template
        (with that mapping available as ``globals``) and parsed again. Each entry
        of its ``steps`` list becomes a :class:`WorkflowStep`.
        """
        # NOTE(review): yaml.FullLoader is used on a user-supplied file; only
        # load workflow files from trusted sources.
        text = Path(workflow).read_text()

        raw = yaml.load(text, Loader=yaml.FullLoader)
        global_params = raw.pop("globals", {})

        rendered = yaml.load(
            Template(text).render(globals=global_params), Loader=yaml.FullLoader
        )
        steps = [WorkflowStep(**step) for step in rendered.pop("steps")]
        return cls(steps=steps)

    def write_as_progressfile(self, progressfile: Pathy):
        """Write the workflow (including filemaps) as a progressfile YAML list."""
        progress = [attrs.asdict(step, recurse=False) for step in self.steps]

        # Make everything plain YAML-serializable types.
        for step in progress:
            step["filemap"] = step["filemap"].as_yamlable()
            step["params"] = dict(step["params"])  # frozendict -> dict

        with open(progressfile, "w") as fl:
            yaml.dump(progress, fl)

    def __getitem__(self, key: int | str) -> WorkflowStep:
        """Get a step by position (int) or by name (str).

        Raises
        ------
        IndexError
            If an integer ``key`` is out of range.
        ValueError
            If a string ``key`` is not the name of any step.
        TypeError
            If ``key`` is neither an int nor a str. Previously this silently
            returned None.
        """
        if isinstance(key, int):
            return self.steps[key]
        if isinstance(key, str):
            return self.steps[self.index(key)]
        raise TypeError(f"Invalid key type {type(key)}. Must be int or str.")

    def __setitem__(self, key: int | str, value: WorkflowStep):
        """Replace the step at a position (int) or with a given name (str)."""
        if isinstance(key, int):
            self.steps[key] = value
        elif isinstance(key, str):
            self.steps[self.index(key)] = value
        else:
            raise TypeError(f"Invalid key type {type(key)}. Must be int or str.")

        # re-validate
        self._validate_steps("steps", self.steps)

    def __delitem__(self, key: int | str):
        """Delete the step at a position (int) or with a given name (str)."""
        if isinstance(key, str):
            key = self.index(key)
        del self.steps[key]

    def clear_after(self, key: int | str):
        """Remove the given step *and* every step after it.

        Despite the name, the step identified by ``key`` (a position or a step
        name) is removed as well. Callers rely on this to replace that step.
        """
        if isinstance(key, str):
            key = self.index(key)
        del self.steps[key:]

    def __contains__(self, key: str) -> bool:
        """Check whether a step with the given name is in the workflow."""
        return key in [s.name for s in self.steps]

    def __iter__(self):
        """Iterate over the steps in order."""
        return iter(self.steps)

    def append(self, step: WorkflowStep):
        """Append a step to the workflow (names must remain unique)."""
        self.steps.append(step)
        # re-validate
        self._validate_steps("steps", self.steps)

    def index(self, key: str) -> int:
        """Get the position of the step with the given name (ValueError if absent)."""
        return [s.name for s in self.steps].index(key)

    def insert(self, index: int, step: WorkflowStep):
        """Insert a step at the given position (names must remain unique)."""
        self.steps.insert(index, step)
        # re-validate
        self._validate_steps("steps", self.steps)

    def __len__(self):
        """Get the number of steps in the workflow."""
        return len(self.steps)


@attrs.define()
class ProgressFile:
    """A workflow whose per-step filemaps track processing progress on disk.

    The progressfile at ``path`` is a YAML list of steps (see the module
    docstring). Every mutating method writes the updated state back to it.
    """

    path: Path = attrs.field(converter=Path)
    workflow: Workflow = attrs.field(factory=Workflow)

    @path.validator
    def _validate_path(self, attribute, value):
        if not value.exists():
            raise ValueError(f"Progress file {value} does not exist.")

    @classmethod
    def create(
        cls,
        progressfile: Pathy,
        workflow: Workflow,
        inputs: Iterable[Pathy] | None = None,
    ) -> Self:
        """Create a new progressfile on disk from a workflow.

        Parameters
        ----------
        progressfile
            Path at which to write the progressfile.
        workflow
            The workflow to track. It is deep-copied, so the caller's object is
            not modified. None of its steps may already have a filemap.
        inputs
            Raw input files. They are added, with no outputs yet, to the
            ``convert`` step if the workflow has one.

        Raises
        ------
        ValueError
            If any step of ``workflow`` already has a non-empty filemap.
        """
        workflow = deepcopy(workflow)

        if any(bool(s.filemap) for s in workflow.steps):
            raise ValueError(
                "Cannot create a new progressfile for a workflow with filemaps "
                "already set."
            )

        inputs = list(inputs or [])
        if "convert" in workflow:
            workflow["convert"].add_to_filemap([([p], []) for p in inputs])
        elif inputs:
            logger.warning(
                "Workflow has no 'convert' step; the given inputs are ignored."
            )

        workflow.write_as_progressfile(progressfile)
        return cls(path=progressfile, workflow=workflow)

    @classmethod
    def read(cls, progressfile: Pathy) -> Self:
        """Read an existing progressfile from disk (an empty file has no steps)."""
        with open(progressfile) as openfile:
            progress = yaml.load(openfile, Loader=yaml.FullLoader)

        workflow = Workflow([WorkflowStep(**p) for p in progress or []])
        return cls(path=progressfile, workflow=workflow)

    def __getitem__(self, key: int | str):
        """Get a step by position (int) or by name (str)."""
        return self.workflow[key]

    def __contains__(self, key: str) -> bool:
        """Check whether a step with the given name is in the workflow."""
        return key in self.workflow

    def __iter__(self):
        """Iterate over the steps in order."""
        return iter(self.workflow)

    def __len__(self):
        """Get the number of steps."""
        return len(self.workflow)

    def has_input(self, fl: Pathy) -> bool:
        """Whether ``fl`` is an input to any step of the progressfile."""
        fl = Path(fl)
        return any(step.has_input(fl) for step in self)

    def _clear_gathered_outputs(self):
        """Invalidate every step from the first "gather" step onwards.

        Outputs of such steps combine several inputs, so they become stale
        whenever the set of raw inputs changes. Their files are deleted from
        disk and their filemaps cleared. The progressfile is NOT rewritten here.
        """
        blastoff = False
        for step in self:
            # The "convert" function is not looked up in GSDATA_PROCESSORS, so
            # it has no ``kind``; it is never a gather step.
            if step.function == "convert":
                continue
            if step.kind == "gather":
                blastoff = True
            if blastoff:
                for fl in step.get_all_outputs():
                    Path(fl).unlink(missing_ok=True)
                step.filemap.clear()

    def add_inputs(self, inputs: Iterable[Pathy]):
        """Add raw inputs to the ``convert`` step.

        If any input is actually new, the outputs of steps that combine files
        are invalidated and the progressfile is rewritten.

        Raises
        ------
        ValueError
            If the progressfile has no ``convert`` step.
        """
        inputs = [Path(p) for p in inputs]
        if "convert" not in self:
            raise ValueError(
                "Cannot add inputs to a progressfile with no convert step."
            )

        current = self["convert"].get_all_inputs()
        new = [p for p in inputs if p not in current]
        if not new:
            # Everything's there already, do nothing.
            return

        # Only genuinely new files get fresh (output-less) entries.
        # Re-adding existing inputs would only create redundant entries.
        self["convert"].add_to_filemap([([p], []) for p in new])

        # Files produced by combining datafiles change when new inputs are added.
        self._clear_gathered_outputs()
        self.workflow.write_as_progressfile(self.path)

    def remove_inputs(self, inputs: Iterable[Pathy]):
        """Remove raw inputs (and their filemap entries) from the ``convert`` step.

        If any of them were present, the outputs of steps that combine files
        are invalidated and the progressfile is rewritten.

        Raises
        ------
        ValueError
            If the progressfile has no ``convert`` step.
        """
        inputs = [Path(p) for p in inputs]
        if "convert" not in self:
            raise ValueError(
                "Cannot remove inputs from a progressfile with no convert step."
            )

        current = self["convert"].get_all_inputs()
        if not any(p in current for p in inputs):
            # None of the given files are inputs: nothing to remove.
            return

        for inp in inputs:
            self["convert"].remove_from_filemap(inp)

        # Files produced by combining datafiles change when inputs are removed.
        self._clear_gathered_outputs()
        self.workflow.write_as_progressfile(self.path)

    def update_step(
        self,
        key: str,
        filemap: FileMap | Iterable[FileMapEntry | dict | _FileMapType],
    ):
        """Add filemap entries to a step and rewrite the progressfile.

        ``filemap`` must be a :class:`FileMap` or an *iterable* of entries. A
        single bare ``(inputs, outputs)`` pair would be misinterpreted.

        Raises
        ------
        ValueError
            If there is no such step.
        """
        try:
            step = self[key]
        except (KeyError, ValueError, IndexError) as err:
            raise ValueError(f"Progress file has no step called '{key}'") from err

        step.filemap.add(filemap)
        self.workflow.write_as_progressfile(self.path)

    def harmonize_with_workflow(
        self,
        workflow: Workflow,
        error: bool = True,
        start: str | None = None,
    ) -> Self:
        """Bring the progressfile in line with ``workflow``.

        Steps are compared pairwise by position (ignoring filemaps). Reset starts
        at the first step whose definition differs, or at the step named
        ``start`` if that comes earlier. From there, the step and *every* later
        step are dropped from the progressfile and their output files inside the
        progressfile's directory are deleted. The remaining steps of
        ``workflow`` are then appended. Steps of ``workflow`` beyond the end of
        the progressfile are simply appended.

        Parameters
        ----------
        workflow
            The (possibly modified) workflow to adopt.
        error
            If True, raise instead of discarding progress when a step differs.
        start
            Name of a step from which to force re-processing even if unchanged.

        Returns
        -------
        The progressfile itself.

        Raises
        ------
        WorkflowProgressError
            If ``error`` is True and a step conflicts with the progressfile. In
            that case, nothing has been modified.
        """
        start_changing = False
        for i, step in enumerate(workflow):
            if i >= len(self):
                # New steps added to the workflow since the last run, or the rest
                # of the workflow after progress was reset below.
                self.workflow.append(step)
                continue

            ps = self[i]
            if not start_changing and not ps.compat(step):
                start_changing = True
                if error:
                    raise WorkflowProgressError(
                        "The workflow is in conflict with the progressfile at step "
                        f"'{step.name}'. To remove conflicting outputs and adopt "
                        "new workflow, run with --ignore-conflicting. To keep the "
                        "existing outputs and branch off with the new workflow, run"
                        " the 'fork' command."
                    )
                logger.warning(
                    f"Removing progress for step {step.name} and beyond "
                    "because this step has changed since the last run."
                )

            if step.name == start:
                start_changing = True

            if start_changing:
                # Drop this step and all later ones, deleting the outputs of ALL
                # of them. Later outputs were derived from this step's outputs,
                # so they are stale too. Previously only this step's outputs were
                # deleted and the rest were orphaned on disk. Afterwards
                # len(self) == i + 1, so later iterations take the append branch.
                stale_steps = self.workflow.steps[i:]
                self.workflow.clear_after(i)
                for old_step in stale_steps:
                    for fl in old_step.get_all_outputs():
                        # Only delete files inside the progressfile's directory.
                        # Raw input files (eg. ACQ files) must not be deleted.
                        if self.path.parent in fl.parents:
                            Path(fl).unlink(missing_ok=True)
                self.workflow.append(step)

        # NOTE(review): if ``workflow`` has fewer steps than the progressfile,
        # the extra trailing steps are kept as-is. Confirm this is intended.
        self.workflow.write_as_progressfile(self.path)
        return self

    def get_files_to_read_for_step(self, stepname: str) -> set[Path]:
        """Get the files that need to be read in order to run the given step.

        Candidates are the inputs already recorded for the step plus the outputs
        of the immediately preceding step. A candidate is dropped if, at this
        step or any later one, it is an input whose associated outputs all exist
        on disk. That means it has already been processed further down the
        pipeline.

        NOTE(review): candidates are looked up via ``fl.absolute()``, while
        filemap paths are stored as given. This assumes filemap paths are
        absolute; confirm.
        """
        current_index = self.workflow.index(stepname)

        # Steps BEFORE the previous one need not be checked: if they had outputs
        # and the previous step did not, the previous step must have been run, so
        # those outputs already exist as in-memory data that has been read.
        potential_files = set(self.workflow[current_index].get_all_inputs())
        if current_index > 0:
            potential_files.update(self.workflow[current_index - 1].get_all_outputs())

        remaining_steps = self.workflow.steps[current_index:]

        def _already_processed(fl: Path) -> bool:
            # True if some step from the current one onwards lists ``fl`` as an
            # input with at least one output, and all those outputs exist.
            for step in remaining_steps:
                out = step.get_outputs_for_input(fl)
                if out and all(x.exists() for x in out):
                    return True
            return False

        return {fl for fl in potential_files if not _already_processed(fl.absolute())}