aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWyatt Hepler <hepler@google.com>2023-01-27 21:00:15 +0000
committerCQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>2023-01-27 21:00:15 +0000
commit5b53103e1076a700be79d7ed700b51f2077ccb0d (patch)
treecc027bccdb5019d8509aa424f2fda9b79a466006
parentaaf7aa543ce96ded8b578fc337a4048ac34750ab (diff)
downloadpigweed-5b53103e1076a700be79d7ed700b51f2077ccb0d.tar.gz
pw_transfer: Set RPC streams directly
Rather than passing the RPC stream to the transfer thread to be moved, set it directly. pw_rpc call objects are synchronized by pw_rpc, so it is safe to move them between threads. With upcoming pw_rpc changes, a thread will wait until an RPC call's callbacks finish before moving the call object. This could cause deadlocks in pw_transfer if a packet arrives immediately after the stream starts, before the transfer thread gets a chance to move the call object to its final location. The RPC thread would wait for the next event to be available in the callback, while the transfer thread would wait to move the new call object out of the next event until the callback completed, resulting in deadlock. This change avoids this issue without needing to drop any packets. Change-Id: I24b088e36b7712ceda042cdbe80e0b05dec480b7 Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/126924 Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com> Reviewed-by: Alexei Frolov <frolv@google.com> Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
-rw-r--r--pw_transfer/context.cc1
-rw-r--r--pw_transfer/public/pw_transfer/internal/event.h4
-rw-r--r--pw_transfer/public/pw_transfer/transfer_thread.h16
-rw-r--r--pw_transfer/transfer_thread.cc45
4 files changed, 7 insertions, 59 deletions
diff --git a/pw_transfer/context.cc b/pw_transfer/context.cc
index 2e2d77b08..22686398c 100644
--- a/pw_transfer/context.cc
+++ b/pw_transfer/context.cc
@@ -75,7 +75,6 @@ void Context::HandleEvent(const Event& event) {
return;
case EventType::kSendStatusChunk:
- case EventType::kSetTransferStream:
case EventType::kAddTransferHandler:
case EventType::kRemoveTransferHandler:
case EventType::kTerminate:
diff --git a/pw_transfer/public/pw_transfer/internal/event.h b/pw_transfer/public/pw_transfer/internal/event.h
index 61fc19521..79a7853d1 100644
--- a/pw_transfer/public/pw_transfer/internal/event.h
+++ b/pw_transfer/public/pw_transfer/internal/event.h
@@ -55,9 +55,6 @@ enum class EventType {
// transfer context's completion handler; it is for out-of-band termination.
kSendStatusChunk,
- // Updates one of the transfer thread's RPC streams.
- kSetTransferStream,
-
// Manages the list of transfer handlers for a transfer service.
kAddTransferHandler,
kRemoveTransferHandler,
@@ -126,7 +123,6 @@ struct Event {
ChunkEvent chunk;
EndTransferEvent end_transfer;
SendStatusChunkEvent send_status_chunk;
- TransferStream set_transfer_stream;
Handler* add_transfer_handler;
Handler* remove_transfer_handler;
};
diff --git a/pw_transfer/public/pw_transfer/transfer_thread.h b/pw_transfer/public/pw_transfer/transfer_thread.h
index 6663c7d73..5904c9506 100644
--- a/pw_transfer/public/pw_transfer/transfer_thread.h
+++ b/pw_transfer/public/pw_transfer/transfer_thread.h
@@ -114,20 +114,23 @@ class TransferThread : public thread::ThreadCore {
EventType::kServerEndTransfer, session_id, status, send_status_chunk);
}
+ // Move the read/write streams on this thread instead of the transfer thread.
+ // RPC call objects are synchronized by pw_rpc, so this move will be atomic
+ // with respect to the transfer thread.
void SetClientReadStream(rpc::RawClientReaderWriter& read_stream) {
- SetClientStream(TransferStream::kClientRead, read_stream);
+ client_read_stream_ = std::move(read_stream);
}
void SetClientWriteStream(rpc::RawClientReaderWriter& write_stream) {
- SetClientStream(TransferStream::kClientWrite, write_stream);
+ client_write_stream_ = std::move(write_stream);
}
void SetServerReadStream(rpc::RawServerReaderWriter& read_stream) {
- SetServerStream(TransferStream::kServerRead, read_stream);
+ server_read_stream_ = std::move(read_stream);
}
void SetServerWriteStream(rpc::RawServerReaderWriter& write_stream) {
- SetServerStream(TransferStream::kServerWrite, write_stream);
+ server_write_stream_ = std::move(write_stream);
}
void AddTransferHandler(Handler& handler) {
@@ -260,9 +263,6 @@ class TransferThread : public thread::ThreadCore {
Status status,
bool send_status_chunk);
- void SetClientStream(TransferStream type, rpc::RawClientReaderWriter& stream);
- void SetServerStream(TransferStream type, rpc::RawServerReaderWriter& stream);
-
void TransferHandlerEvent(EventType type, Handler& handler);
void HandleEvent(const Event& event);
@@ -275,8 +275,6 @@ class TransferThread : public thread::ThreadCore {
Event next_event_;
Function<void(Status)> staged_on_completion_;
- rpc::RawClientReaderWriter staged_client_stream_;
- rpc::RawServerReaderWriter staged_server_stream_;
rpc::RawClientReaderWriter client_read_stream_;
rpc::RawClientReaderWriter client_write_stream_;
diff --git a/pw_transfer/transfer_thread.cc b/pw_transfer/transfer_thread.cc
index 4f115ab9a..c0d03efa1 100644
--- a/pw_transfer/transfer_thread.cc
+++ b/pw_transfer/transfer_thread.cc
@@ -221,30 +221,6 @@ void TransferThread::EndTransfer(EventType type,
event_notification_.release();
}
-void TransferThread::SetClientStream(TransferStream type,
- rpc::RawClientReaderWriter& stream) {
- // Block until the last event has been processed.
- next_event_ownership_.acquire();
-
- next_event_.type = EventType::kSetTransferStream;
- next_event_.set_transfer_stream = type;
- staged_client_stream_ = std::move(stream);
-
- event_notification_.release();
-}
-
-void TransferThread::SetServerStream(TransferStream type,
- rpc::RawServerReaderWriter& stream) {
- // Block until the last event has been processed.
- next_event_ownership_.acquire();
-
- next_event_.type = EventType::kSetTransferStream;
- next_event_.set_transfer_stream = type;
- staged_server_stream_ = std::move(stream);
-
- event_notification_.release();
-}
-
void TransferThread::TransferHandlerEvent(EventType type, Handler& handler) {
// Block until the last event has been processed.
next_event_ownership_.acquire();
@@ -299,26 +275,6 @@ void TransferThread::HandleEvent(const internal::Event& event) {
SendStatusChunk(event.send_status_chunk);
break;
- case EventType::kSetTransferStream:
- switch (event.set_transfer_stream) {
- case TransferStream::kClientRead:
- client_read_stream_ = std::move(staged_client_stream_);
- break;
-
- case TransferStream::kClientWrite:
- client_write_stream_ = std::move(staged_client_stream_);
- break;
-
- case TransferStream::kServerRead:
- server_read_stream_ = std::move(staged_server_stream_);
- break;
-
- case TransferStream::kServerWrite:
- server_write_stream_ = std::move(staged_server_stream_);
- break;
- }
- return;
-
case EventType::kAddTransferHandler:
handlers_.push_front(*event.add_transfer_handler);
return;
@@ -423,7 +379,6 @@ Context* TransferThread::FindContextForEvent(
event.end_transfer.session_id);
case EventType::kSendStatusChunk:
- case EventType::kSetTransferStream:
case EventType::kAddTransferHandler:
case EventType::kRemoveTransferHandler:
case EventType::kTerminate: