# Copyright 2020 University of Groningen
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
Provides the DeferredFileWriter, which allow writing of files without affecting
existing files, until it is clear the written changes are correct.

from builtins import open as _open

import collections
import os
import pathlib
import shutil
import threading
import tempfile

from .log_helpers import StyleAdapter, get_logger

LOGGER = StyleAdapter(get_logger(__name__))

lock = threading.Lock()

[docs] class Singleton(type): """ Metaclass for creating singleton objects. Taken from [1]_. .. [1] """ _instances = {} def __call__(cls, *args, **kwargs): if cls not in cls._instances: with lock: if cls not in cls._instances: cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) return cls._instances[cls]
[docs] class DeferredFileWriter(metaclass=Singleton): """ A singleton class/object that is intended to prevent writing output to files that is invalid, due to e.g. warnings further down the pipeline. If this class is used to open a file for writing, a temporary file is created and returned instead. Once it's clear the output produced is valid the :meth:`~vermouth.file_writer.DeferredFileWriter.write` method can be used to finalize the written changes by moving them to their intended destination. If a file with that name already exists it is backed up according to the Gromacs scheme. """ def __init__(self): self.open_files = collections.deque() self._tmpdir = None
[docs] def open(self, filename, mode='r', *args, **kwargs): """ If mode is either 'w' or 'a', opens and returns a handle to a temporary file. If mode is 'r' opens and returns a handle to the file specified. Once :meth:`~vermouth.file_writer.DeferredFileWriter.write` is called the changes written to all files opened this way are propagated to their final destination. Parameters ---------- filename: os.PathLike The final name of the file to be opened. mode: str The mode in which the file is to be opened. *args: Passed to :func:`os.fdopen`. **kwargs: dict Passed to :func:`os.fdopen`. Returns ------- io.IOBase An opened file """ path = pathlib.Path(filename) # Make the path absolute, in case the current working directory is # changed between now and writing. Can't do path.resolve() due to py35 # requiring the file to exist. path = path.parent.resolve() / # Let's see if we already opened this file. If so, get the corresponding # temporary file. for tmp_path, open_path, _ in self.open_files: # Can't use Path.samefile, since the files don't have to exist yet if open_path == path: return _open(tmp_path, mode, *args, **kwargs) if '+' in mode or 'a' in mode or 'w' in mode: # Append and write return self._open_tmp_file(path, *args, mode=mode, **kwargs) elif 'r' in mode: # Read, do nothing special return _open(filename, mode, *args, **kwargs) raise KeyError('Unknown file mode.')
def _open_tmp_file(self, filename, mode='w', *args, **kwargs): suffix = filename.suffix with lock: handle, tmp_path = tempfile.mkstemp(suffix=suffix, dir=self._tmpdir) self.open_files.append([tmp_path, filename, mode]) if '+' in mode and 'r' in mode: # If r+, preserve original file contents. Otherwise, truncate. shutil.copy2(str(filename), tmp_path) return os.fdopen(handle, mode, *args, **kwargs) @staticmethod def _find_free_path(file_path): """ Find the first free (backup) path that looks like `file_path`. If file_path does not exist, returns file_path. Else, generates `#{name}.{idx}#`, incrementing idx until no file exists. Parameters ---------- file_path: pathlib.PathLike Returns ------- pathlib.Path The first path that does not exist yet """ file_path = pathlib.Path(file_path) backup_path = pathlib.Path(file_path) idx = 1 while backup_path.exists(): backup_path = file_path.with_name('#{name}.{idx}#'.format(, idx=idx)) idx += 1 return backup_path
[docs] def write(self): """ Finalize writing all open files by moving the created temporary files to their final destinations. Existing file destinations will be backed up according to the Gromacs scheme. """ while self.open_files: tmp_path, final_path, mode = self.open_files.popleft() if 'w' in mode or '+' in mode: # write self._write_file(tmp_path, final_path) elif 'r' in mode: # read = error raise AssertionError("Files opened with mode 'r' should not be " "treated special") elif 'a' in mode: # append self._append_file(tmp_path, final_path, mode) else: raise KeyError('Unknown file mode')
def _write_file(self, tmp_path, final_path): # There is no way to move a file and make it error if the destination # already exists, so use a lock instead. with lock: free_path = self._find_free_path(final_path) if free_path != final_path:'Backing up {} to {}.', final_path, free_path, type='general') shutil.move(str(final_path), str(free_path)) LOGGER.debug('Writing output to {}.', final_path, type='general') shutil.move(tmp_path, str(final_path)) @staticmethod def _append_file(tmp_path, final_path, mode='a'): """ Append the contents of tmp_path to final_path and remove tmp_path. """ if 'b' in mode: tmp_mode = 'rb' else: tmp_mode = 'r' with _open(str(final_path), mode=mode) as final_file, _open(tmp_path, mode=tmp_mode) as tmp_file: final_file.write( os.remove(tmp_path)
[docs] def close(self): """ Remove all produced temporary files. """ while self.open_files: tmp_path, *_ = self.open_files.popleft() try: os.remove(tmp_path) except FileNotFoundError: pass
def __del__(self): self.close()
# super().__del__() # object has no __del__ deferred_open = DeferredFileWriter().open