aboutsummaryrefslogtreecommitdiff
path: root/pw_watch/py/pw_watch/watch.py
blob: 59e384ac5125ab580da969e5582eaf7a7301fab9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
#!/usr/bin/env python
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Watch files for changes and rebuild.

pw watch runs Ninja in a build directory when source files change. It works with
any Ninja project (GN or CMake).

Usage examples:

  # Find a build directory and build the default target
  pw watch

  # Find a build directory and build the python.lint and stm32f429i targets
  pw watch python.lint stm32f429i

  # Build pw_run_tests.modules in the out/cmake directory
  pw watch -C out/cmake pw_run_tests.modules

  # Build the default target in out/ and pw_apps in out/cmake
  pw watch -C out -C out/cmake pw_apps

  # Find a directory and build python.tests, and build pw_apps in out/cmake
  pw watch python.tests -C out/cmake pw_apps
"""

import argparse
from dataclasses import dataclass
import errno
import logging
import os
from pathlib import Path
import shlex
import subprocess
import sys
import threading
from threading import Thread
from typing import (Iterable, List, NamedTuple, NoReturn, Optional, Sequence,
                    Tuple)

import httpwatcher  # type: ignore

from watchdog.events import FileSystemEventHandler  # type: ignore[import]
from watchdog.observers import Observer  # type: ignore[import]

import pw_cli.log
import pw_cli.branding
import pw_cli.color
import pw_cli.env
import pw_cli.plugins
import pw_console.python_logging

from pw_watch.watch_app import WatchApp
from pw_watch.debounce import DebouncedFunction, Debouncer

# Color helper used to tint log lines and status slugs throughout this module.
_COLOR = pw_cli.color.colors()
# Module logger; every message emitted by the watcher goes through it.
_LOG = logging.getLogger(__package__)
# Error number (ENOSPC on Linux) checked in watch() on the OSError raised
# while attaching observers, to detect an exhausted inotify watch limit.
_ERRNO_INOTIFY_LIMIT_REACHED = 28

# Suppress events under 'fsevents', generated by watchdog on every file
# event on MacOS.
# TODO(b/182281481): Fix file ignoring, rather than just suppressing logs
_FSEVENTS_LOG = logging.getLogger('fsevents')
_FSEVENTS_LOG.setLevel(logging.WARNING)

# Banner logged line by line in green by PigweedBuildWatcher.on_complete()
# when every build succeeded and the run was not cancelled.
_PASS_MESSAGE = """
  ██████╗  █████╗ ███████╗███████╗██╗
  ██╔══██╗██╔══██╗██╔════╝██╔════╝██║
  ██████╔╝███████║███████╗███████╗██║
  ██╔═══╝ ██╔══██║╚════██║╚════██║╚═╝
  ██║     ██║  ██║███████║███████║██╗
  ╚═╝     ╚═╝  ╚═╝╚══════╝╚══════╝╚═╝
"""

# Pick a visually-distinct font from "PASS" to ensure that readers can't
# possibly mistake the difference between the two states.
# Logged line by line in red when a build failed or the run was cancelled.
_FAIL_MESSAGE = """
   ▄██████▒░▄▄▄       ██▓  ░██▓
  ▓█▓     ░▒████▄    ▓██▒  ░▓██▒
  ▒████▒   ░▒█▀  ▀█▄  ▒██▒ ▒██░
  ░▓█▒    ░░██▄▄▄▄██ ░██░  ▒██░
  ░▒█░      ▓█   ▓██▒░██░░ ████████▒
   ▒█░      ▒▒   ▓▒█░░▓  ░  ▒░▓  ░
   ░▒        ▒   ▒▒ ░ ▒ ░░  ░ ▒  ░
   ░ ░       ░   ▒    ▒ ░   ░ ░
                 ░  ░ ░       ░  ░
