diff options
author | Wyatt Hepler <hepler@google.com> | 2021-04-08 10:56:31 -0700 |
---|---|---|
committer | CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2021-04-14 21:19:25 +0000 |
commit | bea166e06a4b19ad09636bfb3f114f0e05841573 (patch) | |
tree | 9bbbb9d5197e6c6373dbce418586d4fa923560af | |
parent | 3e28092cd59477cf09727d6e6357cc993d565f9b (diff) | |
download | pigweed-bea166e06a4b19ad09636bfb3f114f0e05841573.tar.gz |
pw_cli: Generalize the plugin system
- Move the global plugin variables into a class. This makes it possible
to use multiple plugin registries, potentially for different purposes.
- Add tests for the pw.plugins.Registry class.
- Expose the Plugin class. This allows using plugins for ways
other than as command line utilities.
- Always include __init__ functions in Sphinx autodoc.
Change-Id: Id8aa40234f81c1da46ec51e5590b16c561934be3
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/40761
Commit-Queue: Wyatt Hepler <hepler@google.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
Reviewed-by: Rob Mohr <mohrr@google.com>
-rw-r--r-- | docs/conf.py | 11 | ||||
-rw-r--r-- | pw_cli/docs.rst | 35 | ||||
-rw-r--r-- | pw_cli/py/BUILD.gn | 6 | ||||
-rw-r--r-- | pw_cli/py/plugins_test.py | 189 | ||||
-rw-r--r-- | pw_cli/py/pw_cli/__main__.py | 10 | ||||
-rw-r--r-- | pw_cli/py/pw_cli/arguments.py | 4 | ||||
-rw-r--r-- | pw_cli/py/pw_cli/plugins.py | 485 | ||||
-rw-r--r-- | pw_cli/py/pw_cli/pw_command_plugins.py | 71 | ||||
-rwxr-xr-x | pw_doctor/py/pw_doctor/doctor.py | 4 |
9 files changed, 606 insertions, 209 deletions
diff --git a/docs/conf.py b/docs/conf.py index 1885a994a..7eaf37486 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -114,3 +114,14 @@ texinfo_documents = [ # Markdown files imported using m2r aren't marked as "referenced," so exclude # them from the error reference checking. exclude_patterns = ['README.md'] + + +def do_not_skip_init(app, what, name, obj, would_skip, options): + if name == "__init__": + return False # never skip __init__ functions + + return would_skip + + +def setup(app): + app.connect("autodoc-skip-member", do_not_skip_init) diff --git a/pw_cli/docs.rst b/pw_cli/docs.rst index 40489a285..35378e08c 100644 --- a/pw_cli/docs.rst +++ b/pw_cli/docs.rst @@ -271,7 +271,40 @@ pw_cli Python package The ``pw_cli`` Pigweed module includes the ``pw_cli`` Python package, which provides utilities for creating command line tools with Pigweed. -pw_cli log +pw_cli.log ---------- .. automodule:: pw_cli.log :members: + +pw_cli.plugins +-------------- +:py:mod:`pw_cli.plugins` provides general purpose plugin functionality. The +module can be used to create plugins for command line tools, interactive +consoles, or anything else. Pigweed's ``pw`` command uses this module for its +plugins. + +To use plugins, create a :py:class:`pw_cli.plugins.Registry`. The registry may +have an optional validator function that checks plugins before they are +registered (see :py:meth:`pw_cli.plugins.Registry.__init__`). + +Plugins may be registered in a few different ways. + + * Register with a direct function call. See + :py:meth:`pw_cli.plugins.Registry.register` and + :py:meth:`pw_cli.plugins.Registry.register_by_name`. + * Register from plugins files. See + :py:meth:`pw_cli.plugins.Registry.register_file` and + :py:meth:`pw_cli.plugins.Registry.register_directory`. Plugins files use a + simple format: + + .. code-block:: + + # Comments start with "#". Blank lines are ignored. + name_of_the_plugin module.name module_member + + another_plugin some_module some_function + +Module reference +^^^^^^^^^^^^^^^^ +.. automodule:: pw_cli.plugins + :members: diff --git a/pw_cli/py/BUILD.gn b/pw_cli/py/BUILD.gn index 2f04bde33..b7ed63c58 100644 --- a/pw_cli/py/BUILD.gn +++ b/pw_cli/py/BUILD.gn @@ -29,8 +29,12 @@ pw_python_package("py") { "pw_cli/log.py", "pw_cli/plugins.py", "pw_cli/process.py", + "pw_cli/pw_command_plugins.py", "pw_cli/requires.py", ] - tests = [ "envparse_test.py" ] + tests = [ + "envparse_test.py", + "plugins_test.py", + ] pylintrc = "$dir_pigweed/.pylintrc" } diff --git a/pw_cli/py/plugins_test.py b/pw_cli/py/plugins_test.py new file mode 100644 index 000000000..908bc4b2e --- /dev/null +++ b/pw_cli/py/plugins_test.py @@ -0,0 +1,189 @@ +# Copyright 2021 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Tests for pw_cli.plugins.""" + +from pathlib import Path +import sys +import tempfile +import types +from typing import Dict, Iterator +import unittest + +from pw_cli import plugins + + +def _no_docstring() -> int: + return 123 + + +def _with_docstring() -> int: + """This docstring is brought to you courtesy of Pigweed.""" + return 456 + + +def _create_files(directory: str, files: Dict[str, str]) -> Iterator[Path]: + for relative_path, contents in files.items(): + path = Path(directory) / relative_path + path.parent.mkdir(exist_ok=True, parents=True) + path.write_text(contents) + yield path + + +class TestPlugin(unittest.TestCase): + """Tests for plugins.Plugins.""" + def test_target_name_attribute(self) -> None: + self.assertEqual( + plugins.Plugin('abc', _no_docstring).target_name, + f'{__name__}._no_docstring') + + def test_target_name_no_name_attribute(self) -> None: + has_no_name = 'no __name__' + self.assertFalse(hasattr(has_no_name, '__name__')) + + self.assertEqual( + plugins.Plugin('abc', has_no_name).target_name, + '<unknown>.no __name__') + + +_TEST_PLUGINS = { + 'TEST_PLUGINS': (f'test_plugin {__name__} _with_docstring\n' + f'other_plugin {__name__} _no_docstring\n'), + 'nested/in/dirs/TEST_PLUGINS': + f'test_plugin {__name__} _no_docstring\n', +} + + +class TestPluginRegistry(unittest.TestCase): + """Tests for plugins.Registry.""" + def setUp(self) -> None: + self._registry = plugins.Registry( + validator=plugins.callable_with_no_args) + + def test_register(self) -> None: + self.assertIsNotNone(self._registry.register('a_plugin', + _no_docstring)) + self.assertIs(self._registry['a_plugin'].target, _no_docstring) + + def test_register_by_name(self) -> None: + self.assertIsNotNone( + self._registry.register_by_name('plugin_one', __name__, + '_no_docstring')) + self.assertIsNotNone( + self._registry.register('plugin_two', _no_docstring)) + + self.assertIs(self._registry['plugin_one'].target, _no_docstring) + self.assertIs(self._registry['plugin_two'].target, _no_docstring) + + def test_register_by_name_undefined_module(self) -> None: + with self.assertRaisesRegex(plugins.Error, 'No module named'): + self._registry.register_by_name('plugin_two', 'not a module', + 'something') + + def test_register_by_name_undefined_function(self) -> None: + with self.assertRaisesRegex(plugins.Error, 'does not exist'): + self._registry.register_by_name('plugin_two', __name__, 'nothing') + + def test_register_fails_validation(self) -> None: + with self.assertRaisesRegex(plugins.Error, 'must be callable'): + self._registry.register('plugin_two', 'not function') + + def test_run_with_argv_sets_sys_argv(self) -> None: + def stash_argv() -> int: + self.assertEqual(['pw go', '1', '2'], sys.argv) + return 1 + + self.assertIsNotNone(self._registry.register('go', stash_argv)) + + original_argv = sys.argv + self.assertEqual(self._registry.run_with_argv('go', ['1', '2']), 1) + self.assertIs(sys.argv, original_argv) + + def test_run_with_argv_registered_plugin(self) -> None: + with self.assertRaises(KeyError): + self._registry.run_with_argv('plugin_one', []) + + def test_register_cannot_overwrite(self) -> None: + self.assertIsNotNone(self._registry.register('foo', lambda: None)) + self.assertIsNotNone( + self._registry.register_by_name('bar', __name__, '_no_docstring')) + + with self.assertRaises(plugins.Error): + self._registry.register('foo', lambda: None) + + with self.assertRaises(plugins.Error): + self._registry.register('bar', lambda: None) + + def test_register_directory_innermost_takes_priority(self) -> None: + with tempfile.TemporaryDirectory() as tempdir: + paths = list(_create_files(tempdir, _TEST_PLUGINS)) + self._registry.register_directory(paths[1].parent, 'TEST_PLUGINS') + + self.assertEqual(self._registry.run_with_argv('test_plugin', []), 123) + + def test_register_directory_only_searches_up(self) -> None: + with tempfile.TemporaryDirectory() as tempdir: + paths = list(_create_files(tempdir, _TEST_PLUGINS)) + self._registry.register_directory(paths[0].parent, 'TEST_PLUGINS') + + self.assertEqual(self._registry.run_with_argv('test_plugin', []), 456) + + def test_register_directory_with_restriction(self) -> None: + with tempfile.TemporaryDirectory() as tempdir: + paths = list(_create_files(tempdir, _TEST_PLUGINS)) + self._registry.register_directory(paths[0].parent, 'TEST_PLUGINS', + Path(tempdir, 'nested', 'in')) + + self.assertNotIn('other_plugin', self._registry) + + def test_register_same_file_multiple_times_no_error(self) -> None: + with tempfile.TemporaryDirectory() as tempdir: + paths = list(_create_files(tempdir, _TEST_PLUGINS)) + self._registry.register_file(paths[0]) + self._registry.register_file(paths[0]) + self._registry.register_file(paths[0]) + + self.assertIs(self._registry['test_plugin'].target, _with_docstring) + + def test_help_uses_function_or_module_docstring(self) -> None: + self.assertIsNotNone(self._registry.register('a', _no_docstring)) + self.assertIsNotNone(self._registry.register('b', _with_docstring)) + + self.assertIn(__doc__, '\n'.join(self._registry.detailed_help(['a']))) + + self.assertNotIn(__doc__, + '\n'.join(self._registry.detailed_help(['b']))) + self.assertIn(_with_docstring.__doc__, + '\n'.join(self._registry.detailed_help(['b']))) + + def test_empty_string_if_no_help(self) -> None: + fake_module_name = f'{__name__}.fake_module_for_test{id(self)}' + fake_module = types.ModuleType(fake_module_name) + self.assertIsNone(fake_module.__doc__) + + sys.modules[fake_module_name] = fake_module + + try: + + function = lambda: None + function.__module__ = fake_module_name + self.assertIsNotNone(self._registry.register('a', function)) + + self.assertEqual(self._registry['a'].help(full=False), '') + self.assertEqual(self._registry['a'].help(full=True), '') + finally: + del sys.modules[fake_module_name] + + +if __name__ == '__main__': + unittest.main() diff --git a/pw_cli/py/pw_cli/__main__.py b/pw_cli/py/pw_cli/__main__.py index 799cbc450..ad6ceeb58 100644 --- a/pw_cli/py/pw_cli/__main__.py +++ b/pw_cli/py/pw_cli/__main__.py @@ -19,7 +19,7 @@ import sys from typing import NoReturn import pw_cli.log -from pw_cli import arguments, plugins +from pw_cli import arguments, plugins, pw_command_plugins _LOG = logging.getLogger(__name__) @@ -38,15 +38,15 @@ def main() -> NoReturn: _LOG.debug('Executing the pw command from %s', args.directory) os.chdir(args.directory) - plugins.register(args.directory) + pw_command_plugins.register(args.directory) if args.help or args.command is None: - print(arguments.format_help(), file=sys.stderr) + print(pw_command_plugins.format_help(), file=sys.stderr) sys.exit(0) try: - sys.exit(plugins.run(args.command, args.plugin_args)) - except plugins.Error as err: + sys.exit(pw_command_plugins.run(args.command, args.plugin_args)) + except (plugins.Error, KeyError) as err: _LOG.critical('Cannot run command %s.', args.command) _LOG.critical('%s', err) sys.exit(2) diff --git a/pw_cli/py/pw_cli/arguments.py b/pw_cli/py/pw_cli/arguments.py index 7a0f1b2b0..f9e2358cf 100644 --- a/pw_cli/py/pw_cli/arguments.py +++ b/pw_cli/py/pw_cli/arguments.py @@ -39,9 +39,9 @@ def print_banner() -> None: print(banner(), file=sys.stderr) -def format_help() -> str: +def format_help(registry: plugins.Registry) -> str: """Returns the pw help information as a string.""" - return f'{_parser().format_help()}\n{plugins.command_help()}' + return f'{_parser().format_help()}\n{registry.short_help()}' class _ArgumentParserWithBanner(argparse.ArgumentParser): diff --git a/pw_cli/py/pw_cli/plugins.py b/pw_cli/py/pw_cli/plugins.py index 09ea5d5a7..540b09ca3 100644 --- a/pw_cli/py/pw_cli/plugins.py +++ b/pw_cli/py/pw_cli/plugins.py @@ -1,4 +1,4 @@ -# Copyright 2020 The Pigweed Authors +# Copyright 2021 The Pigweed Authors # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy of @@ -11,28 +11,40 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. -"""Registry for plugins.""" +"""Provides general purpose plugin functionality. + +As used in this module, a plugin is a Python object associated with a name. +Plugins are registered in a Registry. The plugin object is typically a function, +but can be anything. + +Plugins may be loaded in a variety of ways: + +- Listed in a plugins file in the file system (e.g. as "name module target"). +- Registered in a Python file using a decorator (@my_registry.plugin). +- Registered directly or by name with function calls on a registry object. + +This functionality can be used to create plugins for command line tools, +interactive consoles, or anything else. Pigweed's pw command uses this module +for its plugins. +""" -import argparse import collections +import collections.abc import importlib import inspect import logging -import os from pathlib import Path import sys from textwrap import TextWrapper -from typing import Callable, Dict, List, Iterable, Optional, Union - -from pw_cli import arguments +import types +from typing import Any, Callable, Dict, List, Iterable, Iterator, Optional, Set _LOG = logging.getLogger(__name__) - -REGISTRY_FILE = 'PW_PLUGINS' +_BUILT_IN = '<built-in>' class Error(Exception): - """Failed to register a Pigweed plugin.""" + """Indicates that a plugin is invalid or cannot be registered.""" def __str__(self): """Displays the error as a string, including the __cause__ if present. @@ -45,145 +57,312 @@ class Error(Exception): f'({type(self.__cause__).__name__}: {self.__cause__})') -class _Plugin: - """A plugin for the pw command.""" - def __init__(self, name: str, module: str, function: str, - source: Union[Path, str]): - self.name = name - self.source = source +def _get_module(member: object) -> types.ModuleType: + """Gets the module or a dummy module if the module isn't found.""" + module = inspect.getmodule(member) + return module if module else types.ModuleType('<unknown>') + + +class Plugin: + """Represents a Python entity registered as a plugin. - # Attempt to access the module and function. Catch any errors that might - # occur, since a bad plugin shouldn't break the rest of the pw command. + Each plugin resolves to a Python object, typically a function. + """ + @classmethod + def from_name(cls, name: str, module_name: str, member_name: str, + source: Optional[Path]) -> 'Plugin': + """Creates a plugin by module and attribute name. + + Args: + name: the name of the plugin + module_name: Python module name (e.g. 'foo_pkg.bar') + member_name: the name of the member in the module + source: path to the plugins file that declared this plugin, if any + """ + + # Attempt to access the module and member. Catch any errors that might + # occur, since a bad plugin shouldn't be a fatal error. try: - self._module = importlib.import_module(module) + module = importlib.import_module(module_name) except Exception as err: - raise Error(f'Failed to import module "{module}"') from err + raise Error(f'Failed to import module "{module_name}"') from err try: - self._function: Callable[[], int] = getattr(self._module, function) + member = getattr(module, member_name) except AttributeError as err: raise Error( - f'The function "{module}.{function}" does not exist') from err + f'"{module_name}.{member_name}" does not exist') from err - try: - params = inspect.signature(self._function).parameters - except TypeError: - raise Error( - 'Plugin functions must be callable, but ' - f'{module}.{function} is a {type(self._function).__name__}') + return cls(name, member, source) - positional_args = sum(p.default == p.empty for p in params.values()) - if positional_args: - raise Error( - f'Plugin functions cannot have any required positional ' - f'arguments, but {module}.{function} has {positional_args}') + def __init__(self, name: str, target: Any, source: Path = None) -> None: + """Creates a plugin for the provided target.""" + self.name = name + self._module = _get_module(target) + self.target = target + self.source = source + + @property + def target_name(self) -> str: + return (f'{self._module.__name__}.' + f'{getattr(self.target, "__name__", self.target)}') - def run(self, args: List[str]) -> int: + @property + def source_name(self) -> str: + return _BUILT_IN if self.source is None else str(self.source) + + def run_with_argv(self, argv: Iterable[str]) -> int: + """Sets sys.argv and calls the plugin function. + + This is used to call a plugin as if from the command line. + """ original_sys_argv = sys.argv - sys.argv = [f'pw {self.name}', *args] + sys.argv = [f'pw {self.name}', *argv] try: - return self._function() + return self.target() finally: sys.argv = original_sys_argv - def help(self) -> str: - """Returns a brief description of this plugin.""" - return self._function.__doc__ or self._module.__doc__ or '' + def help(self, full: bool = False) -> str: + """Returns a description of this plugin from its docstring.""" + docstring = self.target.__doc__ or self._module.__doc__ or '' + return docstring if full else next(iter(docstring.splitlines()), '') - def details(self) -> List[str]: - return [ - f'help {self.help()}', - f'module {self._module.__name__}', - f'function {self._function.__name__}', - f'source {self.source}', - ] + def details(self, full: bool = False) -> Iterator[str]: + yield f'help {self.help(full=full)}' + yield f'module {self._module.__name__}' + yield f'target {getattr(self.target, "__name__", self.target)}' + yield f'source {self.source_name}' + def __repr__(self) -> str: + return (f'{self.__class__.__name__}(name={self.name!r}, ' + f'target={self.target_name}' + f'{f", source={self.source_name!r}" if self.source else ""})') -# This is the global CLI plugin registry. -_registry: Dict[str, _Plugin] = {} -_sources: List[Path] = [] # Paths to PW_PLUGINS files -_errors: Dict[str, List[Error]] = collections.defaultdict(list) +def callable_with_no_args(plugin: Plugin) -> None: + """Checks that a plugin is callable without arguments. -def _get(name: str) -> _Plugin: - if name in _registry: - return _registry[name] + May be used for the validator argument to Registry. + """ + try: + params = inspect.signature(plugin.target).parameters + except TypeError: + raise Error('Plugin functions must be callable, but ' + f'{plugin.target_name} is a ' + f'{type(plugin.target).__name__}') + + positional = sum(p.default == p.empty for p in params.values()) + if positional: + raise Error(f'Plugin functions cannot have any required positional ' + f'arguments, but {plugin.target_name} has {positional}') + + +class Registry(collections.abc.Mapping): + """Manages a set of plugins from Python modules or plugins files.""" + def __init__(self, + validator: Callable[[Plugin], Any] = lambda _: None) -> None: + """Creates a new, empty plugins registry. + + Args: + validator: Function that checks whether a plugin is valid and should + be registered. Must raise plugins.Error is the plugin is invalid. + """ - if name in _errors: - raise Error(f'Registration for "{name}" failed: ' + - ', '.join(str(e) for e in _errors[name])) + self._registry: Dict[str, Plugin] = {} + self._sources: Set[Path] = set() # Paths to plugins files + self._errors: Dict[str, + List[Exception]] = collections.defaultdict(list) + self._validate_plugin = validator - raise Error(f'The plugin "{name}" has not been registered') + def __getitem__(self, name: str) -> Plugin: + """Accesses a plugin by name; raises KeyError if it does not exist.""" + if name in self._registry: + return self._registry[name] + if name in self._errors: + raise KeyError(f'Registration for "{name}" failed: ' + + ', '.join(str(e) for e in self._errors[name])) -def errors() -> Dict[str, List[Error]]: - return _errors + raise KeyError(f'The plugin "{name}" has not been registered') + def __iter__(self) -> Iterator[str]: + return iter(self._registry) -def run(name: str, args: List[str]) -> int: - """Runs a plugin by name. Raises Error if the plugin is not registered.""" - return _get(name).run(args) + def __len__(self) -> int: + return len(self._registry) + def errors(self) -> Dict[str, List[Exception]]: + return self._errors -def command_help() -> str: - """Returns a help string for the registered plugins.""" - width = max(len(name) for name in _registry) + 1 if _registry else 1 - help_items = '\n'.join(f' {name:{width}} {plugin.help()}' - for name, plugin in sorted(_registry.items())) - return f'supported commands:\n{help_items}' + def run_with_argv(self, name: str, argv: Iterable[str]) -> int: + """Runs a plugin by name, setting sys.argv to the provided args. + This is used to run a command as if it were executed directly from the + command line. The plugin is expected to return an int. -_BUILTIN_PLUGIN = '<built-in>' + Raises: + KeyError if plugin is not registered. + """ + return self[name].run_with_argv(argv) + + def _should_register(self, plugin: Plugin) -> bool: + """Determines and logs if a plugin should be registered or not. + + Some errors are exceptions, others are not. + """ + if plugin.name in self._registry and plugin.source is None: + raise Error( + f'Attempted to register built-in plugin "{plugin.name}", but ' + 'a plugin with that name was previously registered ' + f'({self[plugin.name]})!') -def _valid_registration(name: str, module: str, function: str, - source: Union[Path, str]) -> bool: - """Determines if a plugin should be registered or not.""" - existing = _registry.get(name) + # Run the user-provided validation function, which raises exceptions + # if there are errors. + self._validate_plugin(plugin) - if existing is None: - return True + existing = self._registry.get(plugin.name) - if source == _BUILTIN_PLUGIN: - raise Error( - f'Attempted to register built-in plugin "{name}", but that ' - f'plugin was previously registered ({existing.source})!') + if existing is None: + return True - if existing.source == _BUILTIN_PLUGIN: - _LOG.debug('%s: Overriding built-in plugin "%s" with %s.%s', source, - name, module, function) - return True + if existing.source is None: + _LOG.debug('%s: Overriding built-in plugin "%s" with %s', + plugin.source_name, plugin.name, plugin.target_name) + return True - if source == _registry[name].source: - _LOG.warning( - '%s: "%s" is registered multiple times in this file! ' - 'Only the first registration takes effect', source, name) - else: - _LOG.debug( - '%s: The plugin "%s" was previously registered in %s; ' - 'ignoring registration as %s.%s', source, name, - _registry[name].source, module, function) + if plugin.source != existing.source: + _LOG.debug( + '%s: The plugin "%s" was previously registered in %s; ' + 'ignoring registration as %s', plugin.source_name, plugin.name, + self._registry[plugin.name].source, plugin.target_name) + elif plugin.source not in self._sources: + _LOG.warning( + '%s: "%s" is registered file multiple times in this file! ' + 'Only the first registration takes effect', plugin.source_name, + plugin.name) + + return False + + def register(self, name: str, target: Any) -> Optional[Plugin]: + """Registers an object as a plugin.""" + return self._register(Plugin(name, target, None)) + + def register_by_name(self, + name: str, + module_name: str, + member_name: str, + source: Path = None) -> Optional[Plugin]: + """Registers an object from its module and name as a plugin.""" + return self._register( + Plugin.from_name(name, module_name, member_name, source)) + + def _register(self, plugin: Plugin) -> Optional[Plugin]: + # Prohibit functions not from a plugins file from overriding others. + if not self._should_register(plugin): + return None - return False + self._registry[plugin.name] = plugin + _LOG.debug('%s: Registered plugin "%s" for %s', plugin.source_name, + plugin.name, plugin.target_name) + return plugin -def _register(name: str, - module: str, - function: str, - source: Union[Path, str] = _BUILTIN_PLUGIN) -> None: - """Registers a plugin from the specified source.""" + def register_file(self, path: Path) -> None: + """Registers plugins from a plugins file. - if not _valid_registration(name, module, function, source): - return + Any exceptions raised from parsing the file are caught and logged. + """ + with path.open() as contents: + for lineno, line in enumerate(contents, 1): + line = line.strip() + if not line or line.startswith('#'): + continue + + try: + name, module, function = line.split() + except ValueError as err: + self._errors[line.strip()].append(Error(err)) + _LOG.error( + '%s:%d: Failed to parse plugin entry "%s": ' + 'Expected 3 items (name, module, function), ' + 'got %d', path, lineno, line, len(line.split())) + continue + + try: + self.register_by_name(name, module, function, path) + except Error as err: + self._errors[name].append(err) + _LOG.error('%s: Failed to register plugin "%s": %s', path, + name, err) + + self._sources.add(path) + + def register_directory(self, + directory: Path, + file_name: str, + restrict_to: Path = None) -> None: + """Finds and registers plugins from plugins files in a directory. + + Args: + directory: The directory from which to start searching up. + file_name: The name of plugins files to look for. + restrict_to: If provided, do not search higher than this directory. + """ + for path in find_all_in_parents(file_name, directory): + if not path.is_file(): + continue - try: - _registry[name] = _Plugin(name, module, function, source) - _LOG.debug('%s: Registered plugin "%s" for %s.%s', source, name, - module, function) - except Error as err: - _errors[name].append(err) - _LOG.error('%s: Failed to register plugin "%s": %s', source, name, err) + if restrict_to is not None and restrict_to not in path.parents: + _LOG.debug( + "Skipping plugins file %s because it's outside of %s", + path, restrict_to) + continue + + _LOG.debug('Found plugins file %s', path) + self.register_file(path) + + def short_help(self) -> str: + """Returns a help string for the registered plugins.""" + width = max(len(name) + for name in self._registry) + 1 if self._registry else 1 + help_items = '\n'.join( + f' {name:{width}} {plugin.help()}' + for name, plugin in sorted(self._registry.items())) + return f'supported plugins:\n{help_items}' + + def detailed_help(self, plugins: Iterable[str] = ()) -> Iterator[str]: + """Yields lines of detailed information about commands.""" + if not plugins: + plugins = list(self._registry) + + yield '\ndetailed plugin information:' + + wrapper = TextWrapper(width=80, + initial_indent=' ', + subsequent_indent=' ' * 11) + + plugins = sorted(plugins) + for plugin in plugins: + yield f' [{plugin}]' + + try: + for line in self[plugin].details(full=len(plugins) == 1): + yield wrapper.fill(line) + except KeyError as err: + yield wrapper.fill(f'error {str(err)[1:-1]}') + + yield '' + + yield 'Plugins files:' + + if self._sources: + yield from (f' [{i}] {file}' + for i, file in enumerate(self._sources, 1)) + else: + yield ' (none found)' def find_in_parents(name: str, path: Path) -> Optional[Path]: @@ -199,7 +378,7 @@ def find_in_parents(name: str, path: Path) -> Optional[Path]: return path.joinpath(name) -def find_all_in_parents(name: str, path: Path) -> Iterable[Path]: +def find_all_in_parents(name: str, path: Path) -> Iterator[Path]: """Searches all parent directories of the path for files or directories.""" while True: @@ -209,93 +388,3 @@ def find_all_in_parents(name: str, path: Path) -> Iterable[Path]: yield result path = result.parent.parent - - -def _register_builtin_plugins(): - """Registers the commands that are included with pw by default.""" - - _register('doctor', 'pw_doctor.doctor', 'main') - _register('format', 'pw_presubmit.format_code', 'main') - _register('help', 'pw_cli.plugins', '_help_command') - _register('logdemo', 'pw_cli.log', 'main') - _register('module-check', 'pw_module.check', 'main') - _register('test', 'pw_unit_test.test_runner', 'main') - _register('watch', 'pw_watch.watch', 'main') - - -def register(directory: Path): - """Finds and registers command line plugins.""" - _register_builtin_plugins() - - # Find pw plugins files starting in the current and parent directories. - for path in find_all_in_parents(REGISTRY_FILE, directory): - if not path.is_file(): - continue - - root = Path(os.environ.get('PW_PROJECT_ROOT', '')).resolve() - if root not in path.parents: - _LOG.debug( - "Skipping plugins file %s because it's outside of " - 'PW_PROJECT_ROOT (%s)', path, root) - continue - - _LOG.debug('Found plugins file %s', path) - _sources.append(path) - - with path.open() as contents: - for lineno, line in enumerate(contents, 1): - line = line.strip() - if line and not line.startswith('#'): - try: - name, module, function = line.split() - _register(name, module, function, path) - except ValueError as err: - _errors[line.strip()].append(Error(err)) - _LOG.error( - '%s:%d: Failed to parse plugin entry "%s": ' - 'Expected 3 items (name, module, function), got %d', - path, lineno, line, len(line.split())) - - -def _help_text(plugins: Iterable[str] = ()) -> Iterable[str]: - """Yields detailed information about commands.""" - yield arguments.format_help() - - if not plugins: - plugins = list(_registry) - - yield '\ndetailed command information:' - - wrapper = TextWrapper(width=80, - initial_indent=' ', - subsequent_indent=' ' * 13) - - for plugin in sorted(plugins): - yield f' [{plugin}]' - - try: - for line in _get(plugin).details(): - yield wrapper.fill(line) - except Error as err: - yield wrapper.fill(f'error {err}') - - yield '' - - yield 'PW_PLUGINS files:' - - if _sources: - yield from (f' [{i}] {file}' for i, file in enumerate(_sources, 1)) - else: - yield ' (none found)' - - -def _help_command(): - """Display detailed information about pw commands.""" - parser = argparse.ArgumentParser(description=_help_command.__doc__) - parser.add_argument('plugins', - metavar='plugin', - nargs='*', - help='command for which to display detailed info') - - for line in _help_text(**vars(parser.parse_args())): - print(line, file=sys.stderr) diff --git a/pw_cli/py/pw_cli/pw_command_plugins.py b/pw_cli/py/pw_cli/pw_command_plugins.py new file mode 100644 index 000000000..a5c20df6f --- /dev/null +++ b/pw_cli/py/pw_cli/pw_command_plugins.py @@ -0,0 +1,71 @@ +# Copyright 2021 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""This module manages the global plugin registry for pw_cli.""" + +import argparse +import os +from pathlib import Path +import sys +from typing import Iterable + +from pw_cli import arguments, plugins + +_plugin_registry = plugins.Registry(validator=plugins.callable_with_no_args) +REGISTRY_FILE = 'PW_PLUGINS' + + +def _register_builtin_plugins(registry: plugins.Registry) -> None: + """Registers the commands that are included with pw by default.""" + + # Register these by name to avoid circular dependencies. + registry.register_by_name('doctor', 'pw_doctor.doctor', 'main') + registry.register_by_name('format', 'pw_presubmit.format_code', 'main') + registry.register_by_name('logdemo', 'pw_cli.log', 'main') + registry.register_by_name('module-check', 'pw_module.check', 'main') + registry.register_by_name('test', 'pw_unit_test.test_runner', 'main') + registry.register_by_name('watch', 'pw_watch.watch', 'main') + + registry.register('help', _help_command) + + +def _help_command(): + """Display detailed information about pw commands.""" + parser = argparse.ArgumentParser(description=_help_command.__doc__) + parser.add_argument('plugins', + metavar='plugin', + nargs='*', + help='command for which to display detailed info') + + print(arguments.format_help(_plugin_registry), file=sys.stderr) + + for line in _plugin_registry.detailed_help(**vars(parser.parse_args())): + print(line, file=sys.stderr) + + +def register(directory: Path) -> None: + _register_builtin_plugins(_plugin_registry) + _plugin_registry.register_directory( + directory, REGISTRY_FILE, Path(os.environ.get('PW_PROJECT_ROOT', ''))) + + +def errors() -> dict: + return _plugin_registry.errors() + + +def format_help() -> str: + return arguments.format_help(_plugin_registry) + + +def run(name: str, args: Iterable[str]) -> int: + return _plugin_registry.run_with_argv(name, args) diff --git a/pw_doctor/py/pw_doctor/doctor.py b/pw_doctor/py/pw_doctor/doctor.py index 16fb2200e..7594616b7 100755 --- a/pw_doctor/py/pw_doctor/doctor.py +++ b/pw_doctor/py/pw_doctor/doctor.py @@ -26,7 +26,7 @@ import sys import tempfile from typing import Callable, Iterable, List, Set -import pw_cli.plugins +import pw_cli.pw_command_plugins def call_stdout(*args, **kwargs): @@ -130,7 +130,7 @@ CHECKS: List[Callable] = [] @register_into(CHECKS) def pw_plugins(ctx: DoctorContext): - if pw_cli.plugins.errors(): + if pw_cli.pw_command_plugins.errors(): ctx.error('Not all pw plugins loaded successfully') |