aboutsummaryrefslogtreecommitdiff
path: root/pw_transfer/transfer.cc
blob: 50bd2612cdbd87436cb96a48911f39cb62c528cd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

#define PW_LOG_MODULE_NAME "TRN"

#include "pw_transfer/transfer.h"

#include "pw_assert/check.h"
#include "pw_log/log.h"
#include "pw_status/try.h"
#include "pw_transfer/internal/chunk.h"

namespace pw::transfer {

void TransferService::HandleChunk(ConstByteSpan message,
                                  internal::TransferType type) {
  // All incoming chunks in a client read transfer are transfer parameter
  // updates, except for the final chunk, which is an acknowledgement of
  // completion.
  //
  // Transfer parameters may contain the following fields:
  //
  //   - transfer_id (required)
  //   - pending_bytes (required)
  //   - offset (required)
  //   - max_chunk_size_bytes
  //   - min_delay_microseconds (not yet supported)
  //
  internal::Chunk chunk;

  if (Status status = internal::DecodeChunk(message, chunk); !status.ok()) {
    // No special handling required here. The client will retransmit the chunk
    // when no response is received.
    PW_LOG_ERROR("Failed to decode incoming transfer chunk");
    return;
  }

  internal::ServerContextPool& pool =
      type == internal::kRead ? read_transfers_ : write_transfers_;
  rpc::RawServerReaderWriter& stream =
      type == internal::kRead ? client_.read_stream() : client_.write_stream();

  Result<internal::ServerContext*> result =
      chunk.IsInitialChunk() ? pool.StartTransfer(chunk.transfer_id,
                                                  work_queue_,
                                                  encoding_buffer_,
                                                  stream,
                                                  chunk_timeout_,
                                                  max_retries_)
                             : pool.GetPendingTransfer(chunk.transfer_id);
  if (!result.ok()) {
    client_.SendStatusChunk(type, chunk.transfer_id, result.status());
    PW_LOG_ERROR("Error handling chunk for transfer %u: %d",
                 static_cast<unsigned>(chunk.transfer_id),
                 result.status().code());
    return;
  }

  internal::ServerContext& transfer = *result.value();

  if (chunk.status.has_value()) {
    // Transfer has been terminated (successfully or not).
    const Status status = chunk.status.value();
    if (!status.ok()) {
      PW_LOG_ERROR("Receiver terminated transfer %u with status %d",
                   static_cast<unsigned>(chunk.transfer_id),
                   static_cast<int>(status.code()));
    }
    if (transfer.active()) {
      transfer.Finish(status).IgnoreError();
    }
    return;
  }

  if (transfer.ReadChunkData(
          chunk_data_buffer_, client_.max_parameters(), chunk)) {
    // Call this synchronously for now. Later, this will be deferred within a
    // work queue.
    transfer.ProcessChunk(chunk_data_buffer_, client_.max_parameters());
  }
}

}  // namespace pw::transfer