VirtualBox

source: vbox/trunk/src/libs/curl-8.0.1/lib/mqtt.c@ 99797

Last change on this file since 99797 was 99344, checked in by vboxsync, 2 years ago

curl-8.0.1: Applied and adjusted our curl changes to 7.87.0 bugref:10417

  • Property svn:eol-style set to native
File size: 22.2 KB
Line 
1/***************************************************************************
2 * _ _ ____ _
3 * Project ___| | | | _ \| |
4 * / __| | | | |_) | |
5 * | (__| |_| | _ <| |___
6 * \___|\___/|_| \_\_____|
7 *
8 * Copyright (C) Daniel Stenberg, <[email protected]>, et al.
9 * Copyright (C) Björn Stenberg, <[email protected]>
10 *
11 * This software is licensed as described in the file COPYING, which
12 * you should have received as part of this distribution. The terms
13 * are also available at https://curl.se/docs/copyright.html.
14 *
15 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
16 * copies of the Software, and permit persons to whom the Software is
17 * furnished to do so, under the terms of the COPYING file.
18 *
19 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20 * KIND, either express or implied.
21 *
22 * SPDX-License-Identifier: curl
23 *
24 ***************************************************************************/
25
26#include "curl_setup.h"
27
28#ifndef CURL_DISABLE_MQTT
29
30#include "urldata.h"
31#include <curl/curl.h>
32#include "transfer.h"
33#include "sendf.h"
34#include "progress.h"
35#include "mqtt.h"
36#include "select.h"
37#include "strdup.h"
38#include "url.h"
39#include "escape.h"
40#include "warnless.h"
41#include "curl_printf.h"
42#include "curl_memory.h"
43#include "multiif.h"
44#include "rand.h"
45
46/* The last #include file should be: */
47#include "memdebug.h"
48
49#define MQTT_MSG_CONNECT 0x10
50#define MQTT_MSG_CONNACK 0x20
51#define MQTT_MSG_PUBLISH 0x30
52#define MQTT_MSG_SUBSCRIBE 0x82
53#define MQTT_MSG_SUBACK 0x90
54#define MQTT_MSG_DISCONNECT 0xe0
55
56#define MQTT_CONNACK_LEN 2
57#define MQTT_SUBACK_LEN 3
58#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
59
60/*
61 * Forward declarations.
62 */
63
64static CURLcode mqtt_do(struct Curl_easy *data, bool *done);
65static CURLcode mqtt_done(struct Curl_easy *data,
66 CURLcode status, bool premature);
67static CURLcode mqtt_doing(struct Curl_easy *data, bool *done);
68static int mqtt_getsock(struct Curl_easy *data, struct connectdata *conn,
69 curl_socket_t *sock);
70static CURLcode mqtt_setup_conn(struct Curl_easy *data,
71 struct connectdata *conn);
72
73/*
74 * MQTT protocol handler.
75 */
76
77const struct Curl_handler Curl_handler_mqtt = {
78 "MQTT", /* scheme */
79 mqtt_setup_conn, /* setup_connection */
80 mqtt_do, /* do_it */
81 mqtt_done, /* done */
82 ZERO_NULL, /* do_more */
83 ZERO_NULL, /* connect_it */
84 ZERO_NULL, /* connecting */
85 mqtt_doing, /* doing */
86 ZERO_NULL, /* proto_getsock */
87 mqtt_getsock, /* doing_getsock */
88 ZERO_NULL, /* domore_getsock */
89 ZERO_NULL, /* perform_getsock */
90 ZERO_NULL, /* disconnect */
91 ZERO_NULL, /* readwrite */
92 ZERO_NULL, /* connection_check */
93 ZERO_NULL, /* attach connection */
94 PORT_MQTT, /* defport */
95 CURLPROTO_MQTT, /* protocol */
96 CURLPROTO_MQTT, /* family */
97 PROTOPT_NONE /* flags */
98};
99
100static CURLcode mqtt_setup_conn(struct Curl_easy *data,
101 struct connectdata *conn)
102{
103 /* allocate the HTTP-specific struct for the Curl_easy, only to survive
104 during this request */
105 struct MQTT *mq;
106 (void)conn;
107 DEBUGASSERT(data->req.p.mqtt == NULL);
108
109 mq = calloc(1, sizeof(struct MQTT));
110 if(!mq)
111 return CURLE_OUT_OF_MEMORY;
112 data->req.p.mqtt = mq;
113 return CURLE_OK;
114}
115
116static CURLcode mqtt_send(struct Curl_easy *data,
117 char *buf, size_t len)
118{
119 CURLcode result = CURLE_OK;
120 struct connectdata *conn = data->conn;
121 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
122 struct MQTT *mq = data->req.p.mqtt;
123 ssize_t n;
124 result = Curl_write(data, sockfd, buf, len, &n);
125 if(result)
126 return result;
127 Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
128 if(len != (size_t)n) {
129 size_t nsend = len - n;
130 char *sendleftovers = Curl_memdup(&buf[n], nsend);
131 if(!sendleftovers)
132 return CURLE_OUT_OF_MEMORY;
133 mq->sendleftovers = sendleftovers;
134 mq->nsend = nsend;
135 }
136 else {
137 mq->sendleftovers = NULL;
138 mq->nsend = 0;
139 }
140 return result;
141}
142
143/* Generic function called by the multi interface to figure out what socket(s)
144 to wait for and for what actions during the DOING and PROTOCONNECT
145 states */
146static int mqtt_getsock(struct Curl_easy *data,
147 struct connectdata *conn,
148 curl_socket_t *sock)
149{
150 (void)data;
151 sock[0] = conn->sock[FIRSTSOCKET];
152 return GETSOCK_READSOCK(FIRSTSOCKET);
153}
154
155static int mqtt_encode_len(char *buf, size_t len)
156{
157 unsigned char encoded;
158 int i;
159
160 for(i = 0; (len > 0) && (i<4); i++) {
161 encoded = len % 0x80;
162 len /= 0x80;
163 if(len)
164 encoded |= 0x80;
165 buf[i] = encoded;
166 }
167
168 return i;
169}
170
171/* add the passwd to the CONNECT packet */
172static int add_passwd(const char *passwd, const size_t plen,
173 char *pkt, const size_t start, int remain_pos)
174{
175 /* magic number that need to be set properly */
176 const size_t conn_flags_pos = remain_pos + 8;
177 if(plen > 0xffff)
178 return 1;
179
180 /* set password flag */
181 pkt[conn_flags_pos] |= 0x40;
182
183 /* length of password provided */
184 pkt[start] = (char)((plen >> 8) & 0xFF);
185 pkt[start + 1] = (char)(plen & 0xFF);
186 memcpy(&pkt[start + 2], passwd, plen);
187 return 0;
188}
189
190/* add user to the CONNECT packet */
191static int add_user(const char *username, const size_t ulen,
192 unsigned char *pkt, const size_t start, int remain_pos)
193{
194 /* magic number that need to be set properly */
195 const size_t conn_flags_pos = remain_pos + 8;
196 if(ulen > 0xffff)
197 return 1;
198
199 /* set username flag */
200 pkt[conn_flags_pos] |= 0x80;
201 /* length of username provided */
202 pkt[start] = (unsigned char)((ulen >> 8) & 0xFF);
203 pkt[start + 1] = (unsigned char)(ulen & 0xFF);
204 memcpy(&pkt[start + 2], username, ulen);
205 return 0;
206}
207
208/* add client ID to the CONNECT packet */
209static int add_client_id(const char *client_id, const size_t client_id_len,
210 char *pkt, const size_t start)
211{
212 if(client_id_len != MQTT_CLIENTID_LEN)
213 return 1;
214 pkt[start] = 0x00;
215 pkt[start + 1] = MQTT_CLIENTID_LEN;
216 memcpy(&pkt[start + 2], client_id, MQTT_CLIENTID_LEN);
217 return 0;
218}
219
220/* Set initial values of CONNECT packet */
221static int init_connpack(char *packet, char *remain, int remain_pos)
222{
223 /* Fixed header starts */
224 /* packet type */
225 packet[0] = MQTT_MSG_CONNECT;
226 /* remaining length field */
227 memcpy(&packet[1], remain, remain_pos);
228 /* Fixed header ends */
229
230 /* Variable header starts */
231 /* protocol length */
232 packet[remain_pos + 1] = 0x00;
233 packet[remain_pos + 2] = 0x04;
234 /* protocol name */
235 packet[remain_pos + 3] = 'M';
236 packet[remain_pos + 4] = 'Q';
237 packet[remain_pos + 5] = 'T';
238 packet[remain_pos + 6] = 'T';
239 /* protocol level */
240 packet[remain_pos + 7] = 0x04;
241 /* CONNECT flag: CleanSession */
242 packet[remain_pos + 8] = 0x02;
243 /* keep-alive 0 = disabled */
244 packet[remain_pos + 9] = 0x00;
245 packet[remain_pos + 10] = 0x3c;
246 /* end of variable header */
247 return remain_pos + 10;
248}
249
250static CURLcode mqtt_connect(struct Curl_easy *data)
251{
252 CURLcode result = CURLE_OK;
253 int pos = 0;
254 int rc = 0;
255 /* remain length */
256 int remain_pos = 0;
257 char remain[4] = {0};
258 size_t packetlen = 0;
259 size_t payloadlen = 0;
260 size_t start_user = 0;
261 size_t start_pwd = 0;
262 char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
263 const size_t clen = strlen("curl");
264 char *packet = NULL;
265
266 /* extracting username from request */
267 const char *username = data->state.aptr.user ?
268 data->state.aptr.user : "";
269 const size_t ulen = strlen(username);
270 /* extracting password from request */
271 const char *passwd = data->state.aptr.passwd ?
272 data->state.aptr.passwd : "";
273 const size_t plen = strlen(passwd);
274
275 payloadlen = ulen + plen + MQTT_CLIENTID_LEN + 2;
276 /* The plus 2 are for the MSB and LSB describing the length of the string to
277 * be added on the payload. Refer to spec 1.5.2 and 1.5.4 */
278 if(ulen)
279 payloadlen += 2;
280 if(plen)
281 payloadlen += 2;
282
283 /* getting how much occupy the remain length */
284 remain_pos = mqtt_encode_len(remain, payloadlen + 10);
285
286 /* 10 length of variable header and 1 the first byte of the fixed header */
287 packetlen = payloadlen + 10 + remain_pos + 1;
288
289 /* allocating packet */
290 if(packetlen > 268435455)
291 return CURLE_WEIRD_SERVER_REPLY;
292 packet = malloc(packetlen);
293 if(!packet)
294 return CURLE_OUT_OF_MEMORY;
295 memset(packet, 0, packetlen);
296
297 /* set initial values for the CONNECT packet */
298 pos = init_connpack(packet, remain, remain_pos);
299
300 result = Curl_rand_hex(data, (unsigned char *)&client_id[clen],
301 MQTT_CLIENTID_LEN - clen + 1);
302 /* add client id */
303 rc = add_client_id(client_id, strlen(client_id), packet, pos + 1);
304 if(rc) {
305 failf(data, "Client ID length mismatched: [%lu]", strlen(client_id));
306 result = CURLE_WEIRD_SERVER_REPLY;
307 goto end;
308 }
309 infof(data, "Using client id '%s'", client_id);
310
311 /* position where starts the user payload */
312 start_user = pos + 3 + MQTT_CLIENTID_LEN;
313 /* position where starts the password payload */
314 start_pwd = start_user + ulen;
315 /* if user name was provided, add it to the packet */
316 if(ulen) {
317 start_pwd += 2;
318
319 rc = add_user(username, ulen,
320 (unsigned char *)packet, start_user, remain_pos);
321 if(rc) {
322 failf(data, "Username is too large: [%lu]", ulen);
323 result = CURLE_WEIRD_SERVER_REPLY;
324 goto end;
325 }
326 }
327
328 /* if passwd was provided, add it to the packet */
329 if(plen) {
330 rc = add_passwd(passwd, plen, packet, start_pwd, remain_pos);
331 if(rc) {
332 failf(data, "Password is too large: [%lu]", plen);
333 result = CURLE_WEIRD_SERVER_REPLY;
334 goto end;
335 }
336 }
337
338 if(!result)
339 result = mqtt_send(data, packet, packetlen);
340
341end:
342 if(packet)
343 free(packet);
344 Curl_safefree(data->state.aptr.user);
345 Curl_safefree(data->state.aptr.passwd);
346 return result;
347}
348
349static CURLcode mqtt_disconnect(struct Curl_easy *data)
350{
351 CURLcode result = CURLE_OK;
352 struct MQTT *mq = data->req.p.mqtt;
353 result = mqtt_send(data, (char *)"\xe0\x00", 2);
354 Curl_safefree(mq->sendleftovers);
355 return result;
356}
357
358static CURLcode mqtt_verify_connack(struct Curl_easy *data)
359{
360 CURLcode result;
361 struct connectdata *conn = data->conn;
362 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
363 unsigned char readbuf[MQTT_CONNACK_LEN];
364 ssize_t nread;
365
366 result = Curl_read(data, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
367 if(result)
368 goto fail;
369
370 Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
371
372 /* fixme */
373 if(nread < MQTT_CONNACK_LEN) {
374 result = CURLE_WEIRD_SERVER_REPLY;
375 goto fail;
376 }
377
378 /* verify CONNACK */
379 if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
380 failf(data, "Expected %02x%02x but got %02x%02x",
381 0x00, 0x00, readbuf[0], readbuf[1]);
382 result = CURLE_WEIRD_SERVER_REPLY;
383 }
384
385fail:
386 return result;
387}
388
389static CURLcode mqtt_get_topic(struct Curl_easy *data,
390 char **topic, size_t *topiclen)
391{
392 char *path = data->state.up.path;
393 CURLcode result = CURLE_URL_MALFORMAT;
394 if(strlen(path) > 1) {
395 result = Curl_urldecode(path + 1, 0, topic, topiclen, REJECT_NADA);
396 if(!result && (*topiclen > 0xffff)) {
397 failf(data, "Too long MQTT topic");
398 result = CURLE_URL_MALFORMAT;
399 }
400 }
401 else
402 failf(data, "No MQTT topic found. Forgot to URL encode it?");
403
404 return result;
405}
406
407static CURLcode mqtt_subscribe(struct Curl_easy *data)
408{
409 CURLcode result = CURLE_OK;
410 char *topic = NULL;
411 size_t topiclen;
412 unsigned char *packet = NULL;
413 size_t packetlen;
414 char encodedsize[4];
415 size_t n;
416 struct connectdata *conn = data->conn;
417
418 result = mqtt_get_topic(data, &topic, &topiclen);
419 if(result)
420 goto fail;
421
422 conn->proto.mqtt.packetid++;
423
424 packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
425 + 2 bytes topic length + QoS byte */
426 n = mqtt_encode_len((char *)encodedsize, packetlen);
427 packetlen += n + 1; /* add one for the control packet type byte */
428
429 packet = malloc(packetlen);
430 if(!packet) {
431 result = CURLE_OUT_OF_MEMORY;
432 goto fail;
433 }
434
435 packet[0] = MQTT_MSG_SUBSCRIBE;
436 memcpy(&packet[1], encodedsize, n);
437 packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
438 packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
439 packet[3 + n] = (topiclen >> 8) & 0xff;
440 packet[4 + n ] = topiclen & 0xff;
441 memcpy(&packet[5 + n], topic, topiclen);
442 packet[5 + n + topiclen] = 0; /* QoS zero */
443
444 result = mqtt_send(data, (char *)packet, packetlen);
445
446fail:
447 free(topic);
448 free(packet);
449 return result;
450}
451
452/*
453 * Called when the first byte was already read.
454 */
455static CURLcode mqtt_verify_suback(struct Curl_easy *data)
456{
457 CURLcode result;
458 struct connectdata *conn = data->conn;
459 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
460 unsigned char readbuf[MQTT_SUBACK_LEN];
461 ssize_t nread;
462 struct mqtt_conn *mqtt = &conn->proto.mqtt;
463
464 result = Curl_read(data, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
465 if(result)
466 goto fail;
467
468 Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
469
470 /* fixme */
471 if(nread < MQTT_SUBACK_LEN) {
472 result = CURLE_WEIRD_SERVER_REPLY;
473 goto fail;
474 }
475
476 /* verify SUBACK */
477 if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
478 readbuf[1] != (mqtt->packetid & 0xff) ||
479 readbuf[2] != 0x00)
480 result = CURLE_WEIRD_SERVER_REPLY;
481
482fail:
483 return result;
484}
485
486static CURLcode mqtt_publish(struct Curl_easy *data)
487{
488 CURLcode result;
489 char *payload = data->set.postfields;
490 size_t payloadlen;
491 char *topic = NULL;
492 size_t topiclen;
493 unsigned char *pkt = NULL;
494 size_t i = 0;
495 size_t remaininglength;
496 size_t encodelen;
497 char encodedbytes[4];
498 curl_off_t postfieldsize = data->set.postfieldsize;
499
500 if(!payload)
501 return CURLE_BAD_FUNCTION_ARGUMENT;
502 if(postfieldsize < 0)
503 payloadlen = strlen(payload);
504 else
505 payloadlen = (size_t)postfieldsize;
506
507 result = mqtt_get_topic(data, &topic, &topiclen);
508 if(result)
509 goto fail;
510
511 remaininglength = payloadlen + 2 + topiclen;
512 encodelen = mqtt_encode_len(encodedbytes, remaininglength);
513
514 /* add the control byte and the encoded remaining length */
515 pkt = malloc(remaininglength + 1 + encodelen);
516 if(!pkt) {
517 result = CURLE_OUT_OF_MEMORY;
518 goto fail;
519 }
520
521 /* assemble packet */
522 pkt[i++] = MQTT_MSG_PUBLISH;
523 memcpy(&pkt[i], encodedbytes, encodelen);
524 i += encodelen;
525 pkt[i++] = (topiclen >> 8) & 0xff;
526 pkt[i++] = (topiclen & 0xff);
527 memcpy(&pkt[i], topic, topiclen);
528 i += topiclen;
529 memcpy(&pkt[i], payload, payloadlen);
530 i += payloadlen;
531 result = mqtt_send(data, (char *)pkt, i);
532
533fail:
534 free(pkt);
535 free(topic);
536 return result;
537}
538
539static size_t mqtt_decode_len(unsigned char *buf,
540 size_t buflen, size_t *lenbytes)
541{
542 size_t len = 0;
543 size_t mult = 1;
544 size_t i;
545 unsigned char encoded = 128;
546
547 for(i = 0; (i < buflen) && (encoded & 128); i++) {
548 encoded = buf[i];
549 len += (encoded & 127) * mult;
550 mult *= 128;
551 }
552
553 if(lenbytes)
554 *lenbytes = i;
555
556 return len;
557}
558
559#ifdef CURLDEBUG
560static const char *statenames[]={
561 "MQTT_FIRST",
562 "MQTT_REMAINING_LENGTH",
563 "MQTT_CONNACK",
564 "MQTT_SUBACK",
565 "MQTT_SUBACK_COMING",
566 "MQTT_PUBWAIT",
567 "MQTT_PUB_REMAIN",
568
569 "NOT A STATE"
570};
571#endif
572
573/* The only way to change state */
574static void mqstate(struct Curl_easy *data,
575 enum mqttstate state,
576 enum mqttstate nextstate) /* used if state == FIRST */
577{
578 struct connectdata *conn = data->conn;
579 struct mqtt_conn *mqtt = &conn->proto.mqtt;
580#ifdef CURLDEBUG
581 infof(data, "%s (from %s) (next is %s)",
582 statenames[state],
583 statenames[mqtt->state],
584 (state == MQTT_FIRST)? statenames[nextstate] : "");
585#endif
586 mqtt->state = state;
587 if(state == MQTT_FIRST)
588 mqtt->nextstate = nextstate;
589}
590
591
592/* for the publish packet */
593#define MQTT_HEADER_LEN 5 /* max 5 bytes */
594
595static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done)
596{
597 CURLcode result = CURLE_OK;
598 struct connectdata *conn = data->conn;
599 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
600 ssize_t nread;
601 unsigned char *pkt = (unsigned char *)data->state.buffer;
602 size_t remlen;
603 struct mqtt_conn *mqtt = &conn->proto.mqtt;
604 struct MQTT *mq = data->req.p.mqtt;
605 unsigned char packet;
606
607 switch(mqtt->state) {
608 MQTT_SUBACK_COMING:
609 case MQTT_SUBACK_COMING:
610 result = mqtt_verify_suback(data);
611 if(result)
612 break;
613
614 mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
615 break;
616
617 case MQTT_SUBACK:
618 case MQTT_PUBWAIT:
619 /* we are expecting PUBLISH or SUBACK */
620 packet = mq->firstbyte & 0xf0;
621 if(packet == MQTT_MSG_PUBLISH)
622 mqstate(data, MQTT_PUB_REMAIN, MQTT_NOSTATE);
623 else if(packet == MQTT_MSG_SUBACK) {
624 mqstate(data, MQTT_SUBACK_COMING, MQTT_NOSTATE);
625 goto MQTT_SUBACK_COMING;
626 }
627 else if(packet == MQTT_MSG_DISCONNECT) {
628 infof(data, "Got DISCONNECT");
629 *done = TRUE;
630 goto end;
631 }
632 else {
633 result = CURLE_WEIRD_SERVER_REPLY;
634 goto end;
635 }
636
637 /* -- switched state -- */
638 remlen = mq->remaining_length;
639 infof(data, "Remaining length: %zd bytes", remlen);
640 if(data->set.max_filesize &&
641 (curl_off_t)remlen > data->set.max_filesize) {
642 failf(data, "Maximum file size exceeded");
643 result = CURLE_FILESIZE_EXCEEDED;
644 goto end;
645 }
646 Curl_pgrsSetDownloadSize(data, remlen);
647 data->req.bytecount = 0;
648 data->req.size = remlen;
649 mq->npacket = remlen; /* get this many bytes */
650 /* FALLTHROUGH */
651 case MQTT_PUB_REMAIN: {
652 /* read rest of packet, but no more. Cap to buffer size */
653 struct SingleRequest *k = &data->req;
654 size_t rest = mq->npacket;
655 if(rest > (size_t)data->set.buffer_size)
656 rest = (size_t)data->set.buffer_size;
657 result = Curl_read(data, sockfd, (char *)pkt, rest, &nread);
658 if(result) {
659 if(CURLE_AGAIN == result) {
660 infof(data, "EEEE AAAAGAIN");
661 }
662 goto end;
663 }
664 if(!nread) {
665 infof(data, "server disconnected");
666 result = CURLE_PARTIAL_FILE;
667 goto end;
668 }
669 Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
670
671 mq->npacket -= nread;
672 k->bytecount += nread;
673 Curl_pgrsSetDownloadCounter(data, k->bytecount);
674
675 /* if QoS is set, message contains packet id */
676
677 result = Curl_client_write(data, CLIENTWRITE_BODY, (char *)pkt, nread);
678 if(result)
679 goto end;
680
681 if(!mq->npacket)
682 /* no more PUBLISH payload, back to subscribe wait state */
683 mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
684 break;
685 }
686 default:
687 DEBUGASSERT(NULL); /* illegal state */
688 result = CURLE_WEIRD_SERVER_REPLY;
689 goto end;
690 }
691 end:
692 return result;
693}
694
695static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
696{
697 CURLcode result = CURLE_OK;
698 *done = FALSE; /* unconditionally */
699
700 result = mqtt_connect(data);
701 if(result) {
702 failf(data, "Error %d sending MQTT CONNECT request", result);
703 return result;
704 }
705 mqstate(data, MQTT_FIRST, MQTT_CONNACK);
706 return CURLE_OK;
707}
708
709static CURLcode mqtt_done(struct Curl_easy *data,
710 CURLcode status, bool premature)
711{
712 struct MQTT *mq = data->req.p.mqtt;
713 (void)status;
714 (void)premature;
715 Curl_safefree(mq->sendleftovers);
716 return CURLE_OK;
717}
718
719static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
720{
721 CURLcode result = CURLE_OK;
722 struct connectdata *conn = data->conn;
723 struct mqtt_conn *mqtt = &conn->proto.mqtt;
724 struct MQTT *mq = data->req.p.mqtt;
725 ssize_t nread;
726 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
727 unsigned char *pkt = (unsigned char *)data->state.buffer;
728 unsigned char byte;
729
730 *done = FALSE;
731
732 if(mq->nsend) {
733 /* send the remainder of an outgoing packet */
734 char *ptr = mq->sendleftovers;
735 result = mqtt_send(data, mq->sendleftovers, mq->nsend);
736 free(ptr);
737 if(result)
738 return result;
739 }
740
741 infof(data, "mqtt_doing: state [%d]", (int) mqtt->state);
742 switch(mqtt->state) {
743 case MQTT_FIRST:
744 /* Read the initial byte only */
745 result = Curl_read(data, sockfd, (char *)&mq->firstbyte, 1, &nread);
746 if(result)
747 break;
748 else if(!nread) {
749 failf(data, "Connection disconnected");
750 *done = TRUE;
751 result = CURLE_RECV_ERROR;
752 break;
753 }
754 Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
755 /* remember the first byte */
756 mq->npacket = 0;
757 mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
758 /* FALLTHROUGH */
759 case MQTT_REMAINING_LENGTH:
760 do {
761 result = Curl_read(data, sockfd, (char *)&byte, 1, &nread);
762 if(!nread)
763 break;
764 Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
765 pkt[mq->npacket++] = byte;
766 } while((byte & 0x80) && (mq->npacket < 4));
767 if(nread && (byte & 0x80))
768 /* MQTT supports up to 127 * 128^0 + 127 * 128^1 + 127 * 128^2 +
769 127 * 128^3 bytes. server tried to send more */
770 result = CURLE_WEIRD_SERVER_REPLY;
771 if(result)
772 break;
773 mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
774 mq->npacket = 0;
775 if(mq->remaining_length) {
776 mqstate(data, mqtt->nextstate, MQTT_NOSTATE);
777 break;
778 }
779 mqstate(data, MQTT_FIRST, MQTT_FIRST);
780
781 if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
782 infof(data, "Got DISCONNECT");
783 *done = TRUE;
784 }
785 break;
786 case MQTT_CONNACK:
787 result = mqtt_verify_connack(data);
788 if(result)
789 break;
790
791 if(data->state.httpreq == HTTPREQ_POST) {
792 result = mqtt_publish(data);
793 if(!result) {
794 result = mqtt_disconnect(data);
795 *done = TRUE;
796 }
797 mqtt->nextstate = MQTT_FIRST;
798 }
799 else {
800 result = mqtt_subscribe(data);
801 if(!result) {
802 mqstate(data, MQTT_FIRST, MQTT_SUBACK);
803 }
804 }
805 break;
806
807 case MQTT_SUBACK:
808 case MQTT_PUBWAIT:
809 case MQTT_PUB_REMAIN:
810 result = mqtt_read_publish(data, done);
811 break;
812
813 default:
814 failf(data, "State not handled yet");
815 *done = TRUE;
816 break;
817 }
818
819 if(result == CURLE_AGAIN)
820 result = CURLE_OK;
821 return result;
822}
823
824#endif /* CURL_DISABLE_MQTT */
Note: See TracBrowser for help on using the repository browser.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette