58 #if defined(_WIN32) || defined(_WIN64) 77 #if defined(_WIN32) || defined(_WIN64) 81 rc = ioctl(sock, FIONBIO, &flag);
86 if ((flags = fcntl(sock, F_GETFL, 0)))
88 rc = fcntl(sock, F_SETFL, flags | O_NONBLOCK);
105 #if defined(_WIN32) || defined(_WIN64) 106 err = WSAGetLastError();
110 if (err != EINTR && err != EAGAIN && err != EINPROGRESS && err != EWOULDBLOCK)
112 if (strcmp(aString,
"shutdown") != 0 || (err != ENOTCONN && err != ECONNRESET))
113 Log(
TRACE_MINIMUM, -1,
"Socket error %s(%d) in %s for socket %d", strerror(err), err, aString, sock);
124 #if defined(_WIN32) || defined(_WIN64) 125 WORD winsockVer = 0x0202;
129 WSAStartup(winsockVer, &wsd);
132 signal(SIGPIPE, SIG_IGN);
140 FD_ZERO(&(mod_s.
rset));
158 #if defined(_WIN32) || defined(_WIN64) 178 Log(
LOG_ERROR, -1,
"addSocket: exceeded FD_SETSIZE %d", FD_SETSIZE);
183 int* pnewSd = (
int*)
malloc(
sizeof(newSd));
205 Log(
LOG_ERROR, -1,
"addSocket: socket %d already in the list", newSd);
245 static struct timeval zero = {0L, 0L};
246 static struct timeval one = {1L, 0L};
247 struct timeval timeout = one;
272 memcpy((
void*)&(pwset), (
void*)&(mod_s.
pending_wset),
sizeof(pwset));
282 Log(
TRACE_MAX, -1,
"Return code %d from read select", rc);
297 Log(
TRACE_MAX, -1,
"Return code %d from write select", rc1);
299 if (rc == 0 && rc1 == 0)
340 if ((rc = recv(socket, c, (
size_t)1, 0)) ==
SOCKET_ERROR)
343 if (err == EWOULDBLOCK || err == EAGAIN)
383 if ((*rc = recv(socket, buf + (*actual_len), (
int)(bytes - (*actual_len)), 0)) ==
SOCKET_ERROR)
386 if (*rc != EAGAIN && *rc != EWOULDBLOCK)
400 if (*actual_len == bytes)
405 Log(
TRACE_MAX, -1,
"%d bytes expected but %d bytes now received", (
int)bytes, (
int)*actual_len);
439 #if defined(_WIN32) || defined(_WIN64) 440 rc = WSASend(socket, iovecs, count, (LPDWORD)bytes, 0, NULL, NULL);
444 if (err == EWOULDBLOCK || err == EAGAIN)
452 #if defined(TCPSOCKET_INTERRUPTED_TESTING) 454 if (++i >= 10 && i < 21)
458 printf(
"Deliberately simulating TCPSOCKET_INTERRUPTED\n");
463 printf(
"Deliberately simulating SOCKET_ERROR\n");
469 printf(
"Shutdown socket\n");
470 shutdown(socket, SHUT_WR);
476 rc = writev(socket, iovecs, count);
480 if (err == EWOULDBLOCK || err == EAGAIN)
485 #if defined(TCPSOCKET_INTERRUPTED_TESTING) 507 unsigned long bytes = 0L;
511 size_t total = buf0len;
516 Log(
LOG_SEVERE, -1,
"Trying to write to socket %d for which there is already pending output", socket);
521 for (i = 0; i < bufs.
count; i++)
524 iovecs[0].iov_base = buf0;
525 iovecs[0].iov_len = (
ULONG)buf0len;
527 for (i = 0; i < bufs.
count; i++)
529 iovecs[i+1].iov_base = bufs.
buffers[i];
531 frees1[i+1] = bufs.
frees[i];
540 int* sockmem = (
int*)
malloc(
sizeof(
int));
547 Log(
TRACE_MIN, -1,
"Partial write: %lu bytes of %lu actually written on socket %d",
548 bytes, total, socket);
604 #if defined(_WIN32) || defined(_WIN64) 612 if ((rc = recv(socket, NULL, (
size_t)0, 0)) ==
SOCKET_ERROR)
644 Log(
LOG_ERROR, -1,
"Failed to remove socket %d", socket);
645 if (socket + 1 >= mod_s.
maxfdp1)
668 #if defined(__GNUC__) && defined(__linux__) 669 int Socket_new(
const char* addr,
size_t addr_len,
int port,
int*
sock,
long timeout)
674 int type = SOCK_STREAM;
676 struct sockaddr_in address;
677 #if defined(AF_INET6) 678 struct sockaddr_in6 address6;
681 #if defined(_WIN32) || defined(_WIN64) 684 sa_family_t family = AF_INET;
686 struct addrinfo *
result = NULL;
687 struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL};
691 memset(&address6,
'\0',
sizeof(address6));
699 if ((addr_mem =
malloc( addr_len + 1u )) == NULL)
704 memcpy( addr_mem, addr, addr_len );
705 addr_mem[addr_len] =
'\0';
712 struct gaicb ar = {addr_mem, NULL, &hints, NULL};
713 struct gaicb *reqs[] = {&ar};
715 unsigned long int seconds = timeout / 1000L;
716 unsigned long int nanos = (timeout - (seconds * 1000L)) * 1000000L;
717 struct timespec timeoutspec = {seconds, nanos};
719 rc = getaddrinfo_a(GAI_NOWAIT, reqs, 1, NULL);
721 rc = gai_suspend((
const struct gaicb*
const *) reqs, 1, &timeoutspec);
725 rc = gai_error(reqs[0]);
726 result = ar.ar_result;
729 rc = getaddrinfo(addr_mem, NULL, &hints, &result);
734 struct addrinfo* res =
result;
738 if (res->ai_family == AF_INET || res->ai_next == NULL)
746 #if defined(AF_INET6) 747 if (res->ai_family == AF_INET6)
749 address6.sin6_port = htons(port);
750 address6.sin6_family = family = AF_INET6;
751 memcpy(&address6.sin6_addr, &((
struct sockaddr_in6*)(res->ai_addr))->sin6_addr,
sizeof(address6.sin6_addr));
755 if (res->ai_family == AF_INET)
757 memset(&address.sin_zero, 0,
sizeof(address.sin_zero));
758 address.sin_port = htons(port);
759 address.sin_family = family = AF_INET;
760 address.sin_addr = ((
struct sockaddr_in*)(res->ai_addr))->sin_addr;
765 freeaddrinfo(result);
768 Log(
LOG_ERROR, -1,
"getaddrinfo failed for addr %s with rc %d", addr_mem, rc);
771 Log(
LOG_ERROR, -1,
"%s is not a valid IP address", addr_mem);
774 *sock = (int)
socket(family, type, 0);
779 #if defined(NOSIGPIPE) 782 if (setsockopt(*sock, SOL_SOCKET, SO_NOSIGPIPE, (
void*)&opt,
sizeof(opt)) != 0)
783 Log(
LOG_ERROR, -1,
"Could not set SO_NOSIGPIPE for socket %d", *sock);
789 #if defined(SMALL_TCP_BUFFER_TESTING) 793 printf(
"Setting optsend to %d\n", optsend);
794 if (setsockopt(*sock, SOL_SOCKET, SO_SNDBUF, (
void*)&optsend,
sizeof(optsend)) != 0)
795 Log(
LOG_ERROR, -1,
"Could not set SO_SNDBUF for socket %d", *sock);
798 Log(
TRACE_MIN, -1,
"New socket %d for %s, port %d", *sock, addr, port);
804 if (family == AF_INET)
805 rc = connect(*sock, (
struct sockaddr*)&address,
sizeof(address));
806 #if defined(AF_INET6) 808 rc = connect(*sock, (
struct sockaddr*)&address6,
sizeof(address6));
812 if (rc == EINPROGRESS || rc == EWOULDBLOCK)
814 int* pnewSd = (
int*)
malloc(
sizeof(
int));
833 if (rc != 0 && (rc != EINPROGRESS) && (rc != EWOULDBLOCK))
854 writecomplete = mywritecomplete;
868 unsigned long curbuflen = 0L,
884 for (i = 0; i < pw->
count; ++i)
886 if (pw->
bytes <= curbuflen)
889 iovecs1[++curbuf].iov_len = pw->
iovecs[i].iov_len;
890 iovecs1[curbuf].iov_base = pw->
iovecs[i].iov_base;
892 else if (pw->
bytes < curbuflen + pw->
iovecs[i].iov_len)
895 size_t offset = pw->
bytes - curbuflen;
896 iovecs1[++curbuf].iov_len = pw->
iovecs[i].iov_len - (
ULONG)offset;
897 iovecs1[curbuf].iov_base = (
char*)pw->
iovecs[i].iov_base + offset;
900 curbuflen += pw->
iovecs[i].iov_len;
908 for (i = 0; i < pw->
count; i++)
913 pw->
iovecs[i].iov_base = NULL;
917 Log(
TRACE_MIN, -1,
"ContinueWrite: partial write now complete for socket %d", socket);
922 Log(
TRACE_MIN, -1,
"ContinueWrite wrote +%lu bytes on socket %d", bytes, socket);
927 for (i = 0; i < pw->
count; i++)
932 pw->
iovecs[i].iov_base = NULL;
964 for (i = 0; i < pw->
count; i++)
968 Log(
TRACE_MIN, -1,
"Cleaning in abortWrite for socket %d", socket);
989 while (curpending && curpending->
content)
997 Log(
LOG_SEVERE, -1,
"Failed to remove pending write from socket buffer list");
1001 Log(
LOG_SEVERE, -1,
"Failed to remove pending write from list");
1028 #define ADDRLEN INET6_ADDRSTRLEN+1 1035 #if defined(_WIN32) || defined(_WIN64) 1038 if (WSAAddressToStringW(sa,
sizeof(
struct sockaddr_in6), NULL, buf, (LPDWORD)&buflen) ==
SOCKET_ERROR)
1041 wcstombs(addr_string, buf,
sizeof(addr_string));
1045 struct sockaddr_in *sin = (
struct sockaddr_in *)sa;
1046 inet_ntop(sin->sin_family, &sin->sin_addr, addr_string,
ADDRLEN);
1047 sprintf(&addr_string[strlen(addr_string)],
":%d", ntohs(sin->sin_port));
1060 struct sockaddr_in6 sa;
1061 socklen_t sal =
sizeof(sa);
1063 if (getpeername(sock, (
struct sockaddr*)&sa, &sal) ==
SOCKET_ERROR)
1073 #if defined(Socket_TEST) 1075 int main(
int argc,
char *argv[])
1077 Socket_connect(
"127.0.0.1", 1883);
1078 Socket_connect(
"localhost", 1883);
1079 Socket_connect(
"loadsadsacalhost", 1883);
char * SocketBuffer_getQueuedData(int socket, size_t bytes, size_t *actual_len)
int Socket_new(const char *addr, size_t addr_len, int port, int *sock)
int Socket_getch(int socket, char *c)
void select(lua_State *L, Fx &&fx, Args &&...args)
int SocketBuffer_initialize(void)
int Socket_noPendingWrites(int socket)
void Socket_outTerminate(void)
#define PAHO_MEMORY_ERROR
void SocketBuffer_queueChar(int socket, char c)
int Socket_getReadySocket(int more_work, struct timeval *tp, mutex_type mutex)
int Thread_lock_mutex(mutex_type mutex)
int isReady(int socket, fd_set *read_set, fd_set *write_set)
int Socket_error(char *aString, int sock)
int ListRemove(List *aList, void *content)
void Socket_writeComplete(int socket, int rc)
struct ListElementStruct * next
void Socket_setWriteCompleteCallback(Socket_writeComplete *mywritecomplete)
void Socket_clearPendingWrite(int socket)
ListElement * ListNextElement(List *aList, ListElement **pos)
char * Socket_getaddrname(struct sockaddr *sa, int sock)
void SocketBuffer_interrupted(int socket, size_t actual_len)
int ListRemoveItem(List *aList, void *content, int(*callback)(void *, void *))
int Thread_unlock_mutex(mutex_type mutex)
ListElement * cur_clientsds
void Log(enum LOG_LEVELS log_level, int msgno, const char *format,...)
void Socket_addPendingWrite(int socket)
int Socket_continueWrite(int socket)
void SocketBuffer_terminate(void)
int Socket_addSocket(int newSd)
char * Socket_getdata(int socket, size_t bytes, size_t *actual_len, int *rc)
ListElement * ListAppend(List *aList, void *content, size_t size)
int Socket_setnonblocking(int sock)
int SocketBuffer_writeComplete(int socket)
#define SOCKETBUFFER_INTERRUPTED
int SocketBuffer_getQueuedChar(int socket, char *c)
int Socket_continueWrites(fd_set *pwset)
char * Socket_getpeer(int sock)
int Socket_abortWrite(int socket)
pending_writes * SocketBuffer_getWrite(int socket)
char * SocketBuffer_complete(int socket)
int Socket_writev(int socket, iobuf *iovecs, int count, unsigned long *bytes)
int Socket_putdatas(int socket, char *buf0, size_t buf0len, PacketBuffers bufs)
List * ListInitialize(void)
#define TCPSOCKET_COMPLETE
ListElement * ListFindItem(List *aList, void *content, int(*callback)(void *, void *))
int Socket_close_only(int socket)
void Socket_close(int socket)
int intcompare(void *a, void *b)
void ListFree(List *aList)
int main(int argc, char **argv)
void Socket_outInitialize(void)
int SocketBuffer_pendingWrite(int socket, int count, iobuf *iovecs, int *frees, size_t total, size_t bytes)
static Socket_writeComplete * writecomplete
#define TCPSOCKET_INTERRUPTED
void SocketBuffer_cleanup(int socket)
int SSLSocket_continueWrite(pending_writes *pw)