SocketBuffer.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2009, 2020 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial API and implementation and/or initial documentation
15  * Ian Craggs, Allan Stockdill-Mander - SSL updates
16  * Ian Craggs - fix for issue #244, issue #20
17  *******************************************************************************/
18 
25 #include "SocketBuffer.h"
26 #include "LinkedList.h"
27 #include "Log.h"
28 #include "Messages.h"
29 #include "StackTrace.h"
30 
31 #include <stdlib.h>
32 #include <stdio.h>
33 #include <string.h>
34 
35 #include "Heap.h"
36 
37 #if defined(_WIN32) || defined(_WIN64)
38 #define iov_len len
39 #define iov_base buf
40 #endif
41 
46 
50 static List* queues;
51 
55 static List writes;
56 
57 
58 int socketcompare(void* a, void* b);
59 int SocketBuffer_newDefQ(void);
60 void SocketBuffer_freeDefQ(void);
61 int pending_socketcompare(void* a, void* b);
62 
63 
70 int socketcompare(void* a, void* b)
71 {
72  return ((socket_queue*)a)->socket == *(int*)b;
73 }
74 
75 
80 {
81  int rc = PAHO_MEMORY_ERROR;
82 
83  def_queue = malloc(sizeof(socket_queue));
84  if (def_queue)
85  {
86  def_queue->buflen = 1000;
87  def_queue->buf = malloc(def_queue->buflen);
88  if (def_queue->buf)
89  {
90  def_queue->socket = def_queue->index = 0;
91  def_queue->buflen = def_queue->datalen = def_queue->headerlen = 0;
92  rc = 0;
93  }
94  }
95  return rc;
96 }
97 
98 
103 {
104  int rc = 0;
105 
106  FUNC_ENTRY;
107  rc = SocketBuffer_newDefQ();
108  if (rc == 0)
109  {
110  if ((queues = ListInitialize()) == NULL)
111  rc = PAHO_MEMORY_ERROR;
112  }
113  ListZero(&writes);
114  FUNC_EXIT_RC(rc);
115  return rc;
116 }
117 
118 
123 {
124  free(def_queue->buf);
125  free(def_queue);
126  def_queue = NULL;
127 }
128 
129 
134 {
135  ListElement* cur = NULL;
136  ListEmpty(&writes);
137 
138  FUNC_ENTRY;
139  while (ListNextElement(queues, &cur))
140  free(((socket_queue*)(cur->content))->buf);
141  ListFree(queues);
143  FUNC_EXIT;
144 }
145 
146 
152 {
153  FUNC_ENTRY;
154  SocketBuffer_writeComplete(socket); /* clean up write buffers */
155  if (ListFindItem(queues, &socket, socketcompare))
156  {
157  free(((socket_queue*)(queues->current->content))->buf);
158  ListRemove(queues, queues->current->content);
159  }
160  if (def_queue->socket == socket)
161  {
162  def_queue->socket = def_queue->index = 0;
163  def_queue->headerlen = def_queue->datalen = 0;
164  }
165  FUNC_EXIT;
166 }
167 
168 
176 char* SocketBuffer_getQueuedData(int socket, size_t bytes, size_t* actual_len)
177 {
178  socket_queue* queue = NULL;
179 
180  FUNC_ENTRY;
181  if (ListFindItem(queues, &socket, socketcompare))
182  { /* if there is queued data for this socket, add any data read to it */
183  queue = (socket_queue*)(queues->current->content);
184  *actual_len = queue->datalen;
185  }
186  else
187  {
188  *actual_len = 0;
189  queue = def_queue;
190  }
191  if (bytes > queue->buflen)
192  {
193  if (queue->datalen > 0)
194  {
195  void* newmem = malloc(bytes);
196 
197  free(queue->buf);
198  queue->buf = newmem;
199  if (!newmem)
200  goto exit;
201  memcpy(newmem, queue->buf, queue->datalen);
202  }
203  else
204  queue->buf = realloc(queue->buf, bytes);
205  queue->buflen = bytes;
206  }
207 exit:
208  FUNC_EXIT;
209  return queue->buf;
210 }
211 
212 
220 {
222 
223  FUNC_ENTRY;
224  if (ListFindItem(queues, &socket, socketcompare))
225  { /* if there is queued data for this socket, read that first */
226  socket_queue* queue = (socket_queue*)(queues->current->content);
227  if (queue->index < queue->headerlen)
228  {
229  *c = queue->fixed_header[(queue->index)++];
230  Log(TRACE_MAX, -1, "index is now %d, headerlen %d", queue->index, (int)queue->headerlen);
232  goto exit;
233  }
234  else if (queue->index > 4)
235  {
236  Log(LOG_FATAL, -1, "header is already at full length");
237  rc = SOCKET_ERROR;
238  goto exit;
239  }
240  }
241 exit:
242  FUNC_EXIT_RC(rc);
243  return rc; /* there was no queued char if rc is SOCKETBUFFER_INTERRUPTED*/
244 }
245 
246 
252 void SocketBuffer_interrupted(int socket, size_t actual_len)
253 {
254  socket_queue* queue = NULL;
255 
256  FUNC_ENTRY;
257  if (ListFindItem(queues, &socket, socketcompare))
258  queue = (socket_queue*)(queues->current->content);
259  else /* new saved queue */
260  {
261  queue = def_queue;
262  /* if SocketBuffer_queueChar() has not yet been called, then the socket number
263  in def_queue will not have been set. Issue #244.
264  If actual_len == 0 then we may not need to do anything - I'll leave that
265  optimization for another time. */
266  queue->socket = socket;
267  ListAppend(queues, def_queue, sizeof(socket_queue)+def_queue->buflen);
269  }
270  queue->index = 0;
271  queue->datalen = actual_len;
272  FUNC_EXIT;
273 }
274 
275 
282 {
283  FUNC_ENTRY;
284  if (ListFindItem(queues, &socket, socketcompare))
285  {
286  socket_queue* queue = (socket_queue*)(queues->current->content);
288  def_queue = queue;
289  ListDetach(queues, queue);
290  }
291  def_queue->socket = def_queue->index = 0;
292  def_queue->headerlen = def_queue->datalen = 0;
293  FUNC_EXIT;
294  return def_queue->buf;
295 }
296 
297 
304 {
305  int error = 0;
306  socket_queue* curq = def_queue;
307 
308  FUNC_ENTRY;
309  if (ListFindItem(queues, &socket, socketcompare))
310  curq = (socket_queue*)(queues->current->content);
311  else if (def_queue->socket == 0)
312  {
313  def_queue->socket = socket;
314  def_queue->index = 0;
315  def_queue->datalen = 0;
316  }
317  else if (def_queue->socket != socket)
318  {
319  Log(LOG_FATAL, -1, "attempt to reuse socket queue");
320  error = 1;
321  }
322  if (curq->index > 4)
323  {
324  Log(LOG_FATAL, -1, "socket queue fixed_header field full");
325  error = 1;
326  }
327  if (!error)
328  {
329  curq->fixed_header[(curq->index)++] = c;
330  curq->headerlen = curq->index;
331  }
332  Log(TRACE_MAX, -1, "queueChar: index is now %d, headerlen %d", curq->index, (int)curq->headerlen);
333  FUNC_EXIT;
334 }
335 
336 
346 #if defined(OPENSSL)
347 int SocketBuffer_pendingWrite(int socket, SSL* ssl, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes)
348 #else
349 int SocketBuffer_pendingWrite(int socket, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes)
350 #endif
351 {
352  int i = 0;
353  pending_writes* pw = NULL;
354  int rc = 0;
355 
356  FUNC_ENTRY;
357  /* store the buffers until the whole packet is written */
358  if ((pw = malloc(sizeof(pending_writes))) == NULL)
359  {
360  rc = PAHO_MEMORY_ERROR;
361  goto exit;
362  }
363  pw->socket = socket;
364 #if defined(OPENSSL)
365  pw->ssl = ssl;
366 #endif
367  pw->bytes = bytes;
368  pw->total = total;
369  pw->count = count;
370  for (i = 0; i < count; i++)
371  {
372  pw->iovecs[i] = iovecs[i];
373  pw->frees[i] = frees[i];
374  }
375  ListAppend(&writes, pw, sizeof(pw) + total);
376 exit:
377  FUNC_EXIT_RC(rc);
378  return rc;
379 }
380 
381 
388 int pending_socketcompare(void* a, void* b)
389 {
390  return ((pending_writes*)a)->socket == *(int*)b;
391 }
392 
393 
400 {
401  ListElement* le = ListFindItem(&writes, &socket, pending_socketcompare);
402  return (le) ? (pending_writes*)(le->content) : NULL;
403 }
404 
405 
412 {
413  return ListRemoveItem(&writes, &socket, pending_socketcompare);
414 }
415 
416 
424 pending_writes* SocketBuffer_updateWrite(int socket, char* topic, char* payload)
425 {
426  pending_writes* pw = NULL;
427  ListElement* le = NULL;
428 
429  FUNC_ENTRY;
430  if ((le = ListFindItem(&writes, &socket, pending_socketcompare)) != NULL)
431  {
432  pw = (pending_writes*)(le->content);
433  if (pw->count == 4)
434  {
435  pw->iovecs[2].iov_base = topic;
436  pw->iovecs[3].iov_base = payload;
437  }
438  }
439 
440  FUNC_EXIT;
441  return pw;
442 }
char * SocketBuffer_getQueuedData(int socket, size_t bytes, size_t *actual_len)
Definition: SocketBuffer.c:176
Definition: Log.h:43
string topic
Definition: test2.py:8
#define FUNC_EXIT
Definition: StackTrace.h:59
static socket_queue * def_queue
Definition: SocketBuffer.c:45
int SocketBuffer_newDefQ(void)
Definition: SocketBuffer.c:79
struct @73::@74 bytes[4]
ListElement * current
Definition: LinkedList.h:69
pending_writes * SocketBuffer_updateWrite(int socket, char *topic, char *payload)
Definition: SocketBuffer.c:424
int SocketBuffer_initialize(void)
Definition: SocketBuffer.c:102
#define PAHO_MEMORY_ERROR
Definition: Heap.h:26
void SocketBuffer_queueChar(int socket, char c)
Definition: SocketBuffer.c:303
#define malloc(x)
Definition: Heap.h:41
#define SOCKETBUFFER_COMPLETE
Definition: SocketBuffer.h:60
static l_noret error(LoadState *S, const char *why)
Definition: lundump.c:40
void ListZero(List *newl)
Definition: LinkedList.c:42
int ListRemove(List *aList, void *content)
Definition: LinkedList.c:257
#define TRACE_MAX
Definition: Log.h:65
int ListDetach(List *aList, void *content)
Definition: LinkedList.c:245
#define free(x)
Definition: Heap.h:55
void ListEmpty(List *aList)
Definition: LinkedList.c:359
int pending_socketcompare(void *a, void *b)
Definition: SocketBuffer.c:388
constexpr size_t count()
Definition: core.h:960
#define FUNC_EXIT_RC(x)
Definition: StackTrace.h:63
#define SOCKET_ERROR
Definition: Socket.h:76
ListElement * ListNextElement(List *aList, ListElement **pos)
Definition: LinkedList.c:411
void SocketBuffer_interrupted(int socket, size_t actual_len)
Definition: SocketBuffer.c:252
#define realloc(a, b)
Definition: Heap.h:49
int ListRemoveItem(List *aList, void *content, int(*callback)(void *, void *))
Definition: LinkedList.c:349
void Log(enum LOG_LEVELS log_level, int msgno, const char *format,...)
Definition: Log.c:417
size_t buflen
Definition: SocketBuffer.h:43
char fixed_header[5]
Definition: SocketBuffer.h:42
void SocketBuffer_terminate(void)
Definition: SocketBuffer.c:133
void SocketBuffer_freeDefQ(void)
Definition: SocketBuffer.c:122
ListElement * ListAppend(List *aList, void *content, size_t size)
Definition: LinkedList.c:90
int SocketBuffer_writeComplete(int socket)
Definition: SocketBuffer.c:411
#define SOCKETBUFFER_INTERRUPTED
Definition: SocketBuffer.h:64
unsigned int index
Definition: SocketBuffer.h:40
int SocketBuffer_getQueuedChar(int socket, char *c)
Definition: SocketBuffer.c:219
#define FUNC_ENTRY
Definition: StackTrace.h:55
iobuf iovecs[5]
Definition: SocketBuffer.h:56
int socketcompare(void *a, void *b)
Definition: SocketBuffer.c:70
size_t datalen
Definition: SocketBuffer.h:43
MQTTClient c
Definition: test10.c:1656
size_t headerlen
Definition: SocketBuffer.h:41
pending_writes * SocketBuffer_getWrite(int socket)
Definition: SocketBuffer.c:399
char * SocketBuffer_complete(int socket)
Definition: SocketBuffer.c:281
static List writes
Definition: SocketBuffer.c:55
List * ListInitialize(void)
Definition: LinkedList.c:52
ListElement * ListFindItem(List *aList, void *content, int(*callback)(void *, void *))
Definition: LinkedList.c:154
void ListFree(List *aList)
Definition: LinkedList.c:381
enum MQTTReasonCodes rc
Definition: test10.c:1112
struct iovec iobuf
Definition: SocketBuffer.h:34
int SocketBuffer_pendingWrite(int socket, int count, iobuf *iovecs, int *frees, size_t total, size_t bytes)
Definition: SocketBuffer.c:349
static List * queues
Definition: SocketBuffer.c:50
Definition: format.h:3618
void SocketBuffer_cleanup(int socket)
Definition: SocketBuffer.c:151


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 03:48:11