mqtt: send ping at upkeep interval

Closes #16975
This commit is contained in:
Christian Schmitz
2025-04-05 13:28:03 +02:00
committed by Daniel Stenberg
parent 56e40ae6a5
commit 8ad0243e1f
4 changed files with 66 additions and 1 deletions

View File

@@ -26,6 +26,10 @@ Example subscribe:
This sends an MQTT SUBSCRIBE packet for the topic `bedroom/temp` and listen in
for incoming PUBLISH packets.
You can set the upkeep interval ms option to make curl send MQTT ping requests to the
server at an internal, to prevent the connection to get closed because of idleness.
You might then need to use the progress callback to cancel the operation.
### Publishing
Command usage:

View File

@@ -31,10 +31,13 @@ send some traffic on existing connections in order to keep them alive; this
can prevent connections from being closed due to overzealous firewalls, for
example.
Currently the only protocol with a connection upkeep mechanism is HTTP/2: when
For HTTP/2 we have an upkeep mechanism: when
the connection upkeep interval is exceeded and curl_easy_upkeep(3)
is called, an HTTP/2 PING frame is sent on the connection.
For MQTT the upkeep interval defines when to send ping requests to prevent the
server from disconnecting.
This function must be explicitly called in order to perform the upkeep work.
The connection upkeep interval is set with
CURLOPT_UPKEEP_INTERVAL_MS(3).

View File

@@ -46,12 +46,16 @@
/* The last #include file should be: */
#include "memdebug.h"
/* first byte is command.
second byte is for flags. */
#define MQTT_MSG_CONNECT 0x10
/* #define MQTT_MSG_CONNACK 0x20 */
#define MQTT_MSG_PUBLISH 0x30
#define MQTT_MSG_SUBSCRIBE 0x82
#define MQTT_MSG_SUBACK 0x90
#define MQTT_MSG_DISCONNECT 0xe0
#define MQTT_MSG_PINGREQ 0xC0
#define MQTT_MSG_PINGRESP 0xD0
#define MQTT_CONNACK_LEN 2
#define MQTT_SUBACK_LEN 3
@@ -125,6 +129,7 @@ static CURLcode mqtt_send(struct Curl_easy *data,
CURLcode result = Curl_xfer_send(data, buf, len, FALSE, &n);
if(result)
return result;
mq->lastTime = Curl_now();
Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
if(len != n) {
size_t nsend = len - n;
@@ -687,6 +692,9 @@ MQTT_SUBACK_COMING:
goto end;
}
/* we received something */
mq->lastTime = Curl_now();
/* if QoS is set, message contains packet id */
result = Curl_client_write(data, CLIENTWRITE_BODY, buffer, nread);
if(result)
@@ -709,9 +717,13 @@ end:
static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
{
struct MQTT *mq = data->req.p.mqtt;
CURLcode result = CURLE_OK;
*done = FALSE; /* unconditionally */
mq->lastTime = Curl_now();
mq->pingsent = FALSE;
result = mqtt_connect(data);
if(result) {
failf(data, "Error %d sending MQTT CONNECT request", result);
@@ -732,6 +744,35 @@ static CURLcode mqtt_done(struct Curl_easy *data,
return CURLE_OK;
}
/* we ping regularly to avoid being disconnected by the server */
static CURLcode mqtt_ping(struct Curl_easy *data)
{
CURLcode result = CURLE_OK;
struct connectdata *conn = data->conn;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
struct MQTT *mq = data->req.p.mqtt;
if(mqtt->state == MQTT_FIRST &&
!mq->pingsent &&
data->set.upkeep_interval_ms > 0) {
struct curltime t = Curl_now();
timediff_t diff = Curl_timediff(t, mq->lastTime);
if(diff > data->set.upkeep_interval_ms) {
/* 0xC0 is PINGREQ, and 0x00 is remaining length */
unsigned char packet[2] = { 0xC0, 0x00 };
size_t packetlen = sizeof(packet);
result = mqtt_send(data, (char *)packet, packetlen);
if(!result) {
mq->pingsent = TRUE;
}
infof(data, "mqtt_ping: sent ping request.");
}
}
return result;
}
static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
{
CURLcode result = CURLE_OK;
@@ -750,6 +791,10 @@ static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
return result;
}
result = mqtt_ping(data);
if(result)
return result;
infof(data, "mqtt_doing: state [%d]", (int) mqtt->state);
switch(mqtt->state) {
case MQTT_FIRST:
@@ -764,6 +809,10 @@ static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
break;
}
Curl_debug(data, CURLINFO_HEADER_IN, (const char *)&mq->firstbyte, 1);
/* we received something */
mq->lastTime = Curl_now();
/* remember the first byte */
mq->npacket = 0;
mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
@@ -794,6 +843,13 @@ static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
infof(data, "Got DISCONNECT");
*done = TRUE;
}
/* ping response */
if(mq->firstbyte == MQTT_MSG_PINGRESP) {
infof(data, "Received ping response.");
mq->pingsent = FALSE;
mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
}
break;
case MQTT_CONNACK:
result = mqtt_verify_connack(data);

View File

@@ -55,6 +55,8 @@ struct MQTT {
size_t npacket; /* byte counter */
size_t remaining_length;
unsigned char pkt_hd[4]; /* for decoding the arriving packet length */
struct curltime lastTime; /* last time we sent or received data */
bool pingsent; /* 1 while we wait for ping response */
unsigned char firstbyte;
};