aboutsummaryrefslogtreecommitdiff
path: root/llvm_tools/atomic_write_file.py
blob: aa6f112e39735f9ebc61608dc8ab0b6cb5a5555c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# Copyright 2023 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Atomic file writing utilities.

Provides atomic_write(...), which allows atomically replacing the contents
of a file.
"""

import contextlib
import logging
import os
from pathlib import Path
import tempfile
from typing import Union


@contextlib.contextmanager
def atomic_write(fp: Union[Path, str], mode="w", *args, **kwargs):
    """Write to a filepath atomically.

    The data is written to a temporary file created in the same directory
    as `fp` (its name starts with `fp`'s file name and ends in ".tmp").
    Once the `with` body finishes without raising, the temporary file is
    closed and renamed over `fp`.

    If an exception is raised during the write, or the final rename
    fails, the temporary file is deleted (best effort) and the original
    filepath is left untouched. The exception is then re-raised.

    Examples:
        >>> with atomic_write("my_file.txt", encoding="utf-8") as f:
        ...     f.write("Hello world!")
        ...     # my_file.txt is still unmodified
        >>> # "f" is closed here, and my_file.txt is written to.

    Args:
      fp: Filepath to open.
      mode: File mode; can be 'w', 'wb'. Default 'w'.
      *args: Extra positional arguments forwarded to os.fdopen after
        `mode` (e.g. buffering, encoding).
      **kwargs: Extra keyword arguments forwarded to os.fdopen.

    Yields:
      The open, writable file object backed by the temporary file.

    Raises:
      ValueError when the mode is invalid.
    """
    if isinstance(fp, str):
        fp = Path(fp)
    if mode not in ("w", "wb"):
        raise ValueError(f"mode {mode} not accepted")

    # We use mkstemp here because we want to handle the closing and
    # replacement ourselves. The temp file lives next to the target so
    # the final rename never has to cross directories.
    (fd, tmp_path) = tempfile.mkstemp(
        prefix=fp.name,
        suffix=".tmp",
        dir=fp.parent,
    )
    tmp_path = Path(tmp_path)
    try:
        # `mode` must be passed positionally: passing it by keyword while
        # also forwarding *args would bind args[0] to fdopen's `mode`
        # parameter and raise TypeError.
        with os.fdopen(fd, mode, *args, **kwargs) as f:
            yield f
        # Only reached when the body succeeded and the file is closed.
        # Kept inside the try so a failed rename also cleans up the
        # temporary file.
        tmp_path.replace(fp)
    except BaseException:
        # Clean up on *any* failure (including KeyboardInterrupt), then
        # re-raise. Removal is best effort: a failure to delete must not
        # mask the original error.
        try:
            tmp_path.unlink()
        except Exception:
            logging.exception(
                "unexpected error removing temporary file %s", tmp_path
            )
        raise