aboutsummaryrefslogtreecommitdiff
path: root/src/iter/for_each.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/iter/for_each.rs')
-rw-r--r--src/iter/for_each.rs77
1 files changed, 77 insertions, 0 deletions
diff --git a/src/iter/for_each.rs b/src/iter/for_each.rs
new file mode 100644
index 0000000..3b77beb
--- /dev/null
+++ b/src/iter/for_each.rs
@@ -0,0 +1,77 @@
+use super::noop::*;
+use super::plumbing::*;
+use super::ParallelIterator;
+
+pub(super) fn for_each<I, F, T>(pi: I, op: &F)
+where
+ I: ParallelIterator<Item = T>,
+ F: Fn(T) + Sync,
+ T: Send,
+{
+ let consumer = ForEachConsumer { op };
+ pi.drive_unindexed(consumer)
+}
+
+struct ForEachConsumer<'f, F> {
+ op: &'f F,
+}
+
+impl<'f, F, T> Consumer<T> for ForEachConsumer<'f, F>
+where
+ F: Fn(T) + Sync,
+{
+ type Folder = ForEachConsumer<'f, F>;
+ type Reducer = NoopReducer;
+ type Result = ();
+
+ fn split_at(self, _index: usize) -> (Self, Self, NoopReducer) {
+ (self.split_off_left(), self, NoopReducer)
+ }
+
+ fn into_folder(self) -> Self {
+ self
+ }
+
+ fn full(&self) -> bool {
+ false
+ }
+}
+
+impl<'f, F, T> Folder<T> for ForEachConsumer<'f, F>
+where
+ F: Fn(T) + Sync,
+{
+ type Result = ();
+
+ fn consume(self, item: T) -> Self {
+ (self.op)(item);
+ self
+ }
+
+ fn consume_iter<I>(self, iter: I) -> Self
+ where
+ I: IntoIterator<Item = T>,
+ {
+ iter.into_iter().for_each(self.op);
+ self
+ }
+
+ fn complete(self) {}
+
+ fn full(&self) -> bool {
+ false
+ }
+}
+
+impl<'f, F, T> UnindexedConsumer<T> for ForEachConsumer<'f, F>
+where
+ F: Fn(T) + Sync,
+{
+ fn split_off_left(&self) -> Self {
+ ForEachConsumer { op: self.op }
+ }
+
+ fn to_reducer(&self) -> NoopReducer {
+ NoopReducer
+ }
+}