# Copyright 2020 University of Groningen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Provides the DeferredFileWriter, which allow writing of files without affecting
existing files, until it is clear the written changes are correct.
"""
from builtins import open as _open
import collections
import os
import pathlib
import shutil
import threading
import tempfile
from .log_helpers import StyleAdapter, get_logger
LOGGER = StyleAdapter(get_logger(__name__))
lock = threading.Lock()
[docs]
class Singleton(type):
"""
Metaclass for creating singleton objects. Taken from [1]_.
.. [1] https://stackoverflow.com/questions/50566934/why-is-this-singleton-implementation-not-thread-safe/50567397
"""
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
with lock:
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
[docs]
class DeferredFileWriter(metaclass=Singleton):
"""
A singleton class/object that is intended to prevent writing output to files
that is invalid, due to e.g. warnings further down the pipeline.
If this class is used to open a file for writing, a temporary file is created
and returned instead. Once it's clear the output produced is valid the
:meth:`~vermouth.file_writer.DeferredFileWriter.write` method can be used to
finalize the written changes by moving them to their intended destination.
If a file with that name already exists it is backed up according to the
Gromacs scheme.
"""
def __init__(self):
self.open_files = collections.deque()
self._tmpdir = None
[docs]
def open(self, filename, mode='r', *args, **kwargs):
"""
If mode is either 'w' or 'a', opens and returns a handle to a temporary
file. If mode is 'r' opens and returns a handle to the file specified.
Once :meth:`~vermouth.file_writer.DeferredFileWriter.write` is called
the changes written to all files opened this way are propagated to their
final destination.
Parameters
----------
filename: os.PathLike
The final name of the file to be opened.
mode: str
The mode in which the file is to be opened.
*args: collections.abc.Iterable
Passed to :func:`os.fdopen`.
**kwargs: dict
Passed to :func:`os.fdopen`.
Returns
-------
io.IOBase
An opened file
"""
path = pathlib.Path(filename)
# Make the path absolute, in case the current working directory is
# changed between now and writing. Can't do path.resolve() due to py35
# requiring the file to exist.
path = path.parent.resolve() / path.name
# Let's see if we already opened this file. If so, get the corresponding
# temporary file.
for tmp_path, open_path, _ in self.open_files:
# Can't use Path.samefile, since the files don't have to exist yet
if open_path == path:
return _open(tmp_path, mode, *args, **kwargs)
if '+' in mode or 'a' in mode or 'w' in mode: # Append and write
return self._open_tmp_file(path, *args, mode=mode, **kwargs)
elif 'r' in mode: # Read, do nothing special
return _open(filename, mode, *args, **kwargs)
raise KeyError('Unknown file mode.')
def _open_tmp_file(self, filename, mode='w', *args, **kwargs):
suffix = filename.suffix
with lock:
handle, tmp_path = tempfile.mkstemp(suffix=suffix, dir=self._tmpdir)
self.open_files.append([tmp_path, filename, mode])
if '+' in mode and 'r' in mode:
# If r+, preserve original file contents. Otherwise, truncate.
shutil.copy2(str(filename), tmp_path)
return os.fdopen(handle, mode, *args, **kwargs)
@staticmethod
def _find_free_path(file_path):
"""
Find the first free (backup) path that looks like `file_path`. If
file_path does not exist, returns file_path. Else, generates
`#{name}.{idx}#`, incrementing idx until no file exists.
Parameters
----------
file_path: pathlib.PathLike
Returns
-------
pathlib.Path
The first path that does not exist yet
"""
file_path = pathlib.Path(file_path)
backup_path = pathlib.Path(file_path)
idx = 1
while backup_path.exists():
backup_path = file_path.with_name('#{name}.{idx}#'.format(name=file_path.name, idx=idx))
idx += 1
return backup_path
[docs]
def write(self):
"""
Finalize writing all open files by moving the created temporary files to
their final destinations.
Existing file destinations will be backed up according to the Gromacs
scheme.
"""
while self.open_files:
tmp_path, final_path, mode = self.open_files.popleft()
if 'w' in mode or '+' in mode: # write
self._write_file(tmp_path, final_path)
elif 'r' in mode: # read = error
raise AssertionError("Files opened with mode 'r' should not be "
"treated special")
elif 'a' in mode: # append
self._append_file(tmp_path, final_path, mode)
else:
raise KeyError('Unknown file mode')
def _write_file(self, tmp_path, final_path):
# There is no way to move a file and make it error if the destination
# already exists, so use a lock instead.
with lock:
free_path = self._find_free_path(final_path)
if free_path != final_path:
LOGGER.info('Backing up {} to {}.', final_path, free_path, type='general')
shutil.move(str(final_path), str(free_path))
LOGGER.debug('Writing output to {}.', final_path, type='general')
shutil.move(tmp_path, str(final_path))
@staticmethod
def _append_file(tmp_path, final_path, mode='a'):
"""
Append the contents of tmp_path to final_path and remove tmp_path.
"""
if 'b' in mode:
tmp_mode = 'rb'
else:
tmp_mode = 'r'
with _open(str(final_path), mode=mode) as final_file, _open(tmp_path, mode=tmp_mode) as tmp_file:
final_file.write(tmp_file.read())
os.remove(tmp_path)
[docs]
def close(self):
"""
Remove all produced temporary files.
"""
while self.open_files:
tmp_path, *_ = self.open_files.popleft()
try:
os.remove(tmp_path)
except FileNotFoundError:
pass
def __del__(self):
self.close()
# super().__del__() # object has no __del__
deferred_open = DeferredFileWriter().open