aboutsummaryrefslogtreecommitdiff
path: root/src/stream/stream/count.rs
diff options
context:
space:
mode:
authorDavid LeGare <legare@google.com>2022-03-04 02:06:33 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2022-03-04 02:06:33 +0000
commite2f2e216a919a0c300a9b7942233344ed065925f (patch)
tree7b826874213015b98b9e0383a89c88be66b64b11 /src/stream/stream/count.rs
parentd4344fa6b0ff2c5a8487fa8f1ed3904ad46cd82a (diff)
parentb9180251b318c57c5c635cff669a6ea82d56e516 (diff)
downloadfutures-util-e2f2e216a919a0c300a9b7942233344ed065925f.tar.gz
Update futures-util to 0.3.21 am: 737dc97288 am: b9180251b3
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures-util/+/2004175 Change-Id: I30785daabcfc58d3479c32b9aff9cc4130c3bfe0
Diffstat (limited to 'src/stream/stream/count.rs')
-rw-r--r--src/stream/stream/count.rs53
1 files changed, 53 insertions, 0 deletions
diff --git a/src/stream/stream/count.rs b/src/stream/stream/count.rs
new file mode 100644
index 0000000..513cab7
--- /dev/null
+++ b/src/stream/stream/count.rs
@@ -0,0 +1,53 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream};
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`count`](super::StreamExt::count) method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Count<St> {
+ #[pin]
+ stream: St,
+ count: usize
+ }
+}
+
+impl<St> fmt::Debug for Count<St>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Count").field("stream", &self.stream).field("count", &self.count).finish()
+ }
+}
+
+impl<St: Stream> Count<St> {
+ pub(super) fn new(stream: St) -> Self {
+ Self { stream, count: 0 }
+ }
+}
+
+impl<St: FusedStream> FusedFuture for Count<St> {
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated()
+ }
+}
+
+impl<St: Stream> Future for Count<St> {
+ type Output = usize;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+
+ Poll::Ready(loop {
+ match ready!(this.stream.as_mut().poll_next(cx)) {
+ Some(_) => *this.count += 1,
+ None => break *this.count,
+ }
+ })
+ }
+}