aboutsummaryrefslogtreecommitdiff
path: root/pyee/twisted.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyee/twisted.py')
-rw-r--r--pyee/twisted.py93
1 files changed, 93 insertions, 0 deletions
diff --git a/pyee/twisted.py b/pyee/twisted.py
new file mode 100644
index 0000000..2b9d20b
--- /dev/null
+++ b/pyee/twisted.py
@@ -0,0 +1,93 @@
+# -*- coding: utf-8 -*-
+
+from typing import Any, Callable, Dict, Tuple
+
+from twisted.internet.defer import Deferred, ensureDeferred
+from twisted.python.failure import Failure
+
+from pyee.base import EventEmitter, PyeeException
+
+try:
+ from asyncio import iscoroutine
+except ImportError:
+ iscoroutine = None
+
+
+__all__ = ["TwistedEventEmitter"]
+
+
+class TwistedEventEmitter(EventEmitter):
+ """An event emitter class which can run twisted coroutines and handle
+ returned Deferreds, in addition to synchronous blocking functions. For
+ example::
+
+ @ee.on('event')
+ @inlineCallbacks
+ def async_handler(*args, **kwargs):
+ yield returns_a_deferred()
+
+ or::
+
+ @ee.on('event')
+ async def async_handler(*args, **kwargs):
+ await returns_a_deferred()
+
+
+ When async handlers fail, Failures are first emitted on the ``failure``
+ event. If there are no ``failure`` handlers, the Failure's associated
+ exception is then emitted on the ``error`` event. If there are no ``error``
+ handlers, the exception is raised. For consistency, when handlers raise
+ errors synchronously, they're captured, wrapped in a Failure and treated
+ as an async failure. This is unlike the behavior of EventEmitter,
+ which have no special error handling.
+
+ For twisted coroutine event handlers, calling emit is non-blocking.
+ In other words, you do not have to await any results from emit, and the
+ coroutine is scheduled in a fire-and-forget fashion.
+
+ Similar behavior occurs for "sync" functions which return Deferreds.
+ """
+
+ def __init__(self):
+ super(TwistedEventEmitter, self).__init__()
+
+ def _emit_run(
+ self,
+ f: Callable,
+ args: Tuple[Any, ...],
+ kwargs: Dict[str, Any],
+ ) -> None:
+ d = None
+ try:
+ result = f(*args, **kwargs)
+ except Exception:
+ self.emit("failure", Failure())
+ else:
+ if iscoroutine and iscoroutine(result):
+ d: Deferred[Any] = ensureDeferred(result)
+ elif isinstance(result, Deferred):
+ d = result
+ else:
+ return
+
+ def errback(failure: Failure) -> None:
+ if failure:
+ self.emit("failure", failure)
+
+ d.addErrback(errback)
+
+ def _emit_handle_potential_error(self, event: str, error: Any) -> None:
+ if event == "failure":
+ if isinstance(error, Failure):
+ try:
+ error.raiseException()
+ except Exception as exc:
+ self.emit("error", exc)
+ elif isinstance(error, Exception):
+ self.emit("error", error)
+ else:
+ self.emit("error", PyeeException(f"Unexpected failure object: {error}"))
+ else:
+ (super(TwistedEventEmitter, self))._emit_handle_potential_error(
+ event, error
+ )