diff options
Diffstat (limited to 'pw_transfer/public/pw_transfer/internal/context.h')
-rw-r--r-- | pw_transfer/public/pw_transfer/internal/context.h | 358 |
1 files changed, 175 insertions, 183 deletions
diff --git a/pw_transfer/public/pw_transfer/internal/context.h b/pw_transfer/public/pw_transfer/internal/context.h index 5039325c4..f825419dd 100644 --- a/pw_transfer/public/pw_transfer/internal/context.h +++ b/pw_transfer/public/pw_transfer/internal/context.h @@ -1,4 +1,4 @@ -// Copyright 2021 The Pigweed Authors +// Copyright 2022 The Pigweed Authors // // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy of @@ -16,56 +16,59 @@ #include <cinttypes> #include <cstddef> #include <limits> +#include <optional> #include "pw_assert/assert.h" -#include "pw_chrono/system_timer.h" +#include "pw_chrono/system_clock.h" #include "pw_rpc/writer.h" #include "pw_status/status.h" #include "pw_stream/stream.h" -#include "pw_sync/interrupt_spin_lock.h" -#include "pw_sync/lock_annotations.h" #include "pw_transfer/internal/chunk.h" -#include "pw_transfer/internal/chunk_data_buffer.h" -#include "pw_transfer/internal/config.h" -#include "pw_work_queue/work_queue.h" +#include "pw_transfer/internal/event.h" +#include "pw_transfer/rate_estimate.h" namespace pw::transfer::internal { +class TransferThread; + class TransferParameters { public: constexpr TransferParameters(uint32_t pending_bytes, - uint32_t max_chunk_size_bytes) + uint32_t max_chunk_size_bytes, + uint32_t extend_window_divisor) : pending_bytes_(pending_bytes), - max_chunk_size_bytes_(max_chunk_size_bytes) { + max_chunk_size_bytes_(max_chunk_size_bytes), + extend_window_divisor_(extend_window_divisor) { PW_ASSERT(pending_bytes > 0); PW_ASSERT(max_chunk_size_bytes > 0); + PW_ASSERT(extend_window_divisor > 1); } uint32_t pending_bytes() const { return pending_bytes_; } + void set_pending_bytes(uint32_t pending_bytes) { + pending_bytes_ = pending_bytes; + } + uint32_t max_chunk_size_bytes() const { return max_chunk_size_bytes_; } + void set_max_chunk_size_bytes(uint32_t max_chunk_size_bytes) { + max_chunk_size_bytes_ = max_chunk_size_bytes; + } - // The fractional position within a window at which a receive transfer should - // extend its window size to minimize the amount of time the transmitter - // spends blocked. - // - // For example, a divisor of 2 will extend the window when half of the - // requested data has been received, a divisor of three will extend at a third - // of the window, and so on. - // - // TODO(frolv): Find a good threshold for this; maybe make it configurable? - static constexpr uint32_t kExtendWindowDivisor = 2; - static_assert(kExtendWindowDivisor > 1); + uint32_t extend_window_divisor() const { return extend_window_divisor_; } + void set_extend_window_divisor(uint32_t extend_window_divisor) { + PW_DASSERT(extend_window_divisor > 1); + extend_window_divisor_ = extend_window_divisor; + } private: uint32_t pending_bytes_; uint32_t max_chunk_size_bytes_; + uint32_t extend_window_divisor_; }; // Information about a single transfer. class Context { public: - enum Type : bool { kTransmit, kReceive }; - Context(const Context&) = delete; Context(Context&&) = delete; Context& operator=(const Context&) = delete; @@ -74,46 +77,33 @@ class Context { constexpr uint32_t transfer_id() const { return transfer_id_; } // True if the context has been used for a transfer (it has an ID). - bool initialized() { - state_lock_.lock(); - bool initialized = transfer_state_ != TransferState::kInactive; - state_lock_.unlock(); - return initialized; + bool initialized() const { + return transfer_state_ != TransferState::kInactive; } // True if the transfer is active. - bool active() { - state_lock_.lock(); - bool active = transfer_state_ >= TransferState::kData; - state_lock_.unlock(); - return active; + bool active() const { return transfer_state_ >= TransferState::kWaiting; } + + std::optional<chrono::SystemClock::time_point> timeout() const { + return active() && next_timeout_ != kNoTimeout + ? std::optional(next_timeout_) + : std::nullopt; } - // Starts a new transfer from an initialized context by sending the initial - // transfer chunk. This is generally called from within a transfer client, as - // it is unusual for a server to initiate a transfer. - Status InitiateTransfer(const TransferParameters& max_parameters); + // Returns true if the transfer's most recently set timeout has passed. + bool timed_out() const { + std::optional<chrono::SystemClock::time_point> next_timeout = timeout(); + return next_timeout.has_value() && + chrono::SystemClock::now() >= next_timeout.value(); + } - // Extracts data from the provided chunk into the transfer context. This is - // intended to be the immediate part of the transfer, run directly from within - // the RPC message handler. - // - // Returns true if there is any deferred work required for this chunk (i.e. - // ProcessChunk should be called to complete the operation). - bool ReadChunkData(ChunkDataBuffer& buffer, - const TransferParameters& max_parameters, - const Chunk& chunk); - - // Handles the chunk from the previous invocation of ReadChunkData(). This - // operation is intended to be deferred, running from a different context than - // the RPC callback in which the chunk was received. - void ProcessChunk(ChunkDataBuffer& buffer, - const TransferParameters& max_parameters); + // Processes an event for this transfer. + void HandleEvent(const Event& event); protected: - using CompletionFunction = Status (*)(Context&, Status); + ~Context() = default; - Context(CompletionFunction on_completion) + constexpr Context() : transfer_id_(0), flags_(0), transfer_state_(TransferState::kInactive), @@ -126,183 +116,178 @@ class Context { window_end_offset_(0), pending_bytes_(0), max_chunk_size_bytes_(std::numeric_limits<uint32_t>::max()), + max_parameters_(nullptr), + thread_(nullptr), last_chunk_offset_(0), - timer_([this](chrono::SystemClock::time_point) { this->OnTimeout(); }), chunk_timeout_(chrono::SystemClock::duration::zero()), - work_queue_(nullptr), - encoding_buffer_(nullptr), - on_completion_(on_completion) {} + interchunk_delay_(chrono::SystemClock::for_at_least( + std::chrono::microseconds(kDefaultChunkDelayMicroseconds))), + next_timeout_(kNoTimeout) {} + + constexpr TransferType type() const { + return static_cast<TransferType>(flags_ & kFlagsType); + } + private: enum class TransferState : uint8_t { // This ServerContext has never been used for a transfer. It is available // for use for a transfer. kInactive, - // A transfer completed and the final status chunk was sent. The Context is - // available for use for a new transfer. A receive transfer uses this state - // to allow a transmitter to retry its last chunk if the final status chunk + // A transfer completed and the final status chunk was sent. The Context + // is + // available for use for a new transfer. A receive transfer uses this + // state + // to allow a transmitter to retry its last chunk if the final status + // chunk // was dropped. kCompleted, - // Sending or receiving data for an active transfer. - kData, + // Waiting for the other end to send a chunk. + kWaiting, + // Transmitting a window of data to a receiver. + kTransmitting, // Recovering after one or more chunks was dropped in an active transfer. kRecovery, - // Hit a timeout and waiting for the timeout handler to run. - kTimedOut, }; - constexpr Type type() const { return static_cast<Type>(flags_ & kFlagsType); } + enum class TransmitAction { + // Start of a new transfer. + kBegin, + // Extend the current window length. + kExtend, + // Retransmit from a specified offset. + kRetransmit, + }; + + void set_transfer_state(TransferState state) { transfer_state_ = state; } - void set_transfer_state(TransferState state) { - state_lock_.lock(); - transfer_state_ = state; - state_lock_.unlock(); + // The transfer ID as unsigned instead of uint32_t so it can be used with %u. + unsigned id_for_log() const { + static_assert(sizeof(unsigned) >= sizeof(transfer_id_)); + return static_cast<unsigned>(transfer_id_); } - // Begins a new transmit transfer from this context. - // Precondition: context is not active. - void InitializeForTransmit( - uint32_t transfer_id, - work_queue::WorkQueue& work_queue, - EncodingBuffer& encoding_buffer, - rpc::Writer& rpc_writer, - stream::Reader& reader, - chrono::SystemClock::duration chunk_timeout = cfg::kDefaultChunkTimeout, - uint8_t max_retries = cfg::kDefaultMaxRetries) { - Initialize(kTransmit, - transfer_id, - work_queue, - encoding_buffer, - rpc_writer, - reader, - chunk_timeout, - max_retries); + stream::Reader& reader() { + PW_DASSERT(active() && type() == TransferType::kTransmit); + return static_cast<stream::Reader&>(*stream_); } - // Begins a new receive transfer from this context. - // Precondition: context is not active. - void InitializeForReceive( - uint32_t transfer_id, - work_queue::WorkQueue& work_queue, - EncodingBuffer& encoding_buffer, - rpc::Writer& rpc_writer, - stream::Writer& writer, - chrono::SystemClock::duration chunk_timeout = cfg::kDefaultChunkTimeout, - uint8_t max_retries = cfg::kDefaultMaxRetries) { - Initialize(kReceive, - transfer_id, - work_queue, - encoding_buffer, - rpc_writer, - writer, - chunk_timeout, - max_retries); + stream::Writer& writer() { + PW_DASSERT(active() && type() == TransferType::kReceive); + return static_cast<stream::Writer&>(*stream_); } - // Calculates the maximum size of actual data that can be sent within a single - // client write transfer chunk, accounting for the overhead of the transfer - // protocol and RPC system. + // Calculates the maximum size of actual data that can be sent within a + // single client write transfer chunk, accounting for the overhead of the + // transfer protocol and RPC system. // // Note: This function relies on RPC protocol internals. This is generally a - // *bad* idea, but is necessary here due to limitations of the RPC system and - // its asymmetric ingress and egress paths. + // *bad* idea, but is necessary here due to limitations of the RPC system + // and its asymmetric ingress and egress paths. // // TODO(frolv): This should be investigated further and perhaps addressed // within the RPC system, at the least through a helper function. uint32_t MaxWriteChunkSize(uint32_t max_chunk_size_bytes, uint32_t channel_id) const; - private: - enum TransmitAction : bool { kExtend, kRetransmit }; + // Initializes a new transfer using new_transfer. The provided stream + // argument is used in place of the NewTransferEvent's stream. Only + // initializes state; no packets are sent. + // + // Precondition: context is not active. + void Initialize(const NewTransferEvent& new_transfer); - void Initialize(Type type, - uint32_t transfer_id, - work_queue::WorkQueue& work_queue, - EncodingBuffer& encoding_buffer, - rpc::Writer& rpc_writer, - stream::Stream& stream, - chrono::SystemClock::duration chunk_timeout, - uint8_t max_retries); + // Starts a new transfer from an initialized context by sending the initial + // transfer chunk. This is only used by transfer clients, as the transfer + // service cannot initiate transfers. + // + // Calls Finish(), which calls the on_completion callback, if initiating a + // transfer fails. + void InitiateTransferAsClient(); - stream::Reader& reader() { - PW_DASSERT(active() && type() == kTransmit); - return static_cast<stream::Reader&>(*stream_); - } + // Starts a new transfer on the server after receiving a request from a + // client. + void StartTransferAsServer(const NewTransferEvent& new_transfer); - stream::Writer& writer() { - PW_DASSERT(active() && type() == kReceive); - return static_cast<stream::Writer&>(*stream_); - } + // Does final cleanup specific to the server or client. Returns whether the + // cleanup succeeded. An error in cleanup indicates that the transfer + // failed. + virtual Status FinalCleanup(Status status) = 0; - // Sends the first chunk in a transmit transfer. - Status SendInitialTransmitChunk(); - - // Functions which extract relevant data from a chunk into the context. - bool ReadTransmitChunk(const TransferParameters& max_parameters, - const Chunk& chunk); - bool ReadReceiveChunk(ChunkDataBuffer& buffer, - const TransferParameters& max_parameters, - const Chunk& chunk); - - // Functions which handle the last received chunk. - void ProcessTransmitChunk(); - void ProcessReceiveChunk(ChunkDataBuffer& buffer, - const TransferParameters& max_parameters); - - // In a transmit transfer, sends the next data chunk from the local stream. - // Returns status indicating what to do next: - // - // OK - continue - // OUT_OF_RANGE - done for now - // other errors - abort transfer with this error - // - Status SendNextDataChunk(); + // Processes a chunk in either a transfer or receive transfer. + void HandleChunkEvent(const ChunkEvent& event); + + // Processes a chunk in a transmit transfer. + void HandleTransmitChunk(const Chunk& chunk); + + // Processes a transfer parameters update in a transmit transfer. + void HandleTransferParametersUpdate(const Chunk& chunk); - // In a receive transfer, processes the fields from a data chunk and stages - // the data for a deferred write. Returns true if there is a deferred - // operation to complete. - bool HandleDataChunk(ChunkDataBuffer& buffer, - const TransferParameters& max_parameters, - const Chunk& chunk); + // Sends the next chunk in a transmit transfer, if any. + void TransmitNextChunk(bool retransmit_requested); - // In a receive transfer, sends a parameters chunk telling the transmitter how - // much data they can send. - Status SendTransferParameters(TransmitAction action); + // Processes a chunk in a receive transfer. + void HandleReceiveChunk(const Chunk& chunk); + + // Processes a data chunk in a received while in the kWaiting state. + void HandleReceivedData(const Chunk& chunk); + + // Sends the first chunk in a transmit transfer. + void SendInitialTransmitChunk(); + + // In a receive transfer, sends a parameters chunk telling the transmitter + // how much data they can send. + void SendTransferParameters(TransmitAction action); // Updates the current receive transfer parameters from the provided object, // then sends them. - Status UpdateAndSendTransferParameters( - const TransferParameters& max_parameters, TransmitAction action); + void UpdateAndSendTransferParameters(TransmitAction action); - void SendStatusChunk(Status status); - void FinishAndSendStatus(Status status); + // Sends a final status chunk of a completed transfer without updating the + // the transfer. Sends status_, which MUST have been set by a previous + // Finish() call. + void SendFinalStatusChunk(); - void CancelTimer() { - timer_.Cancel(); - retries_ = 0; - } + // Marks the transfer as completed and calls FinalCleanup(). Sets status_ to + // the final status for this transfer. The transfer MUST be active when this + // is called. + void Finish(Status status); + + // Encodes the specified chunk to the encode buffer and sends it with the + // rpc_writer_. Calls Finish() with an error if the operation fails. + void EncodeAndSendChunk(const Chunk& chunk); - // Timeout function invoked from the timer context. This may occur in an - // interrupt, so no real work can be done. Instead, sets state to timed out - // and adds a job to run the timeout handler. - void OnTimeout(); + void SetTimeout(chrono::SystemClock::duration timeout); + void ClearTimeout() { next_timeout_ = kNoTimeout; } - // The acutal timeout handler, invoked from within the work queue. + // Called when the transfer's timeout expires. void HandleTimeout(); + // Resends the last packet or aborts the transfer if the maximum retries has + // been exceeded. + void Retry(); + static constexpr uint8_t kFlagsType = 1 << 0; static constexpr uint8_t kFlagsDataSent = 1 << 1; - // TODO(frolv): Make this value configurable per transfer. static constexpr uint32_t kDefaultChunkDelayMicroseconds = 2000; + // How long to wait for the other side to ACK a final transfer chunk before + // resetting the context so that it can be reused. During this time, the + // status chunk will be re-sent for every non-ACK chunk received, + // continually notifying the other end that the transfer is over. + static constexpr chrono::SystemClock::duration kFinalChunkAckTimeout = + std::chrono::milliseconds(5000); + + static constexpr chrono::SystemClock::time_point kNoTimeout = + chrono::SystemClock::time_point(chrono::SystemClock::duration(0)); + uint32_t transfer_id_; uint8_t flags_; - TransferState transfer_state_ PW_GUARDED_BY(state_lock_); + TransferState transfer_state_; uint8_t retries_; uint8_t max_retries_; - sync::InterruptSpinLock state_lock_; - + // The stream from which to read or to which to write data. stream::Stream* stream_; rpc::Writer* rpc_writer_; @@ -313,18 +298,25 @@ class Context { uint32_t pending_bytes_; uint32_t max_chunk_size_bytes_; + const TransferParameters* max_parameters_; + TransferThread* thread_; + union { Status status_; // Used when state is kCompleted. - uint32_t last_chunk_offset_; // Used in states kData and kRecovery. + uint32_t last_chunk_offset_; // Used in states kWaiting and kRecovery. }; - // Timer used to handle timeouts waiting for chunks. - chrono::SystemTimer timer_; + // How long to wait for a chunk from the other end. chrono::SystemClock::duration chunk_timeout_; - work_queue::WorkQueue* work_queue_; - EncodingBuffer* encoding_buffer_; - CompletionFunction on_completion_; + // How long to delay between transmitting subsequent data chunks within a + // window. + chrono::SystemClock::duration interchunk_delay_; + + // Timestamp at which the transfer will next time out, or kNoTimeout. + chrono::SystemClock::time_point next_timeout_; + + RateEstimate transfer_rate_; }; } // namespace pw::transfer::internal |