From 6decd5a7e7770090f1d86e0b3900e8b94234f3af Mon Sep 17 00:00:00 2001 From: Chunho Lee Date: Wed, 6 Oct 2021 19:20:47 -0700 Subject: ss-mqtt: Skip SUBSCRIBE when MQTT session is resumed --- lib/roles/mqtt/mqtt.c | 3 + lib/roles/mqtt/private-lib-roles-mqtt.h | 1 + lib/secure-streams/protocols/ss-mqtt.c | 208 +++++++++++++++++++------------- 3 files changed, 130 insertions(+), 82 deletions(-) (limited to 'lib') diff --git a/lib/roles/mqtt/mqtt.c b/lib/roles/mqtt/mqtt.c index baad7f73..2a73d6f5 100644 --- a/lib/roles/mqtt/mqtt.c +++ b/lib/roles/mqtt/mqtt.c @@ -2272,6 +2272,9 @@ lws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub) return 1; } + if (wsi->mqtt->inside_resume_session) + return 0; + if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) != lws_ptr_diff(p, start)) return 1; diff --git a/lib/roles/mqtt/private-lib-roles-mqtt.h b/lib/roles/mqtt/private-lib-roles-mqtt.h index a1f5d23c..779e2971 100644 --- a/lib/roles/mqtt/private-lib-roles-mqtt.h +++ b/lib/roles/mqtt/private-lib-roles-mqtt.h @@ -371,6 +371,7 @@ struct _lws_mqtt_related { uint8_t inside_payload:1; uint8_t inside_subscribe:1; uint8_t inside_unsubscribe:1; + uint8_t inside_resume_session:1; uint8_t send_puback:1; uint8_t send_pubrel:1; uint8_t send_pubrec:1; diff --git a/lib/secure-streams/protocols/ss-mqtt.c b/lib/secure-streams/protocols/ss-mqtt.c index 2c3c5222..1cd7d09b 100644 --- a/lib/secure-streams/protocols/ss-mqtt.c +++ b/lib/secure-streams/protocols/ss-mqtt.c @@ -46,6 +46,100 @@ secstream_mqtt_cleanup(lws_ss_handle_t *h) } } +static int +secstream_mqtt_subscribe(struct lws *wsi) +{ + size_t used_in, used_out, topic_limit; + lws_strexp_t exp; + char* expbuf; + lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); + + if (!h || !h->policy) + return -1; + + if (h->policy->u.mqtt.aws_iot) + topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN; + else + topic_limit = LWS_MQTT_MAX_TOPICLEN; + + if (!h->policy->u.mqtt.subscribe || wsi->mqtt->done_subscribe) + return 0; + + lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, NULL, + topic_limit); + /* + * Expand with no output first to calculate the size of + * expanded string then, allocate new buffer and expand + * again with the buffer + */ + if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe, + strlen(h->policy->u.mqtt.subscribe), &used_in, + &used_out) != LSTRX_DONE) { + lwsl_err( + "%s, failed to expand MQTT subscribe" + " topic with no output\n", + __func__); + return 1; + } + + expbuf = lws_malloc(used_out + 1, __func__); + if (!expbuf) { + lwsl_err( + "%s, failed to allocate MQTT subscribe" + "topic", + __func__); + return 1; + } + + lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf, + used_out + 1); + + if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe, + strlen(h->policy->u.mqtt.subscribe), &used_in, + &used_out) != LSTRX_DONE) { + lwsl_err("%s, failed to expand MQTT subscribe topic\n", + __func__); + lws_free(expbuf); + return 1; + } + lwsl_notice("%s, expbuf - %s\n", __func__, expbuf); + h->u.mqtt.sub_top.name = expbuf; + + /* + * The policy says to subscribe to something, and we + * haven't done it yet. Do it using the pre-prepared + * string-substituted version of the policy string. + */ + + lwsl_notice("%s: subscribing %s\n", __func__, + h->u.mqtt.sub_top.name); + + h->u.mqtt.sub_top.qos = h->policy->u.mqtt.qos; + memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info)); + h->u.mqtt.sub_info.num_topics = 1; + h->u.mqtt.sub_info.topic = &h->u.mqtt.sub_top; + h->u.mqtt.sub_info.topic = + lws_malloc(sizeof(lws_mqtt_topic_elem_t), __func__); + h->u.mqtt.sub_info.topic[0].name = lws_strdup(expbuf); + h->u.mqtt.sub_info.topic[0].qos = h->policy->u.mqtt.qos; + + if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) { + lwsl_notice("%s: unable to subscribe", __func__); + lws_free(expbuf); + h->u.mqtt.sub_top.name = NULL; + return -1; + } + lws_free(expbuf); + h->u.mqtt.sub_top.name = NULL; + + /* Expect a SUBACK */ + if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) { + lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__); + return -1; + } + return 0; +} + static int secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) @@ -120,13 +214,32 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, h->wsi = wsi; h->retry = 0; h->seqstate = SSSEQ_CONNECTED; - /* - * If a subscribe is pending on the stream, then make - * sure the SUBSCRIBE is done before signaling the - * user application. - */ - if (h->policy->u.mqtt.subscribe && - !wsi->mqtt->done_subscribe) { + + if (!h->policy->u.mqtt.subscribe || + !h->policy->u.mqtt.subscribe[0]) { + /* + * If subscribe is empty in the policy, then, + * skip sending SUBSCRIBE and signal the user + * application. + */ + wsi->mqtt->done_subscribe = 1; + } else if (!h->policy->u.mqtt.clean_start && + wsi->mqtt->session_resumed) { + wsi->mqtt->inside_resume_session = 1; + /* + * If the previous session is resumed and Server has + * stored session, then, do not subscribe. + */ + if (!secstream_mqtt_subscribe(wsi)) + wsi->mqtt->done_subscribe = 1; + wsi->mqtt->inside_resume_session = 0; + } else if (h->policy->u.mqtt.subscribe && + !wsi->mqtt->done_subscribe) { + /* + * If a subscribe is pending on the stream, then make + * sure the SUBSCRIBE is done before signaling the + * user application. + */ lws_callback_on_writable(wsi); break; } @@ -208,79 +321,8 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, else topic_limit = LWS_MQTT_MAX_TOPICLEN; - if (h->policy->u.mqtt.subscribe && - !wsi->mqtt->done_subscribe) { - lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, - NULL, topic_limit); - /* - * Expand with no output first to calculate the size of - * expanded string then, allocate new buffer and expand - * again with the buffer - */ - if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe, - strlen(h->policy->u.mqtt.subscribe), - &used_in, &used_out) != LSTRX_DONE) { - lwsl_err("%s, failed to expand MQTT subscribe" - " topic with no output\n", __func__); - return 1; - } - - expbuf = lws_malloc(used_out + 1, __func__); - if (!expbuf) { - lwsl_err("%s, failed to allocate MQTT subscribe" - "topic", __func__); - return 1; - } - - lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, - expbuf, used_out + 1); - - if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe, - strlen(h->policy->u.mqtt.subscribe), - &used_in, &used_out) != LSTRX_DONE) { - lwsl_err("%s, failed to expand MQTT subscribe topic\n", - __func__); - lws_free(expbuf); - return 1; - } - lwsl_notice("%s, expbuf - %s\n", __func__, expbuf); - h->u.mqtt.sub_top.name = expbuf; - - /* - * The policy says to subscribe to something, and we - * haven't done it yet. Do it using the pre-prepared - * string-substituted version of the policy string. - */ - - lwsl_notice("%s: subscribing %s\n", __func__, - h->u.mqtt.sub_top.name); - - h->u.mqtt.sub_top.qos = h->policy->u.mqtt.qos; - memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info)); - h->u.mqtt.sub_info.num_topics = 1; - h->u.mqtt.sub_info.topic = &h->u.mqtt.sub_top; - h->u.mqtt.sub_info.topic = lws_malloc(sizeof(lws_mqtt_topic_elem_t), - __func__); - h->u.mqtt.sub_info.topic[0].name = lws_strdup(expbuf); - h->u.mqtt.sub_info.topic[0].qos = h->policy->u.mqtt.qos; - - if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) { - lwsl_notice("%s: unable to subscribe", __func__); - lws_free(expbuf); - h->u.mqtt.sub_top.name = NULL; - return -1; - } - lws_free(expbuf); - h->u.mqtt.sub_top.name = NULL; - /* Expect a SUBACK */ - if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) { - lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__); - return -1; - } - - return 0; - } - + if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe) + return secstream_mqtt_subscribe(wsi); buflen = sizeof(buf) - LWS_PRE; r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE, @@ -308,8 +350,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); memset(&mqpp, 0, sizeof(mqpp)); - /* this is the string-substituted h->policy->u.mqtt.topic */ - mqpp.topic = (char *)h->u.mqtt.topic_qos.name; + lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL, topic_limit); @@ -513,6 +554,9 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len, */ for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) { + if (!sources[n]) + continue; + lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, NULL, (size_t)-1); if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]), -- cgit v1.2.3