"""


# TODO(keir): Figure out a better strategy for exiting. The problem with the
# watcher is that doing a "clean exit" is slow. However, by directly exiting,
# we remove the possibility of the wrapper script doing anything on exit.
def _die(*args) -> NoReturn:
    """Logs a critical message, then calls sys.exit(1).

    Args:
      *args: Forwarded unchanged to ``_LOG.critical``; typically a format
        string followed by its %-style arguments.

    Raises:
      SystemExit: Always, with status 1 (raised by ``sys.exit``).
    """
    _LOG.critical(*args)
    sys.exit(1)


class WatchCharset(NamedTuple):
    """Status markers printed next to each build in the summary table."""
    # Marker shown for a build that succeeded.
    slug_ok: str
    # Marker shown for a build that failed, or for an interrupted run.
    slug_fail: str


# Colored plain-text markers; the default charset for PigweedBuildWatcher.
_ASCII_CHARSET = WatchCharset(_COLOR.green('OK  '), _COLOR.red('FAIL'))
# Emoji markers; selected by watch_setup() when PW_EMOJI is set in the
# Pigweed environment.
_EMOJI_CHARSET = WatchCharset('✔️ ', '💥')


@dataclass(frozen=True)
class BuildCommand:
    """A build directory together with the targets to build inside it."""
    build_dir: Path
    targets: Tuple[str, ...] = ()

    def args(self) -> Tuple[str, ...]:
        """Returns the build directory, as a string, followed by the targets."""
        directory = str(self.build_dir)
        return tuple([directory, *self.targets])

    def __str__(self) -> str:
        """Returns the arguments shell-quoted and joined by single spaces."""
        return ' '.join(map(shlex.quote, self.args()))


def git_ignored(file: Path) -> bool:
    """Reports whether ``git check-ignore`` treats a path as ignored.

    The path is resolved, then ``git check-ignore --quiet --no-index`` is run
    on it. ``--no-index`` means files that were manually added to a repo but
    match an ignore rule still count as ignored.

    Args:
      file: Path to check; it does not need to exist.

    Returns:
      True when Git exits with status 0 (the path is ignored) or 128 (Git
      reported a fatal error); False for any other status, or when the
      command could not be started from any ancestor directory.
    """
    target = file.resolve()

    # Run Git from the file's parent so that the correct repo is used. If that
    # directory no longer exists, fall back to each ancestor in turn, which
    # makes it possible to check a deleted path against the repo it was
    # originally created in.
    nearest = target.parent
    for working_dir in (nearest, *nearest.parents):
        try:
            completed = subprocess.run(
                ['git', 'check-ignore', '--quiet', '--no-index', target],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                cwd=working_dir)
        except FileNotFoundError:
            # The working directory is missing; try the next ancestor.
            continue
        return completed.returncode in (0, 128)

    # Every candidate directory, up to the filesystem root, has been tried.
    return False


class PigweedBuildWatcher(FileSystemEventHandler, DebouncedFunction):
    """Process filesystem events and launch builds if necessary.

    File events arrive through dispatch(). An event whose path matches the
    watch patterns (and is not ignored) presses the debouncer, which drives
    run(), cancel() and on_complete().
    """

    # pylint: disable=R0902 # too many instance attributes
    def __init__(
        self,
        build_commands: Sequence[BuildCommand],
        patterns: Sequence[str] = (),
        ignore_patterns: Sequence[str] = (),
        charset: WatchCharset = _ASCII_CHARSET,
        restart: bool = True,
        jobs: Optional[int] = None,
        watch_app: bool = False,
    ):
        """Initializes watcher state.

        Args:
          build_commands: Builds to run, in order, on every trigger.
          patterns: Globs a changed path must match to trigger a build.
          ignore_patterns: Globs that prevent a path from triggering a build.
          charset: Status markers used in the build summary table.
          restart: If true, cancel() terminates the running build.
          jobs: If given, passed to Ninja as ``-j<jobs>``.
          watch_app: If true, build output is captured in ``current_stdout``
            instead of being printed, and no Enter-key thread is started.
        """
        super().__init__()

        self.current_stdout = ''
        self.patterns = patterns
        self.ignore_patterns = ignore_patterns
        self.build_commands = build_commands
        self.charset: WatchCharset = charset

        self.restart_on_changes = restart
        self.watch_app_enabled = watch_app
        # No subprocess exists until the first command runs. Start from None
        # so that rebuild(), cancel() and the Enter-key thread can be invoked
        # before then without raising AttributeError.
        self._current_build: Optional[subprocess.Popen] = None

        self._extra_ninja_args = [] if jobs is None else [f'-j{jobs}']

        self.debouncer = Debouncer(self)

        # Track state of a build. These need to be members instead of locals
        # due to the split between dispatch(), run(), and on_complete().
        self.matching_path: Optional[Path] = None
        self.builds_succeeded: List[bool] = []

        # Started last so the thread only ever sees a fully initialized object.
        if not self.watch_app_enabled:
            self.wait_for_keypress_thread = threading.Thread(
                None, self._wait_for_enter)
            self.wait_for_keypress_thread.start()

    def _stop_current_build(self) -> None:
        """Terminates the most recent build subprocess, if any, and waits."""
        build = self._current_build
        if build is None:
            return
        build.terminate()
        build.wait()

    def rebuild(self):
        """Manual rebuild command triggered from watch app."""
        self._stop_current_build()
        self.debouncer.press('Manual build requested...')

    def _wait_for_enter(self) -> NoReturn:
        """Requests a rebuild each time a line is read from stdin."""
        try:
            while True:
                _ = input()
                self._stop_current_build()

                self.debouncer.press('Manual build requested...')
        # Ctrl-C on Unix generates KeyboardInterrupt
        # Ctrl-Z on Windows generates EOFError
        except (KeyboardInterrupt, EOFError):
            _exit_due_to_interrupt()

    def _path_matches(self, path: Path) -> bool:
        """Returns true if path matches according to the watcher patterns"""
        return (not any(path.match(x) for x in self.ignore_patterns)
                and any(path.match(x) for x in self.patterns))

    def dispatch(self, event) -> None:
        """Triggers a build if the event touches a watched, non-ignored file.

        Args:
          event: Filesystem event; ``is_directory``, ``src_path`` and, when
            present, ``dest_path`` are read from it.
        """
        # There isn't any point in triggering builds on new directory creation.
        # It's the creation or modification of files that indicate something
        # meaningful enough changed for a build.
        if event.is_directory:
            return

        # Collect paths of interest from the event.
        paths: List[str] = []
        if hasattr(event, 'dest_path'):
            paths.append(os.fsdecode(event.dest_path))
        if event.src_path:
            paths.append(os.fsdecode(event.src_path))
        for raw_path in paths:
            _LOG.debug('File event: %s', raw_path)

        # Check whether Git cares about any of these paths. The first match
        # is enough: one trigger is sent per event.
        for path in (Path(p).resolve() for p in paths):
            if not git_ignored(path) and self._path_matches(path):
                self._handle_matched_event(path)
                return

    def _handle_matched_event(self, matching_path: Path) -> None:
        """Records the first matching path and presses the debouncer."""
        # Only the first path since the last completed run is kept; it is
        # reset in on_complete().
        if self.matching_path is None:
            self.matching_path = matching_path

        self.debouncer.press(
            f'File change detected: {os.path.relpath(matching_path)}')

    # Implementation of DebouncedFunction.run()
    #
    # Note: This will run on the timer thread created by the Debouncer, rather
    # than on the main thread that's watching file events. This enables the
    # watcher to continue receiving file change events during a build.
    def run(self) -> None:
        """Run all the builds in serial and capture pass/fail for each."""

        # Clear the screen and show a banner indicating the build is starting.
        if not self.watch_app_enabled:
            print('\033c', end='')  # TODO(pwbug/38): Not Windows compatible.
        # TODO: fix the banner function to add the color on each line
        for line in pw_cli.branding.banner().splitlines():
            _LOG.info(line)
        _LOG.info(
            _COLOR.green(
                '  Watching for changes. Ctrl-C to exit; enter to rebuild'))
        _LOG.info('')
        _LOG.info('Change detected: %s', self.matching_path)
        if not self.watch_app_enabled:
            print('\033c', end='')  # TODO(pwbug/38): Not Windows compatible.
            sys.stdout.flush()

        self.builds_succeeded = []
        num_builds = len(self.build_commands)
        _LOG.info('Starting build with %d directories', num_builds)

        env = os.environ.copy()
        # Force colors in Pigweed subcommands run through the watcher.
        env['PW_USE_COLOR'] = '1'

        for i, cmd in enumerate(self.build_commands, 1):
            index = f'[{i}/{num_builds}]'
            self.builds_succeeded.append(self._run_build(index, cmd, env))

            if self.builds_succeeded[-1]:
                level = logging.INFO
                tag = '(OK)'
            else:
                level = logging.ERROR
                tag = '(FAIL)'

            _LOG.log(level, '%s Finished build: %s %s', index, cmd, tag)

    def _run_build(self, index: str, cmd: BuildCommand, env: dict) -> bool:
        """Runs Ninja for one build command, generating build.ninja if needed.

        Args:
          index: Progress prefix such as ``[1/2]`` used in log messages.
          cmd: Build directory and targets to build.
          env: Environment for the spawned processes.

        Returns:
          True if every command that was run exited with status 0.
        """
        # Make sure there is a build.ninja file for Ninja to use.
        build_ninja = cmd.build_dir / 'build.ninja'
        if not build_ninja.exists():
            # If this is a CMake directory, prompt the user to re-run CMake.
            if cmd.build_dir.joinpath('CMakeCache.txt').exists():
                _LOG.error('%s %s does not exist; re-run CMake to generate it',
                           index, build_ninja)
                return False

            _LOG.warning('%s %s does not exist; running gn gen %s', index,
                         build_ninja, cmd.build_dir)
            if not self._execute_command(['gn', 'gen', cmd.build_dir], env):
                return False

        command = ['ninja', *self._extra_ninja_args, '-C', *cmd.args()]
        _LOG.info('%s Starting build: %s', index,
                  ' '.join(shlex.quote(arg) for arg in command))

        return self._execute_command(command, env)

    def _execute_command(self, command: list, env: dict) -> bool:
        """Runs a command with a blank before/after for visual separation."""
        if self.watch_app_enabled:
            return self._execute_command_watch_app(command, env)
        print()
        self._current_build = subprocess.Popen(command, env=env)
        returncode = self._current_build.wait()
        print()
        return returncode == 0

    def _execute_command_watch_app(self, command: list, env: dict) -> bool:
        """Runs a command, accumulating its output in ``current_stdout``.

        stderr is merged into stdout. Returns True if the exit status is 0.
        """
        self.current_stdout = ''
        returncode = None
        with subprocess.Popen(command,
                              env=env,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT,
                              errors='replace') as proc:
            self._current_build = proc
            self.current_stdout += 'START\n'
            # NOTE(review): the loop stops as soon as poll() reports an exit
            # status, so lines still buffered in the pipe at that moment are
            # not appended — confirm whether that is acceptable.
            while returncode is None:
                if proc.stdout:
                    output = proc.stdout.readline()
                    if output:
                        self.current_stdout += output

                returncode = proc.poll()
            self.current_stdout += 'CHECK DONE\n'
        return returncode == 0

    # Implementation of DebouncedFunction.cancel()
    def cancel(self) -> bool:
        """Stops the running build if restart-on-change is enabled.

        Returns:
          True if restarting is enabled (any current build was terminated),
          False otherwise.
        """
        if self.restart_on_changes:
            self._stop_current_build()
            return True

        return False

    # Implementation of DebouncedFunction.on_complete()
    def on_complete(self, cancelled: bool = False) -> None:
        """Logs the outcome of a run and resets the per-run state.

        Args:
          cancelled: True if the run was interrupted before finishing.
        """
        # First, use the standard logging facilities to report build status.
        if cancelled:
            _LOG.error('Finished; build was interrupted')
        elif all(self.builds_succeeded):
            _LOG.info('Finished; all successful')
        else:
            _LOG.info('Finished; some builds failed')

        # Then, show a more distinct colored banner.
        if not cancelled:
            # Write out build summary table so you can tell which builds passed
            # and which builds failed.
            _LOG.info('')
            _LOG.info(' .------------------------------------')
            _LOG.info(' |')
            for (succeeded, cmd) in zip(self.builds_succeeded,
                                        self.build_commands):
                slug = (self.charset.slug_ok
                        if succeeded else self.charset.slug_fail)
                _LOG.info(' |   %s  %s', slug, cmd)
            _LOG.info(' |')
            _LOG.info(" '------------------------------------")
        else:
            # Build was interrupted.
            _LOG.info('')
            _LOG.info(' .------------------------------------')
            _LOG.info(' |')
            _LOG.info(' |  %s- interrupted', self.charset.slug_fail)
            _LOG.info(' |')
            _LOG.info(" '------------------------------------")

        # Show a large color banner so it is obvious what the overall result is.
        if all(self.builds_succeeded) and not cancelled:
            for line in _PASS_MESSAGE.splitlines():
                _LOG.info(_COLOR.green(line))
        else:
            for line in _FAIL_MESSAGE.splitlines():
                _LOG.info(_COLOR.red(line))

        # Let the next matching file event be recorded as the trigger.
        self.matching_path = None

    # Implementation of DebouncedFunction.on_keyboard_interrupt()
    def on_keyboard_interrupt(self) -> NoReturn:
        """Exits the process in response to Ctrl-C."""
        _exit_due_to_interrupt()


# Separator between globs in the --patterns and --ignore_patterns options.
_WATCH_PATTERN_DELIMITER = ','
# Globs watched by default; joined with the delimiter above to form the
# default value of --patterns.
_WATCH_PATTERNS = (
    '*.bloaty',
    '*.c',
    '*.cc',
    '*.css',
    '*.cpp',
    '*.cmake',
    'CMakeLists.txt',
    '*.gn',
    '*.gni',
    '*.go',
    '*.h',
    '*.hpp',
    '*.ld',
    '*.md',
    '*.options',
    '*.proto',
    '*.py',
    '*.rst',
    '*.s',
    '*.S',
)


def add_parser_arguments(parser: argparse.ArgumentParser) -> None:
    """Sets up an argument parser for pw watch.

    The destination names registered here match the keyword parameters of
    watch_setup(), which main() calls with ``**vars(args)``.

    Args:
      parser: Parser to which the pw watch options are added.
    """
    parser.add_argument('--patterns',
                        help=(_WATCH_PATTERN_DELIMITER +
                              '-delimited list of globs to '
                              'watch to trigger recompile'),
                        default=_WATCH_PATTERN_DELIMITER.join(_WATCH_PATTERNS))
    parser.add_argument('--ignore_patterns',
                        dest='ignore_patterns_string',
                        help=(_WATCH_PATTERN_DELIMITER +
                              '-delimited list of globs to '
                              'ignore events from'))

    parser.add_argument('--exclude_list',
                        nargs='+',
                        type=Path,
                        help='directories to ignore during pw watch',
                        default=[])
    parser.add_argument('--no-restart',
                        dest='restart',
                        action='store_false',
                        help='do not restart ongoing builds if files change')
    parser.add_argument(
        'default_build_targets',
        nargs='*',
        metavar='target',
        default=[],
        help=('Automatically locate a build directory and build these '
              'targets. For example, `host docs` searches for a Ninja '
              'build directory at out/ and builds the `host` and `docs` '
              'targets. To specify one or more directories, use the '
              '-C / --build_directory option.'))
    # Each -C occurrence collects its own [directory, target...] list, so the
    # parsed value is a list of lists.
    parser.add_argument(
        '-C',
        '--build_directory',
        dest='build_directories',
        nargs='+',
        action='append',
        default=[],
        metavar=('directory', 'target'),
        help=('Specify a build directory and optionally targets to '
              'build. `pw watch -C out tgt` is equivalent to `ninja '
              '-C out tgt`'))
    parser.add_argument(
        '--serve-docs',
        dest='serve_docs',
        action='store_true',
        default=False,
        help='Start a webserver for docs on localhost. The port for this '
        'webserver can be set with the --serve-docs-port option. '
        'Defaults to http://127.0.0.1:8000')
    parser.add_argument(
        '--serve-docs-port',
        dest='serve_docs_port',
        type=int,
        default=8000,
        help='Set the port for the docs webserver. Default to 8000.')

    parser.add_argument(
        '--serve-docs-path',
        dest='serve_docs_path',
        type=Path,
        default=Path('docs/gen/docs'),
        help='Set the path for the docs to serve. Default to docs/gen/docs'
        ' in the build directory.')
    parser.add_argument(
        '-j',
        '--jobs',
        type=int,
        help="Number of cores to use; defaults to Ninja's default")
    parser.add_argument('--watch-app',
                        dest='watch_app',
                        action='store_true',
                        default=False,
                        help='Start the watch app console.')


def _exit(code: int) -> NoReturn:
    """Ends the process immediately via ``os._exit``.

    Args:
      code: Exit status for the process.
    """
    # Note: The "proper" way to exit is via observer.stop(), then
    # running a join. However it's slower, so just exit immediately.
    #
    # Additionally, since there are several threads in the watcher, the usual
    # sys.exit approach doesn't work. Instead, run the low level exit which
    # kills all threads.
    os._exit(code)  # pylint: disable=protected-access


def _exit_due_to_interrupt() -> NoReturn:
    """Logs that the watcher is exiting and ends the process with status 0.

    Callers in this file invoke it for both KeyboardInterrupt and EOFError.
    """
    # NOTE(review): no newline is emitted before this log, so it may share a
    # line with a '^C' echoed by the terminal — confirm whether a leading
    # newline was intended to keep the log lines aligned.
    _LOG.info('Got Ctrl-C; exiting...')
    _exit(0)


def _exit_due_to_inotify_watch_limit() -> NoReturn:
    """Explains how to raise the inotify watch limit, then exits.

    Exits with status 1 because the watcher could not be started; this is a
    failure, not a clean shutdown.
    """
    # Show information and suggested commands in OSError: inotify limit reached.
    _LOG.error('Inotify watch limit reached: run this in your terminal if '
               'you are in Linux to temporarily increase inotify limit.  \n')
    _LOG.info(
        _COLOR.green('        sudo sysctl fs.inotify.max_user_watches='
                     '$NEW_LIMIT$\n'))
    _LOG.info('  Change $NEW_LIMIT$ with an integer number, '
              'e.g., 20000 should be enough.')
    _exit(1)


def _exit_due_to_inotify_instance_limit() -> NoReturn:
    """Explains how to raise the inotify instance limit, then exits.

    Exits with status 1 because the watcher could not be started; this is a
    failure, not a clean shutdown.
    """
    # Show information and suggested commands in OSError: inotify instance
    # limit reached.
    _LOG.error('Inotify instance limit reached: run this in your terminal if '
               'you are in Linux to temporarily increase inotify limit.  \n')
    _LOG.info(
        _COLOR.green('        sudo sysctl fs.inotify.max_user_instances='
                     '$NEW_LIMIT$\n'))
    _LOG.info('  Change $NEW_LIMIT$ with an integer number, '
              'e.g., 20000 should be enough.')
    _exit(1)


def _exit_due_to_pigweed_not_installed():
    """Logs setup hints for a missing or mismatched $PW_ROOT, then exits 1."""
    # Each entry is logged as its own error record, in this order.
    hints = (
        'Environment variable $PW_ROOT not defined or is defined '
        'outside the current directory.',
        'Did you forget to activate the Pigweed environment? '
        'Try source ./activate.sh',
        'Did you forget to install the Pigweed environment? '
        'Try source ./bootstrap.sh',
    )
    for hint in hints:
        _LOG.error(hint)
    _exit(1)


def minimal_watch_directories(to_watch: Path, to_exclude: Iterable[Path]):
    """Determine which subdirectory to watch recursively.

    Directories that lead to an excluded directory cannot be watched
    recursively, since that would cover the excluded directory too. They are
    yielded with recursion off, and each of their other subdirectories is
    yielded with recursion on.

    Args:
      to_watch: Root directory to watch.
      to_exclude: Directories to leave unwatched, absolute or relative to
        to_watch. Entries that are not existing directories, or that lie
        outside to_watch, are ignored.

    Yields:
      (directory, recursive) tuples: recursive is False for to_watch and for
      every ancestor of an excluded directory, and True for each remaining
      subdirectory of those.

    Raises:
      TypeError: If to_watch cannot be converted to a single Path.
    """
    try:
        to_watch = Path(to_watch)
    except TypeError as err:
        # Raised explicitly (not asserted) so the check survives `python -O`.
        raise TypeError('Please watch one directory at a time.') from err

    # Anchor every exclusion to to_watch. Keep only existing directories
    # inside the watched tree: an exclusion elsewhere cannot affect what is
    # watched, and relative_to() would raise ValueError for it.
    excluded_parts = {}
    for raw_exclude in to_exclude:
        excluded = to_watch.joinpath(raw_exclude)
        if not excluded.is_dir():
            continue
        try:
            excluded_parts[excluded] = excluded.relative_to(to_watch).parts
        except ValueError:
            continue

    # Collect every directory between to_watch and an excluded directory;
    # these must be watched without recursion.
    exclude_dir_parents = {to_watch}
    for parts in excluded_parts.values():
        ancestors = []
        dir_tmp = to_watch
        for part in parts[:-1]:
            dir_tmp = Path(dir_tmp, part)
            ancestors.append(dir_tmp)
        # An exclusion nested inside another exclusion is redundant. Adding
        # its ancestors would put the outer excluded directory back on the
        # watch list.
        if any(ancestor in excluded_parts for ancestor in ancestors):
            continue
        exclude_dir_parents.update(ancestors)

    # Go over all layers of directory. Yield those that are the parents of
    # excluded directories with recursion==False, and their other
    # subdirectories with recursion==True.
    for directory in exclude_dir_parents:
        yield directory, False
        for item in directory.iterdir():
            if (item.is_dir() and item not in exclude_dir_parents
                    and item not in excluded_parts):
                yield item, True


def get_common_excludes() -> List[Path]:
    """Builds the preset list of directories that should not be watched.

    Reads the PW_ROOT and PW_PROJECT_ROOT environment variables.

    Returns:
      The typical ignored directories under PW_ROOT, followed by the same
      set under PW_PROJECT_ROOT when the two roots differ, followed by any
      legacy environment directories that exist under PW_ROOT.
    """
    ignored_names = (
        '.environment',  # Legacy bootstrap-created CIPD and Python venv.
        '.presubmit',  # Presubmit-created CIPD and Python venv.
        '.git',  # Pigweed's git repo.
        '.mypy_cache',  # Python static analyzer.
        '.cargo',  # Rust package manager.
        'environment',  # Bootstrap-created CIPD and Python venv.
        'out',  # Typical build directory.
    )

    def _ignored_under(root: Path) -> List[Path]:
        return [root / name for name in ignored_names]

    # Preset exclude list for Pigweed's upstream directories.
    pigweed_root = Path(os.environ['PW_ROOT'])
    excludes = _ignored_under(pigweed_root)

    # Preset exclude for common downstream project structures: if watch is
    # invoked outside of the Pigweed root, exclude the same directories there.
    project_root = Path(os.environ['PW_PROJECT_ROOT'])
    if project_root != pigweed_root:
        excludes += _ignored_under(project_root)

    # Check for and warn about legacy directories.
    legacy_candidates = (
        pigweed_root / '.cipd',  # Legacy CIPD location.
        pigweed_root / '.python3-venv',  # Legacy Python venv location.
    )
    legacy_found = [path for path in legacy_candidates if path.is_dir()]
    for legacy_path in legacy_found:
        _LOG.warning('Legacy environment directory found: %s',
                     str(legacy_path))
    excludes += legacy_found
    if legacy_found:
        _LOG.warning('Found legacy environment directory(s); these '
                     'should be deleted')

    return excludes


# pylint: disable=R0913, R0914 # too many arguments and local variables
def watch_setup(default_build_targets: List[str],
                build_directories: List[List[str]], patterns: str,
                ignore_patterns_string: Optional[str],
                exclude_list: List[Path], restart: bool, jobs: Optional[int],
                serve_docs: bool, serve_docs_port: int, serve_docs_path: Path,
                watch_app: bool):
    """Validates the environment and builds the objects needed by watch().

    Args:
      default_build_targets: Targets to build in the default out/ directory.
      build_directories: One [directory, target...] list per -C option.
      patterns: Delimited globs that trigger a build.
      ignore_patterns_string: Delimited globs to ignore, or None/empty.
      exclude_list: User-specified directories to leave unwatched. The list
        is not modified; the combined list is returned instead.
      restart: Whether file changes restart a running build.
      jobs: Optional Ninja job count.
      serve_docs: Whether to start the docs webserver thread.
      serve_docs_port: Port for the docs webserver.
      serve_docs_path: Docs path, relative to the first build directory.
      watch_app: Whether the watch app console is in use.

    Returns:
      A (path_to_log, event_handler, exclude_list) tuple: a display string
      for the watched directory, the configured PigweedBuildWatcher, and the
      user excludes followed by the common preset excludes.
    """
    _LOG.info('Starting Pigweed build watcher')

    # Get pigweed directory information from environment variable PW_ROOT.
    # Use .get() so that an unset variable reaches the friendly error below
    # instead of raising KeyError.
    pw_root_env = os.environ.get('PW_ROOT', '')
    if not pw_root_env:
        _exit_due_to_pigweed_not_installed()
    pw_root = Path(pw_root_env).resolve()
    if Path.cwd().resolve() not in [pw_root, *pw_root.parents]:
        _exit_due_to_pigweed_not_installed()

    # Preset exclude list for pigweed directory. Build a new list rather than
    # extending the caller's, which may be argparse's shared default.
    exclude_list = [*exclude_list, *get_common_excludes()]

    build_commands = [
        BuildCommand(Path(build_dir[0]), tuple(build_dir[1:]))
        for build_dir in build_directories
    ]

    # If no build directory was specified, check for out/build.ninja.
    if default_build_targets or not build_directories:
        # Make sure we found something; if not, bail.
        if not Path('out').exists():
            _die("No build dirs found. Did you forget to run 'gn gen out'?")

        build_commands.append(
            BuildCommand(Path('out'), tuple(default_build_targets)))

    # Verify that the build output directories exist.
    for i, build_target in enumerate(build_commands, 1):
        if not build_target.build_dir.is_dir():
            _die("Build directory doesn't exist: %s", build_target)
        else:
            _LOG.info('Will build [%d/%d]: %s', i, len(build_commands),
                      build_target)

    _LOG.debug('Patterns: %s', patterns)

    if serve_docs:

        def _serve_docs():
            # Disable logs from httpwatcher and deps
            logging.getLogger('httpwatcher').setLevel(logging.CRITICAL)
            logging.getLogger('tornado').setLevel(logging.CRITICAL)

            # Docs are served from the first build directory only.
            docs_path = build_commands[0].build_dir.joinpath(
                serve_docs_path.joinpath('html'))
            httpwatcher.watch(docs_path,
                              host="127.0.0.1",
                              port=serve_docs_port)

        # Spin up an httpwatcher in a new thread since it blocks
        threading.Thread(None, _serve_docs, "httpwatcher").start()

    # Try to make a short display path for the watched directory that has
    # "$HOME" instead of the full home directory. This is nice for users
    # who have deeply nested home directories.
    path_to_log = str(Path().resolve()).replace(str(Path.home()), '$HOME')

    # Ignore the user-specified patterns.
    ignore_patterns = (ignore_patterns_string.split(_WATCH_PATTERN_DELIMITER)
                       if ignore_patterns_string else [])

    env = pw_cli.env.pigweed_environment()
    if env.PW_EMOJI:
        charset = _EMOJI_CHARSET
    else:
        charset = _ASCII_CHARSET

    event_handler = PigweedBuildWatcher(
        build_commands=build_commands,
        patterns=patterns.split(_WATCH_PATTERN_DELIMITER),
        ignore_patterns=ignore_patterns,
        charset=charset,
        restart=restart,
        jobs=jobs,
        watch_app=watch_app,
    )
    return path_to_log, event_handler, exclude_list


# pylint: disable=R0914 # too many local variables
def watch(path_to_log: Path, event_handler: PigweedBuildWatcher,
          exclude_list: List[Path]):
    """Watches files and runs Ninja commands when they change.

    Attaches one observer per directory produced by
    minimal_watch_directories() for the current working directory, triggers
    an initial build, then blocks while the observers are alive.

    Args:
      path_to_log: Display form of the watched directory; only used in the
        startup log message.
      event_handler: Handler scheduled on every observer.
      exclude_list: Directories to leave unwatched.

    Raises:
      OSError: Any OSError other than the two inotify-limit errors, which
        end the process with an explanatory message instead.
    """
    try:
        # It can take awhile to configure the filesystem watcher, so have the
        # message reflect that with the "...". Run inside the try: to
        # gracefully handle the user Ctrl-C'ing out during startup.

        _LOG.info('Attaching filesystem watcher to %s/...', path_to_log)

        # Observe changes for all files in the root directory. Whether the
        # directory should be observed recursively or not is determined by the
        # second element in subdirectories_to_watch.
        observers = []
        for path, rec in minimal_watch_directories(Path.cwd(), exclude_list):
            observer = Observer()
            observer.schedule(
                event_handler,
                str(path),
                recursive=rec,
            )
            observer.start()
            observers.append(observer)

        event_handler.debouncer.press('Triggering initial build...')
        for observer in observers:
            # Join with a timeout so the loop wakes up regularly.
            while observer.is_alive():
                observer.join(1)

    # Ctrl-C on Unix generates KeyboardInterrupt
    # Ctrl-Z on Windows generates EOFError
    except (KeyboardInterrupt, EOFError):
        _exit_due_to_interrupt()
    except OSError as err:
        # Compare errno rather than args[0]: an OSError created without
        # arguments has empty args, and indexing it would raise IndexError
        # here and hide the original error.
        if err.errno == _ERRNO_INOTIFY_LIMIT_REACHED:
            _exit_due_to_inotify_watch_limit()
        if err.errno == errno.EMFILE:
            _exit_due_to_inotify_instance_limit()
        raise

    # Only reached if every observer thread has stopped on its own.
    _LOG.critical('Should never get here')
    observer.join()


def main() -> None:
    """Watch files for changes and rebuild."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    add_parser_arguments(parser)
    args = parser.parse_args()

    # The parsed option names match watch_setup()'s keyword parameters.
    startup_args = vars(args)
    watch_args = watch_setup(**startup_args)

    if not args.watch_app:
        # Plain mode: watch on the current thread.
        watch(*watch_args)
        return

    # Watch app mode: send logs to a temporary file, watch on a daemon
    # thread, and run the console app on the current thread.
    watch_logfile = pw_console.python_logging.create_temp_log_file()
    pw_cli.log.install(
        level=logging.DEBUG,
        use_color=True,
        hide_timestamp=False,
        log_file=watch_logfile,
    )
    pw_console.python_logging.setup_python_logging(
        last_resort_filename=watch_logfile)

    _, event_handler, _ = watch_args
    watch_thread = Thread(target=watch, args=tuple(watch_args), daemon=True)
    watch_thread.start()

    watch_app = WatchApp(startup_args=startup_args,
                         event_handler=event_handler,
                         loggers=[_LOG])
    watch_app.run()


# Run the watcher when this file is executed as a script.
if __name__ == '__main__':
    main()