aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorChunho Lee <chunhole@amazon.com>2021-10-06 19:20:47 -0700
committerAndy Green <andy@warmcat.com>2021-10-07 11:22:08 +0100
commit6decd5a7e7770090f1d86e0b3900e8b94234f3af (patch)
tree42e5300a3749e93047a16fe03c3d870b8275e65d /lib
parent3b90c89bab1387db9689e3fcdc80f3f495ea1ada (diff)
downloadlibwebsockets-6decd5a7e7770090f1d86e0b3900e8b94234f3af.tar.gz
ss-mqtt: Skip SUBSCRIBE when MQTT session is resumed
Diffstat (limited to 'lib')
-rw-r--r--lib/roles/mqtt/mqtt.c3
-rw-r--r--lib/roles/mqtt/private-lib-roles-mqtt.h1
-rw-r--r--lib/secure-streams/protocols/ss-mqtt.c208
3 files changed, 130 insertions, 82 deletions
diff --git a/lib/roles/mqtt/mqtt.c b/lib/roles/mqtt/mqtt.c
index baad7f73..2a73d6f5 100644
--- a/lib/roles/mqtt/mqtt.c
+++ b/lib/roles/mqtt/mqtt.c
@@ -2272,6 +2272,9 @@ lws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub)
return 1;
}
+ if (wsi->mqtt->inside_resume_session)
+ return 0;
+
if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
lws_ptr_diff(p, start))
return 1;
diff --git a/lib/roles/mqtt/private-lib-roles-mqtt.h b/lib/roles/mqtt/private-lib-roles-mqtt.h
index a1f5d23c..779e2971 100644
--- a/lib/roles/mqtt/private-lib-roles-mqtt.h
+++ b/lib/roles/mqtt/private-lib-roles-mqtt.h
@@ -371,6 +371,7 @@ struct _lws_mqtt_related {
uint8_t inside_payload:1;
uint8_t inside_subscribe:1;
uint8_t inside_unsubscribe:1;
+ uint8_t inside_resume_session:1;
uint8_t send_puback:1;
uint8_t send_pubrel:1;
uint8_t send_pubrec:1;
diff --git a/lib/secure-streams/protocols/ss-mqtt.c b/lib/secure-streams/protocols/ss-mqtt.c
index 2c3c5222..1cd7d09b 100644
--- a/lib/secure-streams/protocols/ss-mqtt.c
+++ b/lib/secure-streams/protocols/ss-mqtt.c
@@ -47,6 +47,100 @@ secstream_mqtt_cleanup(lws_ss_handle_t *h)
}
static int
+secstream_mqtt_subscribe(struct lws *wsi)
+{
+ size_t used_in, used_out, topic_limit;
+ lws_strexp_t exp;
+ char* expbuf;
+ lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
+
+ if (!h || !h->policy)
+ return -1;
+
+ if (h->policy->u.mqtt.aws_iot)
+ topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
+ else
+ topic_limit = LWS_MQTT_MAX_TOPICLEN;
+
+ if (!h->policy->u.mqtt.subscribe || wsi->mqtt->done_subscribe)
+ return 0;
+
+ lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, NULL,
+ topic_limit);
+ /*
+ * Expand with no output first to calculate the size of
+ * expanded string then, allocate new buffer and expand
+ * again with the buffer
+ */
+ if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
+ strlen(h->policy->u.mqtt.subscribe), &used_in,
+ &used_out) != LSTRX_DONE) {
+ lwsl_err(
+ "%s, failed to expand MQTT subscribe"
+ " topic with no output\n",
+ __func__);
+ return 1;
+ }
+
+ expbuf = lws_malloc(used_out + 1, __func__);
+ if (!expbuf) {
+ lwsl_err(
+ "%s, failed to allocate MQTT subscribe"
+ "topic",
+ __func__);
+ return 1;
+ }
+
+ lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf,
+ used_out + 1);
+
+ if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
+ strlen(h->policy->u.mqtt.subscribe), &used_in,
+ &used_out) != LSTRX_DONE) {
+ lwsl_err("%s, failed to expand MQTT subscribe topic\n",
+ __func__);
+ lws_free(expbuf);
+ return 1;
+ }
+ lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
+ h->u.mqtt.sub_top.name = expbuf;
+
+ /*
+ * The policy says to subscribe to something, and we
+ * haven't done it yet. Do it using the pre-prepared
+ * string-substituted version of the policy string.
+ */
+
+ lwsl_notice("%s: subscribing %s\n", __func__,
+ h->u.mqtt.sub_top.name);
+
+ h->u.mqtt.sub_top.qos = h->policy->u.mqtt.qos;
+ memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info));
+ h->u.mqtt.sub_info.num_topics = 1;
+ h->u.mqtt.sub_info.topic = &h->u.mqtt.sub_top;
+ h->u.mqtt.sub_info.topic =
+ lws_malloc(sizeof(lws_mqtt_topic_elem_t), __func__);
+ h->u.mqtt.sub_info.topic[0].name = lws_strdup(expbuf);
+ h->u.mqtt.sub_info.topic[0].qos = h->policy->u.mqtt.qos;
+
+ if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) {
+ lwsl_notice("%s: unable to subscribe", __func__);
+ lws_free(expbuf);
+ h->u.mqtt.sub_top.name = NULL;
+ return -1;
+ }
+ lws_free(expbuf);
+ h->u.mqtt.sub_top.name = NULL;
+
+ /* Expect a SUBACK */
+ if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
+ lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__);
+ return -1;
+ }
+ return 0;
+}
+
+static int
secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
void *in, size_t len)
{
@@ -120,13 +214,32 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
h->wsi = wsi;
h->retry = 0;
h->seqstate = SSSEQ_CONNECTED;
- /*
- * If a subscribe is pending on the stream, then make
- * sure the SUBSCRIBE is done before signaling the
- * user application.
- */
- if (h->policy->u.mqtt.subscribe &&
- !wsi->mqtt->done_subscribe) {
+
+ if (!h->policy->u.mqtt.subscribe ||
+ !h->policy->u.mqtt.subscribe[0]) {
+ /*
+ * If subscribe is empty in the policy, then,
+ * skip sending SUBSCRIBE and signal the user
+ * application.
+ */
+ wsi->mqtt->done_subscribe = 1;
+ } else if (!h->policy->u.mqtt.clean_start &&
+ wsi->mqtt->session_resumed) {
+ wsi->mqtt->inside_resume_session = 1;
+ /*
+ * If the previous session is resumed and Server has
+ * stored session, then, do not subscribe.
+ */
+ if (!secstream_mqtt_subscribe(wsi))
+ wsi->mqtt->done_subscribe = 1;
+ wsi->mqtt->inside_resume_session = 0;
+ } else if (h->policy->u.mqtt.subscribe &&
+ !wsi->mqtt->done_subscribe) {
+ /*
+ * If a subscribe is pending on the stream, then make
+ * sure the SUBSCRIBE is done before signaling the
+ * user application.
+ */
lws_callback_on_writable(wsi);
break;
}
@@ -208,79 +321,8 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
else
topic_limit = LWS_MQTT_MAX_TOPICLEN;
- if (h->policy->u.mqtt.subscribe &&
- !wsi->mqtt->done_subscribe) {
- lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
- NULL, topic_limit);
- /*
- * Expand with no output first to calculate the size of
- * expanded string then, allocate new buffer and expand
- * again with the buffer
- */
- if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
- strlen(h->policy->u.mqtt.subscribe),
- &used_in, &used_out) != LSTRX_DONE) {
- lwsl_err("%s, failed to expand MQTT subscribe"
- " topic with no output\n", __func__);
- return 1;
- }
-
- expbuf = lws_malloc(used_out + 1, __func__);
- if (!expbuf) {
- lwsl_err("%s, failed to allocate MQTT subscribe"
- "topic", __func__);
- return 1;
- }
-
- lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
- expbuf, used_out + 1);
-
- if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
- strlen(h->policy->u.mqtt.subscribe),
- &used_in, &used_out) != LSTRX_DONE) {
- lwsl_err("%s, failed to expand MQTT subscribe topic\n",
- __func__);
- lws_free(expbuf);
- return 1;
- }
- lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
- h->u.mqtt.sub_top.name = expbuf;
-
- /*
- * The policy says to subscribe to something, and we
- * haven't done it yet. Do it using the pre-prepared
- * string-substituted version of the policy string.
- */
-
- lwsl_notice("%s: subscribing %s\n", __func__,
- h->u.mqtt.sub_top.name);
-
- h->u.mqtt.sub_top.qos = h->policy->u.mqtt.qos;
- memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info));
- h->u.mqtt.sub_info.num_topics = 1;
- h->u.mqtt.sub_info.topic = &h->u.mqtt.sub_top;
- h->u.mqtt.sub_info.topic = lws_malloc(sizeof(lws_mqtt_topic_elem_t),
- __func__);
- h->u.mqtt.sub_info.topic[0].name = lws_strdup(expbuf);
- h->u.mqtt.sub_info.topic[0].qos = h->policy->u.mqtt.qos;
-
- if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) {
- lwsl_notice("%s: unable to subscribe", __func__);
- lws_free(expbuf);
- h->u.mqtt.sub_top.name = NULL;
- return -1;
- }
- lws_free(expbuf);
- h->u.mqtt.sub_top.name = NULL;
- /* Expect a SUBACK */
- if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
- lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__);
- return -1;
- }
-
- return 0;
- }
-
+ if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe)
+ return secstream_mqtt_subscribe(wsi);
buflen = sizeof(buf) - LWS_PRE;
r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
@@ -308,8 +350,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
memset(&mqpp, 0, sizeof(mqpp));
- /* this is the string-substituted h->policy->u.mqtt.topic */
- mqpp.topic = (char *)h->u.mqtt.topic_qos.name;
+
lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL,
topic_limit);
@@ -513,6 +554,9 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
*/
for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) {
+ if (!sources[n])
+ continue;
+
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
NULL, (size_t)-1);
if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]),