aboutsummaryrefslogtreecommitdiff
path: root/pw_transfer/public/pw_transfer/internal/context.h
diff options
context:
space:
mode:
Diffstat (limited to 'pw_transfer/public/pw_transfer/internal/context.h')
-rw-r--r--pw_transfer/public/pw_transfer/internal/context.h358
1 files changed, 175 insertions, 183 deletions
diff --git a/pw_transfer/public/pw_transfer/internal/context.h b/pw_transfer/public/pw_transfer/internal/context.h
index 5039325c4..f825419dd 100644
--- a/pw_transfer/public/pw_transfer/internal/context.h
+++ b/pw_transfer/public/pw_transfer/internal/context.h
@@ -1,4 +1,4 @@
-// Copyright 2021 The Pigweed Authors
+// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -16,56 +16,59 @@
#include <cinttypes>
#include <cstddef>
#include <limits>
+#include <optional>
#include "pw_assert/assert.h"
-#include "pw_chrono/system_timer.h"
+#include "pw_chrono/system_clock.h"
#include "pw_rpc/writer.h"
#include "pw_status/status.h"
#include "pw_stream/stream.h"
-#include "pw_sync/interrupt_spin_lock.h"
-#include "pw_sync/lock_annotations.h"
#include "pw_transfer/internal/chunk.h"
-#include "pw_transfer/internal/chunk_data_buffer.h"
-#include "pw_transfer/internal/config.h"
-#include "pw_work_queue/work_queue.h"
+#include "pw_transfer/internal/event.h"
+#include "pw_transfer/rate_estimate.h"
namespace pw::transfer::internal {
+class TransferThread;
+
class TransferParameters {
public:
constexpr TransferParameters(uint32_t pending_bytes,
- uint32_t max_chunk_size_bytes)
+ uint32_t max_chunk_size_bytes,
+ uint32_t extend_window_divisor)
: pending_bytes_(pending_bytes),
- max_chunk_size_bytes_(max_chunk_size_bytes) {
+ max_chunk_size_bytes_(max_chunk_size_bytes),
+ extend_window_divisor_(extend_window_divisor) {
PW_ASSERT(pending_bytes > 0);
PW_ASSERT(max_chunk_size_bytes > 0);
+ PW_ASSERT(extend_window_divisor > 1);
}
uint32_t pending_bytes() const { return pending_bytes_; }
+ void set_pending_bytes(uint32_t pending_bytes) {
+ pending_bytes_ = pending_bytes;
+ }
+
uint32_t max_chunk_size_bytes() const { return max_chunk_size_bytes_; }
+ void set_max_chunk_size_bytes(uint32_t max_chunk_size_bytes) {
+ max_chunk_size_bytes_ = max_chunk_size_bytes;
+ }
- // The fractional position within a window at which a receive transfer should
- // extend its window size to minimize the amount of time the transmitter
- // spends blocked.
- //
- // For example, a divisor of 2 will extend the window when half of the
- // requested data has been received, a divisor of three will extend at a third
- // of the window, and so on.
- //
- // TODO(frolv): Find a good threshold for this; maybe make it configurable?
- static constexpr uint32_t kExtendWindowDivisor = 2;
- static_assert(kExtendWindowDivisor > 1);
+ uint32_t extend_window_divisor() const { return extend_window_divisor_; }
+ void set_extend_window_divisor(uint32_t extend_window_divisor) {
+ PW_DASSERT(extend_window_divisor > 1);
+ extend_window_divisor_ = extend_window_divisor;
+ }
private:
uint32_t pending_bytes_;
uint32_t max_chunk_size_bytes_;
+ uint32_t extend_window_divisor_;
};
// Information about a single transfer.
class Context {
public:
- enum Type : bool { kTransmit, kReceive };
-
Context(const Context&) = delete;
Context(Context&&) = delete;
Context& operator=(const Context&) = delete;
@@ -74,46 +77,33 @@ class Context {
constexpr uint32_t transfer_id() const { return transfer_id_; }
// True if the context has been used for a transfer (it has an ID).
- bool initialized() {
- state_lock_.lock();
- bool initialized = transfer_state_ != TransferState::kInactive;
- state_lock_.unlock();
- return initialized;
+ bool initialized() const {
+ return transfer_state_ != TransferState::kInactive;
}
// True if the transfer is active.
- bool active() {
- state_lock_.lock();
- bool active = transfer_state_ >= TransferState::kData;
- state_lock_.unlock();
- return active;
+ bool active() const { return transfer_state_ >= TransferState::kWaiting; }
+
+ std::optional<chrono::SystemClock::time_point> timeout() const {
+ return active() && next_timeout_ != kNoTimeout
+ ? std::optional(next_timeout_)
+ : std::nullopt;
}
- // Starts a new transfer from an initialized context by sending the initial
- // transfer chunk. This is generally called from within a transfer client, as
- // it is unusual for a server to initiate a transfer.
- Status InitiateTransfer(const TransferParameters& max_parameters);
+ // Returns true if the transfer's most recently set timeout has passed.
+ bool timed_out() const {
+ std::optional<chrono::SystemClock::time_point> next_timeout = timeout();
+ return next_timeout.has_value() &&
+ chrono::SystemClock::now() >= next_timeout.value();
+ }
- // Extracts data from the provided chunk into the transfer context. This is
- // intended to be the immediate part of the transfer, run directly from within
- // the RPC message handler.
- //
- // Returns true if there is any deferred work required for this chunk (i.e.
- // ProcessChunk should be called to complete the operation).
- bool ReadChunkData(ChunkDataBuffer& buffer,
- const TransferParameters& max_parameters,
- const Chunk& chunk);
-
- // Handles the chunk from the previous invocation of ReadChunkData(). This
- // operation is intended to be deferred, running from a different context than
- // the RPC callback in which the chunk was received.
- void ProcessChunk(ChunkDataBuffer& buffer,
- const TransferParameters& max_parameters);
+ // Processes an event for this transfer.
+ void HandleEvent(const Event& event);
protected:
- using CompletionFunction = Status (*)(Context&, Status);
+ ~Context() = default;
- Context(CompletionFunction on_completion)
+ constexpr Context()
: transfer_id_(0),
flags_(0),
transfer_state_(TransferState::kInactive),
@@ -126,183 +116,178 @@ class Context {
window_end_offset_(0),
pending_bytes_(0),
max_chunk_size_bytes_(std::numeric_limits<uint32_t>::max()),
+ max_parameters_(nullptr),
+ thread_(nullptr),
last_chunk_offset_(0),
- timer_([this](chrono::SystemClock::time_point) { this->OnTimeout(); }),
chunk_timeout_(chrono::SystemClock::duration::zero()),
- work_queue_(nullptr),
- encoding_buffer_(nullptr),
- on_completion_(on_completion) {}
+ interchunk_delay_(chrono::SystemClock::for_at_least(
+ std::chrono::microseconds(kDefaultChunkDelayMicroseconds))),
+ next_timeout_(kNoTimeout) {}
+
+ constexpr TransferType type() const {
+ return static_cast<TransferType>(flags_ & kFlagsType);
+ }
+ private:
enum class TransferState : uint8_t {
// This ServerContext has never been used for a transfer. It is available
// for use for a transfer.
kInactive,
- // A transfer completed and the final status chunk was sent. The Context is
- // available for use for a new transfer. A receive transfer uses this state
- // to allow a transmitter to retry its last chunk if the final status chunk
+ // A transfer completed and the final status chunk was sent. The Context
+ // is
+ // available for use for a new transfer. A receive transfer uses this
+ // state
+ // to allow a transmitter to retry its last chunk if the final status
+ // chunk
// was dropped.
kCompleted,
- // Sending or receiving data for an active transfer.
- kData,
+ // Waiting for the other end to send a chunk.
+ kWaiting,
+ // Transmitting a window of data to a receiver.
+ kTransmitting,
// Recovering after one or more chunks was dropped in an active transfer.
kRecovery,
- // Hit a timeout and waiting for the timeout handler to run.
- kTimedOut,
};
- constexpr Type type() const { return static_cast<Type>(flags_ & kFlagsType); }
+ enum class TransmitAction {
+ // Start of a new transfer.
+ kBegin,
+ // Extend the current window length.
+ kExtend,
+ // Retransmit from a specified offset.
+ kRetransmit,
+ };
+
+ void set_transfer_state(TransferState state) { transfer_state_ = state; }
- void set_transfer_state(TransferState state) {
- state_lock_.lock();
- transfer_state_ = state;
- state_lock_.unlock();
+ // The transfer ID as unsigned instead of uint32_t so it can be used with %u.
+ unsigned id_for_log() const {
+ static_assert(sizeof(unsigned) >= sizeof(transfer_id_));
+ return static_cast<unsigned>(transfer_id_);
}
- // Begins a new transmit transfer from this context.
- // Precondition: context is not active.
- void InitializeForTransmit(
- uint32_t transfer_id,
- work_queue::WorkQueue& work_queue,
- EncodingBuffer& encoding_buffer,
- rpc::Writer& rpc_writer,
- stream::Reader& reader,
- chrono::SystemClock::duration chunk_timeout = cfg::kDefaultChunkTimeout,
- uint8_t max_retries = cfg::kDefaultMaxRetries) {
- Initialize(kTransmit,
- transfer_id,
- work_queue,
- encoding_buffer,
- rpc_writer,
- reader,
- chunk_timeout,
- max_retries);
+ stream::Reader& reader() {
+ PW_DASSERT(active() && type() == TransferType::kTransmit);
+ return static_cast<stream::Reader&>(*stream_);
}
- // Begins a new receive transfer from this context.
- // Precondition: context is not active.
- void InitializeForReceive(
- uint32_t transfer_id,
- work_queue::WorkQueue& work_queue,
- EncodingBuffer& encoding_buffer,
- rpc::Writer& rpc_writer,
- stream::Writer& writer,
- chrono::SystemClock::duration chunk_timeout = cfg::kDefaultChunkTimeout,
- uint8_t max_retries = cfg::kDefaultMaxRetries) {
- Initialize(kReceive,
- transfer_id,
- work_queue,
- encoding_buffer,
- rpc_writer,
- writer,
- chunk_timeout,
- max_retries);
+ stream::Writer& writer() {
+ PW_DASSERT(active() && type() == TransferType::kReceive);
+ return static_cast<stream::Writer&>(*stream_);
}
- // Calculates the maximum size of actual data that can be sent within a single
- // client write transfer chunk, accounting for the overhead of the transfer
- // protocol and RPC system.
+ // Calculates the maximum size of actual data that can be sent within a
+ // single client write transfer chunk, accounting for the overhead of the
+ // transfer protocol and RPC system.
//
// Note: This function relies on RPC protocol internals. This is generally a
- // *bad* idea, but is necessary here due to limitations of the RPC system and
- // its asymmetric ingress and egress paths.
+ // *bad* idea, but is necessary here due to limitations of the RPC system
+ // and its asymmetric ingress and egress paths.
//
// TODO(frolv): This should be investigated further and perhaps addressed
// within the RPC system, at the least through a helper function.
uint32_t MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
uint32_t channel_id) const;
- private:
- enum TransmitAction : bool { kExtend, kRetransmit };
+ // Initializes a new transfer using new_transfer. The provided stream
+ // argument is used in place of the NewTransferEvent's stream. Only
+ // initializes state; no packets are sent.
+ //
+ // Precondition: context is not active.
+ void Initialize(const NewTransferEvent& new_transfer);
- void Initialize(Type type,
- uint32_t transfer_id,
- work_queue::WorkQueue& work_queue,
- EncodingBuffer& encoding_buffer,
- rpc::Writer& rpc_writer,
- stream::Stream& stream,
- chrono::SystemClock::duration chunk_timeout,
- uint8_t max_retries);
+ // Starts a new transfer from an initialized context by sending the initial
+ // transfer chunk. This is only used by transfer clients, as the transfer
+ // service cannot initiate transfers.
+ //
+ // Calls Finish(), which calls the on_completion callback, if initiating a
+ // transfer fails.
+ void InitiateTransferAsClient();
- stream::Reader& reader() {
- PW_DASSERT(active() && type() == kTransmit);
- return static_cast<stream::Reader&>(*stream_);
- }
+ // Starts a new transfer on the server after receiving a request from a
+ // client.
+ void StartTransferAsServer(const NewTransferEvent& new_transfer);
- stream::Writer& writer() {
- PW_DASSERT(active() && type() == kReceive);
- return static_cast<stream::Writer&>(*stream_);
- }
+ // Does final cleanup specific to the server or client. Returns whether the
+ // cleanup succeeded. An error in cleanup indicates that the transfer
+ // failed.
+ virtual Status FinalCleanup(Status status) = 0;
- // Sends the first chunk in a transmit transfer.
- Status SendInitialTransmitChunk();
-
- // Functions which extract relevant data from a chunk into the context.
- bool ReadTransmitChunk(const TransferParameters& max_parameters,
- const Chunk& chunk);
- bool ReadReceiveChunk(ChunkDataBuffer& buffer,
- const TransferParameters& max_parameters,
- const Chunk& chunk);
-
- // Functions which handle the last received chunk.
- void ProcessTransmitChunk();
- void ProcessReceiveChunk(ChunkDataBuffer& buffer,
- const TransferParameters& max_parameters);
-
- // In a transmit transfer, sends the next data chunk from the local stream.
- // Returns status indicating what to do next:
- //
- // OK - continue
- // OUT_OF_RANGE - done for now
- // other errors - abort transfer with this error
- //
- Status SendNextDataChunk();
+ // Processes a chunk in either a transfer or receive transfer.
+ void HandleChunkEvent(const ChunkEvent& event);
+
+ // Processes a chunk in a transmit transfer.
+ void HandleTransmitChunk(const Chunk& chunk);
+
+ // Processes a transfer parameters update in a transmit transfer.
+ void HandleTransferParametersUpdate(const Chunk& chunk);
- // In a receive transfer, processes the fields from a data chunk and stages
- // the data for a deferred write. Returns true if there is a deferred
- // operation to complete.
- bool HandleDataChunk(ChunkDataBuffer& buffer,
- const TransferParameters& max_parameters,
- const Chunk& chunk);
+ // Sends the next chunk in a transmit transfer, if any.
+ void TransmitNextChunk(bool retransmit_requested);
- // In a receive transfer, sends a parameters chunk telling the transmitter how
- // much data they can send.
- Status SendTransferParameters(TransmitAction action);
+ // Processes a chunk in a receive transfer.
+ void HandleReceiveChunk(const Chunk& chunk);
+
+ // Processes a data chunk in a received while in the kWaiting state.
+ void HandleReceivedData(const Chunk& chunk);
+
+ // Sends the first chunk in a transmit transfer.
+ void SendInitialTransmitChunk();
+
+ // In a receive transfer, sends a parameters chunk telling the transmitter
+ // how much data they can send.
+ void SendTransferParameters(TransmitAction action);
// Updates the current receive transfer parameters from the provided object,
// then sends them.
- Status UpdateAndSendTransferParameters(
- const TransferParameters& max_parameters, TransmitAction action);
+ void UpdateAndSendTransferParameters(TransmitAction action);
- void SendStatusChunk(Status status);
- void FinishAndSendStatus(Status status);
+ // Sends a final status chunk of a completed transfer without updating the
+ // the transfer. Sends status_, which MUST have been set by a previous
+ // Finish() call.
+ void SendFinalStatusChunk();
- void CancelTimer() {
- timer_.Cancel();
- retries_ = 0;
- }
+ // Marks the transfer as completed and calls FinalCleanup(). Sets status_ to
+ // the final status for this transfer. The transfer MUST be active when this
+ // is called.
+ void Finish(Status status);
+
+ // Encodes the specified chunk to the encode buffer and sends it with the
+ // rpc_writer_. Calls Finish() with an error if the operation fails.
+ void EncodeAndSendChunk(const Chunk& chunk);
- // Timeout function invoked from the timer context. This may occur in an
- // interrupt, so no real work can be done. Instead, sets state to timed out
- // and adds a job to run the timeout handler.
- void OnTimeout();
+ void SetTimeout(chrono::SystemClock::duration timeout);
+ void ClearTimeout() { next_timeout_ = kNoTimeout; }
- // The acutal timeout handler, invoked from within the work queue.
+ // Called when the transfer's timeout expires.
void HandleTimeout();
+ // Resends the last packet or aborts the transfer if the maximum retries has
+ // been exceeded.
+ void Retry();
+
static constexpr uint8_t kFlagsType = 1 << 0;
static constexpr uint8_t kFlagsDataSent = 1 << 1;
- // TODO(frolv): Make this value configurable per transfer.
static constexpr uint32_t kDefaultChunkDelayMicroseconds = 2000;
+ // How long to wait for the other side to ACK a final transfer chunk before
+ // resetting the context so that it can be reused. During this time, the
+ // status chunk will be re-sent for every non-ACK chunk received,
+ // continually notifying the other end that the transfer is over.
+ static constexpr chrono::SystemClock::duration kFinalChunkAckTimeout =
+ std::chrono::milliseconds(5000);
+
+ static constexpr chrono::SystemClock::time_point kNoTimeout =
+ chrono::SystemClock::time_point(chrono::SystemClock::duration(0));
+
uint32_t transfer_id_;
uint8_t flags_;
- TransferState transfer_state_ PW_GUARDED_BY(state_lock_);
+ TransferState transfer_state_;
uint8_t retries_;
uint8_t max_retries_;
- sync::InterruptSpinLock state_lock_;
-
+ // The stream from which to read or to which to write data.
stream::Stream* stream_;
rpc::Writer* rpc_writer_;
@@ -313,18 +298,25 @@ class Context {
uint32_t pending_bytes_;
uint32_t max_chunk_size_bytes_;
+ const TransferParameters* max_parameters_;
+ TransferThread* thread_;
+
union {
Status status_; // Used when state is kCompleted.
- uint32_t last_chunk_offset_; // Used in states kData and kRecovery.
+ uint32_t last_chunk_offset_; // Used in states kWaiting and kRecovery.
};
- // Timer used to handle timeouts waiting for chunks.
- chrono::SystemTimer timer_;
+ // How long to wait for a chunk from the other end.
chrono::SystemClock::duration chunk_timeout_;
- work_queue::WorkQueue* work_queue_;
- EncodingBuffer* encoding_buffer_;
- CompletionFunction on_completion_;
+ // How long to delay between transmitting subsequent data chunks within a
+ // window.
+ chrono::SystemClock::duration interchunk_delay_;
+
+ // Timestamp at which the transfer will next time out, or kNoTimeout.
+ chrono::SystemClock::time_point next_timeout_;
+
+ RateEstimate transfer_rate_;
};
} // namespace pw::transfer::internal