aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWeston Carvalho <westoncarvalho@google.com>2023-10-20 17:05:01 -0500
committerWeston Carvalho <westoncarvalho@google.com>2024-01-10 15:43:43 -0600
commitd200cb7fff2fbfb5862f0b71d1ddcfff2cad3827 (patch)
treecbd5a46e8f2bd77bc7696f0abfdf96d036c83dbd
parent16717ea84f1c386c3127a4023245e3d30117970b (diff)
downloadstorage-d200cb7fff2fbfb5862f0b71d1ddcfff2cad3827.tar.gz
Split transaction end
Bug: 307999674 Change-Id: Ice1e2fe455110e8af1db6c0f99f63c6112edcef7
-rw-r--r--client_tipc.c252
1 files changed, 180 insertions, 72 deletions
diff --git a/client_tipc.c b/client_tipc.c
index c82554b..6564c15 100644
--- a/client_tipc.c
+++ b/client_tipc.c
@@ -290,15 +290,102 @@ struct storage_op_flags {
bool update_checkpoint;
};
+static enum storage_err assert_checkpoint_flag_valid(
+ struct storage_client_session* session,
+ struct storage_op_flags flags,
+ const char* func_name) {
+ if (flags.update_checkpoint) {
+ if (!(flags.complete_transaction)) {
+ SS_ERR("%s: STORAGE_MSG_FLAG_TRANSACT_CHECKPOINT cannot "
+ "be used without STORAGE_MSG_FLAG_TRANSACT_COMPLETE\n",
+ func_name);
+ return STORAGE_ERR_NOT_VALID;
+ }
+
+ if (!checkpoint_update_allowed(&session->tr)) {
+ SS_ERR("%s: Checkpoint requested but not currently allowed.\n",
+ func_name);
+ return STORAGE_ERR_NOT_ALLOWED;
+ }
+ }
+ return STORAGE_NO_ERROR;
+}
+
+static enum storage_err ensure_active_transaction(
+ struct storage_client_session* session,
+ struct storage_op_flags flags) {
+ if (session->tr.failed) {
+ if (flags.complete_transaction) {
+ /* last command in current transaction:
+ * reset failed state and return error
+ */
+ session->tr.failed = false;
+ }
+ return STORAGE_ERR_TRANSACT;
+ }
+
+ if (!transaction_is_active(&session->tr)) {
+ /* previous transaction complete */
+ transaction_activate(&session->tr);
+ }
+ return STORAGE_NO_ERROR;
+}
+
+/* abort transaction and clear sticky transaction error */
+static enum storage_err storage_transaction_end(
+ struct storage_client_session* session,
+ struct storage_op_flags flags) {
+ enum storage_err result =
+ assert_checkpoint_flag_valid(session, flags, __func__);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+
+ if (flags.complete_transaction) {
+ /* Allow checkpoint creation without an active transaction */
+ if (flags.update_checkpoint && !transaction_is_active(&session->tr)) {
+ transaction_activate(&session->tr);
+ }
+ /* try to complete current transaction */
+ if (transaction_is_active(&session->tr)) {
+ transaction_complete_etc(&session->tr, flags.update_checkpoint);
+ }
+ if (session->tr.failed) {
+ SS_ERR("%s: failed to complete transaction\n", __func__);
+ /* clear transaction failed state */
+ session->tr.failed = false;
+ return STORAGE_ERR_TRANSACT;
+ }
+ return STORAGE_NO_ERROR;
+ }
+
+ /* discard current transaction */
+ if (transaction_is_active(&session->tr)) {
+ transaction_fail(&session->tr);
+ }
+ /* clear transaction failed state */
+ session->tr.failed = false;
+ return STORAGE_NO_ERROR;
+}
+
static enum storage_err storage_file_delete(
struct storage_client_session* session,
const char* fname,
size_t fname_len,
struct storage_op_flags flags) {
enum file_op_result delete_res;
- enum storage_err result;
char path_buf[FS_PATH_MAX];
+ enum storage_err result =
+ assert_checkpoint_flag_valid(session, flags, __func__);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+ result = ensure_active_transaction(session, flags);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+
/* make sure filename is legal */
if (!is_valid_name(fname, fname_len)) {
SS_ERR("%s: invalid filename\n", __func__);
@@ -370,11 +457,20 @@ static enum storage_err storage_file_move(
struct storage_op_flags flags) {
enum file_op_result open_result;
enum file_op_result move_result;
- enum storage_err result;
struct file_handle* file = NULL;
char path_buf[FS_PATH_MAX];
struct file_handle tmp_file;
+ enum storage_err result =
+ assert_checkpoint_flag_valid(session, flags, __func__);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+ result = ensure_active_transaction(session, flags);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+
/* make sure filenames are legal */
if (!is_valid_name(src_name, src_name_len)) {
SS_ERR("%s: invalid src filename\n", __func__);
@@ -452,11 +548,20 @@ static enum storage_err storage_file_open(
struct storage_op_flags flags,
uint32_t* handle) {
enum file_op_result open_result;
- enum storage_err result;
struct file_handle* file = NULL;
uint32_t f_handle;
char path_buf[FS_PATH_MAX];
+ enum storage_err result =
+ assert_checkpoint_flag_valid(session, flags, __func__);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+ result = ensure_active_transaction(session, flags);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+
/* make sure filename is legal */
if (!is_valid_name(fname, fname_len)) {
SS_ERR("%s: invalid filename\n", __func__);
@@ -521,6 +626,16 @@ static enum storage_err storage_file_close(
struct storage_op_flags flags) {
struct file_handle* file;
+ enum storage_err result =
+ assert_checkpoint_flag_valid(session, flags, __func__);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+ result = ensure_active_transaction(session, flags);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+
file = get_file_handle(session, handle);
if (!file) {
return STORAGE_ERR_NOT_VALID;
@@ -545,6 +660,7 @@ static enum storage_err storage_file_read(
uint32_t handle,
uint32_t size,
uint64_t offset,
+ struct storage_op_flags flags,
uint8_t* resp,
size_t* resp_len) {
void* bufp = resp;
@@ -557,6 +673,16 @@ static enum storage_err storage_file_read(
struct obj_ref block_data_ref = OBJ_REF_INITIAL_VALUE(block_data_ref);
size_t block_offset;
+ enum storage_err result =
+ assert_checkpoint_flag_valid(session, flags, __func__);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+ result = ensure_active_transaction(session, flags);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+
file = get_file_handle(session, handle);
if (!file) {
SS_ERR("%s: invalid file handle (%" PRIx32 ")\n", __func__, handle);
@@ -660,7 +786,6 @@ static enum storage_err storage_file_write(
const uint8_t* data,
size_t data_len,
struct storage_op_flags flags) {
- enum storage_err result = STORAGE_NO_ERROR;
size_t len;
struct file_handle* file;
size_t block_size = get_file_block_size(session->tr.fs);
@@ -669,6 +794,16 @@ static enum storage_err storage_file_write(
struct obj_ref block_data_ref = OBJ_REF_INITIAL_VALUE(block_data_ref);
size_t block_offset;
+ enum storage_err result =
+ assert_checkpoint_flag_valid(session, flags, __func__);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+ result = ensure_active_transaction(session, flags);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+
file = get_file_handle(session, handle);
if (!file) {
SS_ERR("%s: invalid file handle (%" PRIx32 ")\n", __func__, handle);
@@ -808,7 +943,6 @@ static enum storage_err storage_file_list(
const char* path,
size_t path_len),
void* callback_data) {
- enum storage_err result = STORAGE_NO_ERROR;
enum file_op_result iterate_res;
const char* last_name;
char path_buf[FS_PATH_MAX];
@@ -821,6 +955,16 @@ static enum storage_err storage_file_list(
.callback_data = callback_data,
};
+ enum storage_err result =
+ assert_checkpoint_flag_valid(session, flags, __func__);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+ result = ensure_active_transaction(session, flags);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+
result =
get_path(state.prefix, sizeof(state.prefix), &session->uuid, "", 0);
if (result != STORAGE_NO_ERROR) {
@@ -879,10 +1023,21 @@ static enum storage_err storage_file_list(
static enum storage_err storage_file_get_size(
struct storage_client_session* session,
uint32_t handle,
+ struct storage_op_flags flags,
uint64_t* size) {
bool valid;
struct file_handle* file;
+ enum storage_err result =
+ assert_checkpoint_flag_valid(session, flags, __func__);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+ result = ensure_active_transaction(session, flags);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+
file = get_file_handle(session, handle);
if (!file) {
SS_ERR("%s: invalid file handle (%" PRIx32 ")\n", __func__, handle);
@@ -904,6 +1059,16 @@ static enum storage_err storage_file_set_size(
struct storage_op_flags flags) {
struct file_handle* file;
+ enum storage_err result =
+ assert_checkpoint_flag_valid(session, flags, __func__);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+ result = ensure_active_transaction(session, flags);
+ if (result != STORAGE_NO_ERROR) {
+ return result;
+ }
+
file = get_file_handle(session, handle);
if (!file) {
SS_ERR("%s: invalid file handle (%" PRIx32 ")\n", __func__, handle);
@@ -915,7 +1080,6 @@ static enum storage_err storage_file_set_size(
/* for now we only support shrinking the file */
if (new_size > file->size) {
- enum storage_err result;
result = storage_create_gap(session, file);
if (result != STORAGE_NO_ERROR) {
return result;
@@ -1090,9 +1254,9 @@ static enum storage_err storage_tipc_file_read(
uint8_t* resp = (uint8_t*)(msg + 1);
size_t resp_len = STORAGE_MAX_BUFFER_SIZE - sizeof(*msg);
- enum storage_err result =
- storage_file_read(&tipc_session->session, req->handle, req->size,
- req->offset, resp, &resp_len);
+ enum storage_err result = storage_file_read(
+ &tipc_session->session, req->handle, req->size, req->offset,
+ extract_storage_op_flags(msg->flags), resp, &resp_len);
if (result != STORAGE_NO_ERROR) {
return send_result(tipc_session, msg, result);
}
@@ -1166,8 +1330,9 @@ static enum storage_err storage_tipc_file_get_size(
}
struct storage_file_get_size_resp resp;
- enum storage_err result = storage_file_get_size(&tipc_session->session,
- req->handle, &resp.size);
+ enum storage_err result = storage_file_get_size(
+ &tipc_session->session, req->handle,
+ extract_storage_op_flags(msg->flags), &resp.size);
if (result != STORAGE_NO_ERROR) {
return send_result(tipc_session, msg, result);
}
@@ -1319,67 +1484,6 @@ static int client_handle_msg(struct ipc_channel_context* ctx,
payload_len = msg_size - sizeof(struct storage_msg);
payload = msg->payload;
- if (msg->flags & STORAGE_MSG_FLAG_TRANSACT_CHECKPOINT) {
- if (!(msg->flags & STORAGE_MSG_FLAG_TRANSACT_COMPLETE)) {
- SS_ERR("%s: STORAGE_MSG_FLAG_TRANSACT_CHECKPOINT cannot "
- "be used without STORAGE_MSG_FLAG_TRANSACT_COMPLETE\n",
- __func__);
- return send_result(tipc_session, msg, STORAGE_ERR_NOT_VALID);
- }
-
- if (!checkpoint_update_allowed(&session->tr)) {
- SS_ERR("%s: Checkpoint requested but not currently allowed.\n",
- __func__);
- return send_result(tipc_session, msg, STORAGE_ERR_NOT_ALLOWED);
- }
- }
-
- /* abort transaction and clear sticky transaction error */
- if (msg->cmd == STORAGE_END_TRANSACTION) {
- if (msg->flags & STORAGE_MSG_FLAG_TRANSACT_COMPLETE) {
- /* Allow checkpoint creation without an active transaction */
- if (msg->flags & STORAGE_MSG_FLAG_TRANSACT_CHECKPOINT &&
- !transaction_is_active(&session->tr)) {
- transaction_activate(&session->tr);
- }
- /* try to complete current transaction */
- if (transaction_is_active(&session->tr)) {
- transaction_complete_etc(
- &session->tr,
- msg->flags & STORAGE_MSG_FLAG_TRANSACT_CHECKPOINT);
- }
- if (session->tr.failed) {
- SS_ERR("%s: failed to complete transaction\n", __func__);
- /* clear transaction failed state */
- session->tr.failed = false;
- return send_result(tipc_session, msg, STORAGE_ERR_TRANSACT);
- }
- return send_result(tipc_session, msg, STORAGE_NO_ERROR);
- } else {
- /* discard current transaction */
- if (transaction_is_active(&session->tr)) {
- transaction_fail(&session->tr);
- }
- /* clear transaction failed state */
- session->tr.failed = false;
- return send_result(tipc_session, msg, STORAGE_NO_ERROR);
- }
- }
-
- if (session->tr.failed) {
- if (msg->flags & STORAGE_MSG_FLAG_TRANSACT_COMPLETE) {
- /* last command in current trunsaction: reset failed state and
- * return error */
- session->tr.failed = false;
- }
- return send_result(tipc_session, msg, STORAGE_ERR_TRANSACT);
- }
-
- if (!transaction_is_active(&session->tr)) {
- /* previous transaction complete */
- transaction_activate(&session->tr);
- }
-
switch (msg->cmd) {
case STORAGE_FILE_DELETE:
result = storage_tipc_file_delete(session, msg, payload, payload_len);
@@ -1405,6 +1509,10 @@ static int client_handle_msg(struct ipc_channel_context* ctx,
case STORAGE_FILE_SET_SIZE:
result = storage_tipc_file_set_size(session, msg, payload, payload_len);
break;
+ case STORAGE_END_TRANSACTION:
+ result = storage_transaction_end(session,
+ extract_storage_op_flags(msg->flags));
+ break;
default:
SS_ERR("%s: unsupported command 0x%" PRIx32 "\n", __func__, msg->cmd);
result = STORAGE_ERR_UNIMPLEMENTED;