aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChunho Lee <chunhole@amazon.com>2021-10-06 20:09:40 -0700
committerAndy Green <andy@warmcat.com>2021-10-07 11:22:21 +0100
commitbf5744ab0742d820b4f5ce01d4da75f1fd3430de (patch)
treecb637b5a58febf57c32ca2b40206036d814dd36c
parent6decd5a7e7770090f1d86e0b3900e8b94234f3af (diff)
downloadlibwebsockets-bf5744ab0742d820b4f5ce01d4da75f1fd3430de.tar.gz
ss-mqtt: Add support for Birth message
This provides Birth message on SS policy. The Birth message is a message published just after the MQTT connection has been established.
-rw-r--r--include/libwebsockets/lws-mqtt.h7
-rw-r--r--include/libwebsockets/lws-secure-streams-policy.h5
-rw-r--r--lib/roles/mqtt/mqtt.c17
-rw-r--r--lib/roles/mqtt/private-lib-roles-mqtt.h2
-rw-r--r--lib/secure-streams/policy-json.c23
-rw-r--r--lib/secure-streams/protocols/ss-mqtt.c196
6 files changed, 164 insertions, 86 deletions
diff --git a/include/libwebsockets/lws-mqtt.h b/include/libwebsockets/lws-mqtt.h
index 71193e66..eb9dcbb2 100644
--- a/include/libwebsockets/lws-mqtt.h
+++ b/include/libwebsockets/lws-mqtt.h
@@ -81,6 +81,13 @@ typedef struct lws_mqtt_client_connect_param_s {
uint8_t retain;
} will_param; /* MQTT LWT
parameters */
+ struct {
+ const char *topic;
+ const char *message;
+ lws_mqtt_qos_levels_t qos;
+ uint8_t retain;
+ } birth_param; /* MQTT Birth
+ parameters */
const char *username;
const char *password;
uint8_t aws_iot;
diff --git a/include/libwebsockets/lws-secure-streams-policy.h b/include/libwebsockets/lws-secure-streams-policy.h
index f84edec9..863140d7 100644
--- a/include/libwebsockets/lws-secure-streams-policy.h
+++ b/include/libwebsockets/lws-secure-streams-policy.h
@@ -312,11 +312,16 @@ typedef struct lws_ss_policy {
const char *will_topic;
const char *will_message;
+ const char *birth_topic;
+ const char *birth_message;
+
uint16_t keep_alive;
uint8_t qos;
uint8_t clean_start;
uint8_t will_qos;
uint8_t will_retain;
+ uint8_t birth_qos;
+ uint8_t birth_retain;
uint8_t aws_iot;
} mqtt;
diff --git a/lib/roles/mqtt/mqtt.c b/lib/roles/mqtt/mqtt.c
index 2a73d6f5..b6613436 100644
--- a/lib/roles/mqtt/mqtt.c
+++ b/lib/roles/mqtt/mqtt.c
@@ -2038,17 +2038,12 @@ lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
return 1;
}
}
- /*
- * A non-empty Payload is expected and a chunk
- * is present
- */
- if (pub->payload_len && len) {
- p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
- memcpy(p, buf, len);
- if (lws_mqtt_str_advance(&mqtt_vh_payload, (int)len))
- return 1;
- p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
- }
+
+ p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
+ memcpy(p, buf, len);
+ if (lws_mqtt_str_advance(&mqtt_vh_payload, (int)len))
+ return 1;
+ p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
if (!is_complete)
nwsi->mqtt->inside_payload = wsi->mqtt->inside_payload = 1;
diff --git a/lib/roles/mqtt/private-lib-roles-mqtt.h b/lib/roles/mqtt/private-lib-roles-mqtt.h
index 779e2971..d89f3718 100644
--- a/lib/roles/mqtt/private-lib-roles-mqtt.h
+++ b/lib/roles/mqtt/private-lib-roles-mqtt.h
@@ -371,6 +371,7 @@ struct _lws_mqtt_related {
uint8_t inside_payload:1;
uint8_t inside_subscribe:1;
uint8_t inside_unsubscribe:1;
+ uint8_t inside_birth:1;
uint8_t inside_resume_session:1;
uint8_t send_puback:1;
uint8_t send_pubrel:1;
@@ -380,6 +381,7 @@ struct _lws_mqtt_related {
uint8_t unacked_pubrel:1;
uint8_t done_subscribe:1;
+ uint8_t done_birth:1;
};
/*
diff --git a/lib/secure-streams/policy-json.c b/lib/secure-streams/policy-json.c
index c49879e1..3427ae6d 100644
--- a/lib/secure-streams/policy-json.c
+++ b/lib/secure-streams/policy-json.c
@@ -117,6 +117,10 @@ static const char * const lejp_tokens_policy[] = {
"s[].*.mqtt_will_message",
"s[].*.mqtt_will_qos",
"s[].*.mqtt_will_retain",
+ "s[].*.mqtt_birth_topic",
+ "s[].*.mqtt_birth_message",
+ "s[].*.mqtt_birth_qos",
+ "s[].*.mqtt_birth_retain",
"s[].*.aws_iot",
"s[].*.swake_validity",
"s[].*.use_auth",
@@ -220,6 +224,10 @@ typedef enum {
LSSPPT_MQTT_WILL_MESSAGE,
LSSPPT_MQTT_WILL_QOS,
LSSPPT_MQTT_WILL_RETAIN,
+ LSSPPT_MQTT_BIRTH_TOPIC,
+ LSSPPT_MQTT_BIRTH_MESSAGE,
+ LSSPPT_MQTT_BIRTH_QOS,
+ LSSPPT_MQTT_BIRTH_RETAIN,
LSSPPT_MQTT_AWS_IOT,
LSSPPT_SWAKE_VALIDITY,
LSSPPT_USE_AUTH,
@@ -1035,6 +1043,21 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason)
a->curr[LTY_POLICY].p->u.mqtt.will_retain =
reason == LEJPCB_VAL_TRUE;
break;
+ case LSSPPT_MQTT_BIRTH_TOPIC:
+ pp = (char **)&a->curr[LTY_POLICY].p->u.mqtt.birth_topic;
+ goto string2;
+
+ case LSSPPT_MQTT_BIRTH_MESSAGE:
+ pp = (char **)&a->curr[LTY_POLICY].p->u.mqtt.birth_message;
+ goto string2;
+
+ case LSSPPT_MQTT_BIRTH_QOS:
+ a->curr[LTY_POLICY].p->u.mqtt.birth_qos = (uint8_t)atoi(ctx->buf);
+ break;
+ case LSSPPT_MQTT_BIRTH_RETAIN:
+ a->curr[LTY_POLICY].p->u.mqtt.birth_retain =
+ reason == LEJPCB_VAL_TRUE;
+ break;
case LSSPPT_MQTT_AWS_IOT:
if (reason == LEJPCB_VAL_TRUE)
a->curr[LTY_POLICY].p->u.mqtt.aws_iot =
diff --git a/lib/secure-streams/protocols/ss-mqtt.c b/lib/secure-streams/protocols/ss-mqtt.c
index 1cd7d09b..f13eea4a 100644
--- a/lib/secure-streams/protocols/ss-mqtt.c
+++ b/lib/secure-streams/protocols/ss-mqtt.c
@@ -141,14 +141,84 @@ secstream_mqtt_subscribe(struct lws *wsi)
}
static int
+secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen,
+ const char* topic,
+ lws_mqtt_qos_levels_t qos, int f)
+{
+ lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
+ size_t used_in, used_out, topic_limit;
+ lws_strexp_t exp;
+ char *expbuf;
+ lws_mqtt_publish_param_t mqpp;
+
+ if (h->policy->u.mqtt.aws_iot)
+ topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
+ else
+ topic_limit = LWS_MQTT_MAX_TOPICLEN;
+
+ memset(&mqpp, 0, sizeof(mqpp));
+
+ lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL,
+ topic_limit);
+
+ if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in,
+ &used_out) != LSTRX_DONE) {
+ lwsl_err("%s, failed to expand MQTT publish"
+ " topic with no output\n", __func__);
+ return 1;
+ }
+ expbuf = lws_malloc(used_out + 1, __func__);
+ if (!expbuf) {
+ lwsl_err("%s, failed to allocate MQTT publish topic",
+ __func__);
+ return 1;
+ }
+
+ lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf,
+ used_out + 1);
+
+ if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in,
+ &used_out) != LSTRX_DONE) {
+ lws_free(expbuf);
+ return 1;
+ }
+ lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
+ mqpp.topic = (char *)expbuf;
+
+ mqpp.topic_len = (uint16_t)strlen(mqpp.topic);
+ mqpp.packet_id = (uint16_t)(h->txord - 1);
+ mqpp.payload = buf;
+ if (h->writeable_len)
+ mqpp.payload_len = (uint32_t)h->writeable_len;
+ else
+ mqpp.payload_len = (uint32_t)buflen;
+
+ lwsl_notice("%s: payload len %d\n", __func__,
+ (int)mqpp.payload_len);
+
+ mqpp.qos = h->policy->u.mqtt.qos;
+
+ if (lws_mqtt_client_send_publish(wsi, &mqpp,
+ (const char *)buf,
+ (uint32_t)buflen,
+ f & LWSSS_FLAG_EOM)) {
+ lwsl_notice("%s: failed to publish\n", __func__);
+ lws_free(expbuf);
+ return -1;
+ }
+ lws_free(expbuf);
+ return 0;
+}
+
+static int
secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
void *in, size_t len)
{
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
- lws_mqtt_publish_param_t mqpp, *pmqpp;
+ lws_mqtt_publish_param_t *pmqpp;
uint8_t buf[LWS_PRE + 1400];
lws_ss_state_return_t r;
- size_t buflen;
+ size_t buflen = sizeof(buf) - LWS_PRE;
int f = 0;
switch (reason) {
@@ -288,8 +358,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
lws_sul_cancel(&h->sul);
r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
if (r != LWSSSSRET_OK)
- return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r,
- wsi, &h);
+ return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
}
wsi->mqtt->done_subscribe = 1;
lws_callback_on_writable(wsi);
@@ -297,6 +366,14 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
case LWS_CALLBACK_MQTT_ACK:
lws_sul_cancel(&h->sul_timeout);
+ if (wsi->mqtt->inside_birth) {
+ /*
+ * Skip LWSSSCS_QOS_ACK_REMOTE for birth topic.
+ */
+ wsi->mqtt->inside_birth = 0;
+ wsi->mqtt->done_birth = 1;
+ break;
+ }
r = lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
@@ -304,10 +381,6 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
{
- size_t used_in, used_out, topic_limit;
- lws_strexp_t exp;
- char *expbuf;
-
if (!h || !h->info.tx)
return 0;
lwsl_notice("%s: %s: WRITEABLE\n", __func__, lws_ss_tag(h));
@@ -316,17 +389,29 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
lwsl_warn("%s: seqstate %d\n", __func__, h->seqstate);
break;
}
- if (h->policy->u.mqtt.aws_iot)
- topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
- else
- topic_limit = LWS_MQTT_MAX_TOPICLEN;
if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe)
return secstream_mqtt_subscribe(wsi);
- buflen = sizeof(buf) - LWS_PRE;
+ if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic) {
+ lws_strexp_t exp;
+ size_t used_in, used_out = 0;
+ if (h->policy->u.mqtt.birth_message) {
+ lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata,
+ (char *)(buf + LWS_PRE), buflen);
+ if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message,
+ strlen(h->policy->u.mqtt.birth_message),
+ &used_in, &used_out) != LSTRX_DONE) {
+ return 1;
+ }
+ }
+ wsi->mqtt->inside_birth = 1;
+ return secstream_mqtt_publish(wsi, buf + LWS_PRE,
+ used_out, h->policy->u.mqtt.birth_topic,
+ h->policy->u.mqtt.birth_qos, LWSSS_FLAG_EOM);
+ }
r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
- &buflen, &f);
+ &buflen, &f);
if (r == LWSSSSRET_TX_DONT_SEND)
return 0;
@@ -349,62 +434,9 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
if (r < 0)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
- memset(&mqpp, 0, sizeof(mqpp));
-
- lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL,
- topic_limit);
-
- if (lws_strexp_expand(&exp, h->policy->u.mqtt.topic,
- strlen(h->policy->u.mqtt.topic),
- &used_in, &used_out) != LSTRX_DONE) {
- lwsl_err("%s, failed to expand MQTT publish"
- " topic with no output\n", __func__);
- return 1;
- }
- expbuf = lws_malloc(used_out + 1, __func__);
- if (!expbuf) {
- lwsl_err("%s, failed to allocate MQTT publish topic",
- __func__);
- return 1;
- }
-
- lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf,
- used_out + 1);
-
- if (lws_strexp_expand(&exp, h->policy->u.mqtt.topic,
- strlen(h->policy->u.mqtt.topic), &used_in,
- &used_out) != LSTRX_DONE) {
- lws_free(expbuf);
- return 1;
- }
- lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
- mqpp.topic = (char *)expbuf;
-
- mqpp.topic_len = (uint16_t)strlen(mqpp.topic);
- mqpp.packet_id = (uint16_t)(h->txord - 1);
- mqpp.payload = buf + LWS_PRE;
- if (h->writeable_len)
- mqpp.payload_len = (uint32_t)h->writeable_len;
- else
- mqpp.payload_len = (uint32_t)buflen;
-
- lwsl_notice("%s: payload len %d\n", __func__,
- (int)mqpp.payload_len);
-
- mqpp.qos = h->policy->u.mqtt.qos;
-
- if (lws_mqtt_client_send_publish(wsi, &mqpp,
- (const char *)buf + LWS_PRE,
- (uint32_t)buflen,
- f & LWSSS_FLAG_EOM)) {
- lwsl_notice("%s: failed to publish\n", __func__);
- lws_free(expbuf);
-
- return -1;
- }
- lws_free(expbuf);
-
- return 0;
+ return secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen,
+ h->policy->u.mqtt.topic,
+ h->policy->u.mqtt.qos, f);
}
case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
@@ -450,7 +482,9 @@ enum {
SSCMM_STRSUB_WILL_TOPIC,
SSCMM_STRSUB_WILL_MESSAGE,
SSCMM_STRSUB_SUBSCRIBE,
- SSCMM_STRSUB_TOPIC
+ SSCMM_STRSUB_TOPIC,
+ SSCMM_STRSUB_BIRTH_TOPIC,
+ SSCMM_STRSUB_BIRTH_MESSAGE
};
static int
@@ -458,16 +492,18 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
struct lws_client_connect_info *i,
union lws_ss_contemp *ct)
{
- const char *sources[4] = {
+ const char *sources[6] = {
/* we're going to string-substitute these before use */
h->policy->u.mqtt.will_topic,
h->policy->u.mqtt.will_message,
h->policy->u.mqtt.subscribe,
- h->policy->u.mqtt.topic
+ h->policy->u.mqtt.topic,
+ h->policy->u.mqtt.birth_topic,
+ h->policy->u.mqtt.birth_message
};
- size_t used_in, olen[4] = { 0, 0, 0, 0 }, tot = 0;
+ size_t used_in, olen[6] = { 0, 0, 0, 0, 0, 0 }, tot = 0;
lws_strexp_t exp;
- char *ps[4];
+ char *ps[6];
uint8_t *p = NULL;
int n = -1;
size_t blen;
@@ -485,6 +521,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
return -1;
}
p = (uint8_t *)lws_zalloc(blen+1, __func__);
+ if (!p)
+ return -1;
n = lws_system_blob_get(b, p, &blen, 0);
if (n) {
ct->ccp.client_id = NULL;
@@ -504,6 +542,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
/* If LWS_SYSBLOB_TYPE_MQTT_USERNAME is set */
if (b && (blen = lws_system_blob_get_size(b))) {
p = (uint8_t *)lws_zalloc(blen+1, __func__);
+ if (!p)
+ return -1;
n = lws_system_blob_get(b, p, &blen, 0);
if (n) {
ct->ccp.username = NULL;
@@ -520,6 +560,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
/* If LWS_SYSBLOB_TYPE_MQTT_PASSWORD is set */
if (b && (blen = lws_system_blob_get_size(b))) {
p = (uint8_t *)lws_zalloc(blen+1, __func__);
+ if (!p)
+ return -1;
n = lws_system_blob_get(b, p, &blen, 0);
if (n) {
ct->ccp.password = NULL;
@@ -534,6 +576,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
ct->ccp.clean_start = (h->policy->u.mqtt.clean_start & 1u);
ct->ccp.will_param.qos = h->policy->u.mqtt.will_qos;
ct->ccp.will_param.retain = h->policy->u.mqtt.will_retain;
+ ct->ccp.birth_param.qos = h->policy->u.mqtt.birth_qos;
+ ct->ccp.birth_param.retain = h->policy->u.mqtt.birth_retain;
ct->ccp.aws_iot = h->policy->u.mqtt.aws_iot;
h->u.mqtt.topic_qos.qos = h->policy->u.mqtt.qos;
@@ -608,6 +652,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
h->u.mqtt.subscribe_to = ps[SSCMM_STRSUB_SUBSCRIBE];
h->u.mqtt.subscribe_to_len = olen[SSCMM_STRSUB_SUBSCRIBE];
h->u.mqtt.topic_qos.name = ps[SSCMM_STRSUB_TOPIC];
+ ct->ccp.birth_param.topic = ps[SSCMM_STRSUB_BIRTH_TOPIC];
+ ct->ccp.birth_param.message = ps[SSCMM_STRSUB_BIRTH_MESSAGE];
i->method = "MQTT";
i->mqtt_cp = &ct->ccp;