aboutsummaryrefslogtreecommitdiff
path: root/src/stream/stream/split.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream/stream/split.rs')
-rw-r--r--src/stream/stream/split.rs80
1 files changed, 80 insertions, 0 deletions
diff --git a/src/stream/stream/split.rs b/src/stream/stream/split.rs
index e2034e0..1a7fdcb 100644
--- a/src/stream/stream/split.rs
+++ b/src/stream/stream/split.rs
@@ -15,6 +15,13 @@ pub struct SplitStream<S>(BiLock<S>);
impl<S> Unpin for SplitStream<S> {}
+impl<S> SplitStream<S> {
+ /// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`.
+ pub fn is_pair_of<Item>(&self, other: &SplitSink<S, Item>) -> bool {
+ other.is_pair_of(&self)
+ }
+}
+
impl<S: Unpin> SplitStream<S> {
/// Attempts to put the two "halves" of a split `Stream + Sink` back
/// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
@@ -60,6 +67,13 @@ impl<S: Sink<Item> + Unpin, Item> SplitSink<S, Item> {
}
}
+impl<S, Item> SplitSink<S, Item> {
+ /// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`.
+ pub fn is_pair_of(&self, other: &SplitStream<S>) -> bool {
+ self.lock.is_pair_of(&other.0)
+ }
+}
+
impl<S: Sink<Item>, Item> SplitSink<S, Item> {
fn poll_flush_slot(
mut inner: Pin<&mut S>,
@@ -142,3 +156,69 @@ impl<T, Item> fmt::Display for ReuniteError<T, Item> {
#[cfg(feature = "std")]
impl<T: core::any::Any, Item> std::error::Error for ReuniteError<T, Item> {}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::{sink::Sink, stream::StreamExt};
+ use core::marker::PhantomData;
+
+ struct NopStream<Item> {
+ phantom: PhantomData<Item>,
+ }
+
+ impl<Item> Stream for NopStream<Item> {
+ type Item = Item;
+
+ fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ todo!()
+ }
+ }
+
+ impl<Item> Sink<Item> for NopStream<Item> {
+ type Error = ();
+
+ fn poll_ready(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ todo!()
+ }
+
+ fn start_send(self: Pin<&mut Self>, _item: Item) -> Result<(), Self::Error> {
+ todo!()
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ todo!()
+ }
+
+ fn poll_close(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ todo!()
+ }
+ }
+
+ #[test]
+ fn test_pairing() {
+ let s1 = NopStream::<()> { phantom: PhantomData };
+ let (sink1, stream1) = s1.split();
+ assert!(sink1.is_pair_of(&stream1));
+ assert!(stream1.is_pair_of(&sink1));
+
+ let s2 = NopStream::<()> { phantom: PhantomData };
+ let (sink2, stream2) = s2.split();
+ assert!(sink2.is_pair_of(&stream2));
+ assert!(stream2.is_pair_of(&sink2));
+
+ assert!(!sink1.is_pair_of(&stream2));
+ assert!(!stream1.is_pair_of(&sink2));
+ assert!(!sink2.is_pair_of(&stream1));
+ assert!(!stream2.is_pair_of(&sink1));
+ }
+}