aboutsummaryrefslogtreecommitdiff
path: root/src/iter/extend.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/iter/extend.rs')
-rw-r--r--src/iter/extend.rs398
1 files changed, 313 insertions, 85 deletions
diff --git a/src/iter/extend.rs b/src/iter/extend.rs
index fb89249..1769d47 100644
--- a/src/iter/extend.rs
+++ b/src/iter/extend.rs
@@ -1,4 +1,5 @@
use super::noop::NoopConsumer;
+use super::plumbing::{Consumer, Folder, Reducer, UnindexedConsumer};
use super::{IntoParallelIterator, ParallelExtend, ParallelIterator};
use std::borrow::Cow;
@@ -9,55 +10,91 @@ use std::hash::{BuildHasher, Hash};
/// Performs a generic `par_extend` by collecting to a `LinkedList<Vec<_>>` in
/// parallel, then extending the collection sequentially.
-fn extend<C, I, F>(collection: &mut C, par_iter: I, reserve: F)
-where
- I: IntoParallelIterator,
- F: FnOnce(&mut C, &LinkedList<Vec<I::Item>>),
- C: Extend<I::Item>,
-{
- let list = collect(par_iter);
- reserve(collection, &list);
- for vec in list {
- collection.extend(vec);
- }
+macro_rules! extend {
+ ($self:ident, $par_iter:ident, $extend:ident) => {
+ $extend(
+ $self,
+ $par_iter.into_par_iter().drive_unindexed(ListVecConsumer),
+ );
+ };
}
-pub(super) fn collect<I>(par_iter: I) -> LinkedList<Vec<I::Item>>
-where
- I: IntoParallelIterator,
-{
- par_iter
- .into_par_iter()
- .fold(Vec::new, vec_push)
- .map(as_list)
- .reduce(LinkedList::new, list_append)
+/// Computes the total length of a `LinkedList<Vec<_>>`.
+fn len<T>(list: &LinkedList<Vec<T>>) -> usize {
+ list.iter().map(Vec::len).sum()
}
-fn vec_push<T>(mut vec: Vec<T>, elem: T) -> Vec<T> {
- vec.push(elem);
- vec
-}
+struct ListVecConsumer;
-fn as_list<T>(item: T) -> LinkedList<T> {
- let mut list = LinkedList::new();
- list.push_back(item);
- list
+struct ListVecFolder<T> {
+ vec: Vec<T>,
}
-fn list_append<T>(mut list1: LinkedList<T>, mut list2: LinkedList<T>) -> LinkedList<T> {
- list1.append(&mut list2);
- list1
+impl<T: Send> Consumer<T> for ListVecConsumer {
+ type Folder = ListVecFolder<T>;
+ type Reducer = ListReducer;
+ type Result = LinkedList<Vec<T>>;
+
+ fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
+ (Self, Self, ListReducer)
+ }
+
+ fn into_folder(self) -> Self::Folder {
+ ListVecFolder { vec: Vec::new() }
+ }
+
+ fn full(&self) -> bool {
+ false
+ }
}
-/// Computes the total length of a `LinkedList<Vec<_>>`.
-pub(super) fn len<T>(list: &LinkedList<Vec<T>>) -> usize {
- list.iter().map(Vec::len).sum()
+impl<T: Send> UnindexedConsumer<T> for ListVecConsumer {
+ fn split_off_left(&self) -> Self {
+ Self
+ }
+
+ fn to_reducer(&self) -> Self::Reducer {
+ ListReducer
+ }
}
-fn no_reserve<C, T>(_: &mut C, _: &LinkedList<Vec<T>>) {}
+impl<T> Folder<T> for ListVecFolder<T> {
+ type Result = LinkedList<Vec<T>>;
+
+ fn consume(mut self, item: T) -> Self {
+ self.vec.push(item);
+ self
+ }
+
+ fn consume_iter<I>(mut self, iter: I) -> Self
+ where
+ I: IntoIterator<Item = T>,
+ {
+ self.vec.extend(iter);
+ self
+ }
-fn heap_reserve<T: Ord, U>(heap: &mut BinaryHeap<T>, list: &LinkedList<Vec<U>>) {
- heap.reserve(len(list));
+ fn complete(self) -> Self::Result {
+ let mut list = LinkedList::new();
+ if !self.vec.is_empty() {
+ list.push_back(self.vec);
+ }
+ list
+ }
+
+ fn full(&self) -> bool {
+ false
+ }
+}
+
+fn heap_extend<T, Item>(heap: &mut BinaryHeap<T>, list: LinkedList<Vec<Item>>)
+where
+ BinaryHeap<T>: Extend<Item>,
+{
+ heap.reserve(len(&list));
+ for vec in list {
+ heap.extend(vec);
+ }
}
/// Extends a binary heap with items from a parallel iterator.
@@ -69,7 +106,7 @@ where
where
I: IntoParallelIterator<Item = T>,
{
- extend(self, par_iter, heap_reserve);
+ extend!(self, par_iter, heap_extend);
}
}
@@ -82,7 +119,16 @@ where
where
I: IntoParallelIterator<Item = &'a T>,
{
- extend(self, par_iter, heap_reserve);
+ extend!(self, par_iter, heap_extend);
+ }
+}
+
+fn btree_map_extend<K, V, Item>(map: &mut BTreeMap<K, V>, list: LinkedList<Vec<Item>>)
+where
+ BTreeMap<K, V>: Extend<Item>,
+{
+ for vec in list {
+ map.extend(vec);
}
}
@@ -96,7 +142,7 @@ where
where
I: IntoParallelIterator<Item = (K, V)>,
{
- extend(self, par_iter, no_reserve);
+ extend!(self, par_iter, btree_map_extend);
}
}
@@ -110,7 +156,16 @@ where
where
I: IntoParallelIterator<Item = (&'a K, &'a V)>,
{
- extend(self, par_iter, no_reserve);
+ extend!(self, par_iter, btree_map_extend);
+ }
+}
+
+fn btree_set_extend<T, Item>(set: &mut BTreeSet<T>, list: LinkedList<Vec<Item>>)
+where
+ BTreeSet<T>: Extend<Item>,
+{
+ for vec in list {
+ set.extend(vec);
}
}
@@ -123,7 +178,7 @@ where
where
I: IntoParallelIterator<Item = T>,
{
- extend(self, par_iter, no_reserve);
+ extend!(self, par_iter, btree_set_extend);
}
}
@@ -136,16 +191,20 @@ where
where
I: IntoParallelIterator<Item = &'a T>,
{
- extend(self, par_iter, no_reserve);
+ extend!(self, par_iter, btree_set_extend);
}
}
-fn map_reserve<K, V, S, U>(map: &mut HashMap<K, V, S>, list: &LinkedList<Vec<U>>)
+fn hash_map_extend<K, V, S, Item>(map: &mut HashMap<K, V, S>, list: LinkedList<Vec<Item>>)
where
+ HashMap<K, V, S>: Extend<Item>,
K: Eq + Hash,
S: BuildHasher,
{
- map.reserve(len(list));
+ map.reserve(len(&list));
+ for vec in list {
+ map.extend(vec);
+ }
}
/// Extends a hash map with items from a parallel iterator.
@@ -160,7 +219,7 @@ where
I: IntoParallelIterator<Item = (K, V)>,
{
// See the map_collect benchmarks in rayon-demo for different strategies.
- extend(self, par_iter, map_reserve);
+ extend!(self, par_iter, hash_map_extend);
}
}
@@ -175,16 +234,20 @@ where
where
I: IntoParallelIterator<Item = (&'a K, &'a V)>,
{
- extend(self, par_iter, map_reserve);
+ extend!(self, par_iter, hash_map_extend);
}
}
-fn set_reserve<T, S, U>(set: &mut HashSet<T, S>, list: &LinkedList<Vec<U>>)
+fn hash_set_extend<T, S, Item>(set: &mut HashSet<T, S>, list: LinkedList<Vec<Item>>)
where
+ HashSet<T, S>: Extend<Item>,
T: Eq + Hash,
S: BuildHasher,
{
- set.reserve(len(list));
+ set.reserve(len(&list));
+ for vec in list {
+ set.extend(vec);
+ }
}
/// Extends a hash set with items from a parallel iterator.
@@ -197,7 +260,7 @@ where
where
I: IntoParallelIterator<Item = T>,
{
- extend(self, par_iter, set_reserve);
+ extend!(self, par_iter, hash_set_extend);
}
}
@@ -211,15 +274,10 @@ where
where
I: IntoParallelIterator<Item = &'a T>,
{
- extend(self, par_iter, set_reserve);
+ extend!(self, par_iter, hash_set_extend);
}
}
-fn list_push_back<T>(mut list: LinkedList<T>, elem: T) -> LinkedList<T> {
- list.push_back(elem);
- list
-}
-
/// Extends a linked list with items from a parallel iterator.
impl<T> ParallelExtend<T> for LinkedList<T>
where
@@ -229,10 +287,7 @@ where
where
I: IntoParallelIterator<Item = T>,
{
- let mut list = par_iter
- .into_par_iter()
- .fold(LinkedList::new, list_push_back)
- .reduce(LinkedList::new, list_append);
+ let mut list = par_iter.into_par_iter().drive_unindexed(ListConsumer);
self.append(&mut list);
}
}
@@ -246,13 +301,83 @@ where
where
I: IntoParallelIterator<Item = &'a T>,
{
- self.par_extend(par_iter.into_par_iter().cloned())
+ self.par_extend(par_iter.into_par_iter().copied())
+ }
+}
+
+struct ListConsumer;
+
+struct ListFolder<T> {
+ list: LinkedList<T>,
+}
+
+struct ListReducer;
+
+impl<T: Send> Consumer<T> for ListConsumer {
+ type Folder = ListFolder<T>;
+ type Reducer = ListReducer;
+ type Result = LinkedList<T>;
+
+ fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
+ (Self, Self, ListReducer)
+ }
+
+ fn into_folder(self) -> Self::Folder {
+ ListFolder {
+ list: LinkedList::new(),
+ }
+ }
+
+ fn full(&self) -> bool {
+ false
+ }
+}
+
+impl<T: Send> UnindexedConsumer<T> for ListConsumer {
+ fn split_off_left(&self) -> Self {
+ Self
+ }
+
+ fn to_reducer(&self) -> Self::Reducer {
+ ListReducer
+ }
+}
+
+impl<T> Folder<T> for ListFolder<T> {
+ type Result = LinkedList<T>;
+
+ fn consume(mut self, item: T) -> Self {
+ self.list.push_back(item);
+ self
+ }
+
+ fn consume_iter<I>(mut self, iter: I) -> Self
+ where
+ I: IntoIterator<Item = T>,
+ {
+ self.list.extend(iter);
+ self
+ }
+
+ fn complete(self) -> Self::Result {
+ self.list
+ }
+
+ fn full(&self) -> bool {
+ false
+ }
+}
+
+impl<T> Reducer<LinkedList<T>> for ListReducer {
+ fn reduce(self, mut left: LinkedList<T>, mut right: LinkedList<T>) -> LinkedList<T> {
+ left.append(&mut right);
+ left
}
}
-fn string_push(mut string: String, ch: char) -> String {
- string.push(ch);
- string
+fn flat_string_extend(string: &mut String, list: LinkedList<String>) {
+ string.reserve(list.iter().map(String::len).sum());
+ string.extend(list);
}
/// Extends a string with characters from a parallel iterator.
@@ -263,14 +388,8 @@ impl ParallelExtend<char> for String {
{
// This is like `extend`, but `Vec<char>` is less efficient to deal
// with than `String`, so instead collect to `LinkedList<String>`.
- let list: LinkedList<_> = par_iter
- .into_par_iter()
- .fold(String::new, string_push)
- .map(as_list)
- .reduce(LinkedList::new, list_append);
-
- self.reserve(list.iter().map(String::len).sum());
- self.extend(list)
+ let list = par_iter.into_par_iter().drive_unindexed(ListStringConsumer);
+ flat_string_extend(self, list);
}
}
@@ -280,13 +399,85 @@ impl<'a> ParallelExtend<&'a char> for String {
where
I: IntoParallelIterator<Item = &'a char>,
{
- self.par_extend(par_iter.into_par_iter().cloned())
+ self.par_extend(par_iter.into_par_iter().copied())
}
}
-fn string_reserve<T: AsRef<str>>(string: &mut String, list: &LinkedList<Vec<T>>) {
- let len = list.iter().flatten().map(T::as_ref).map(str::len).sum();
+struct ListStringConsumer;
+
+struct ListStringFolder {
+ string: String,
+}
+
+impl Consumer<char> for ListStringConsumer {
+ type Folder = ListStringFolder;
+ type Reducer = ListReducer;
+ type Result = LinkedList<String>;
+
+ fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
+ (Self, Self, ListReducer)
+ }
+
+ fn into_folder(self) -> Self::Folder {
+ ListStringFolder {
+ string: String::new(),
+ }
+ }
+
+ fn full(&self) -> bool {
+ false
+ }
+}
+
+impl UnindexedConsumer<char> for ListStringConsumer {
+ fn split_off_left(&self) -> Self {
+ Self
+ }
+
+ fn to_reducer(&self) -> Self::Reducer {
+ ListReducer
+ }
+}
+
+impl Folder<char> for ListStringFolder {
+ type Result = LinkedList<String>;
+
+ fn consume(mut self, item: char) -> Self {
+ self.string.push(item);
+ self
+ }
+
+ fn consume_iter<I>(mut self, iter: I) -> Self
+ where
+ I: IntoIterator<Item = char>,
+ {
+ self.string.extend(iter);
+ self
+ }
+
+ fn complete(self) -> Self::Result {
+ let mut list = LinkedList::new();
+ if !self.string.is_empty() {
+ list.push_back(self.string);
+ }
+ list
+ }
+
+ fn full(&self) -> bool {
+ false
+ }
+}
+
+fn string_extend<Item>(string: &mut String, list: LinkedList<Vec<Item>>)
+where
+ String: Extend<Item>,
+ Item: AsRef<str>,
+{
+ let len = list.iter().flatten().map(Item::as_ref).map(str::len).sum();
string.reserve(len);
+ for vec in list {
+ string.extend(vec);
+ }
}
/// Extends a string with string slices from a parallel iterator.
@@ -295,7 +486,7 @@ impl<'a> ParallelExtend<&'a str> for String {
where
I: IntoParallelIterator<Item = &'a str>,
{
- extend(self, par_iter, string_reserve);
+ extend!(self, par_iter, string_extend);
}
}
@@ -305,7 +496,7 @@ impl ParallelExtend<String> for String {
where
I: IntoParallelIterator<Item = String>,
{
- extend(self, par_iter, string_reserve);
+ extend!(self, par_iter, string_extend);
}
}
@@ -315,12 +506,18 @@ impl<'a> ParallelExtend<Cow<'a, str>> for String {
where
I: IntoParallelIterator<Item = Cow<'a, str>>,
{
- extend(self, par_iter, string_reserve);
+ extend!(self, par_iter, string_extend);
}
}
-fn deque_reserve<T, U>(deque: &mut VecDeque<T>, list: &LinkedList<Vec<U>>) {
- deque.reserve(len(list));
+fn deque_extend<T, Item>(deque: &mut VecDeque<T>, list: LinkedList<Vec<Item>>)
+where
+ VecDeque<T>: Extend<Item>,
+{
+ deque.reserve(len(&list));
+ for vec in list {
+ deque.extend(vec);
+ }
}
/// Extends a deque with items from a parallel iterator.
@@ -332,7 +529,7 @@ where
where
I: IntoParallelIterator<Item = T>,
{
- extend(self, par_iter, deque_reserve);
+ extend!(self, par_iter, deque_extend);
}
}
@@ -345,12 +542,43 @@ where
where
I: IntoParallelIterator<Item = &'a T>,
{
- extend(self, par_iter, deque_reserve);
+ extend!(self, par_iter, deque_extend);
}
}
-// See the `collect` module for the `Vec<T>` implementation.
-// impl<T> ParallelExtend<T> for Vec<T>
+fn vec_append<T>(vec: &mut Vec<T>, list: LinkedList<Vec<T>>) {
+ vec.reserve(len(&list));
+ for mut other in list {
+ vec.append(&mut other);
+ }
+}
+
+/// Extends a vector with items from a parallel iterator.
+impl<T> ParallelExtend<T> for Vec<T>
+where
+ T: Send,
+{
+ fn par_extend<I>(&mut self, par_iter: I)
+ where
+ I: IntoParallelIterator<Item = T>,
+ {
+ // See the vec_collect benchmarks in rayon-demo for different strategies.
+ let par_iter = par_iter.into_par_iter();
+ match par_iter.opt_len() {
+ Some(len) => {
+ // When Rust gets specialization, we can get here for indexed iterators
+ // without relying on `opt_len`. Until then, `special_extend()` fakes
+ // an unindexed mode on the promise that `opt_len()` is accurate.
+ super::collect::special_extend(par_iter, len, self);
+ }
+ None => {
+ // This works like `extend`, but `Vec::append` is more efficient.
+ let list = par_iter.drive_unindexed(ListVecConsumer);
+ vec_append(self, list);
+ }
+ }
+ }
+}
/// Extends a vector with copied items from a parallel iterator.
impl<'a, T> ParallelExtend<&'a T> for Vec<T>
@@ -361,7 +589,7 @@ where
where
I: IntoParallelIterator<Item = &'a T>,
{
- self.par_extend(par_iter.into_par_iter().cloned())
+ self.par_extend(par_iter.into_par_iter().copied())
}
}