diff --git a/src/network_worker.cpp b/src/network_worker.cpp index 52ba3a9d48f..f21d724ca5b 100644 --- a/src/network_worker.cpp +++ b/src/network_worker.cpp @@ -21,6 +21,7 @@ #include "wassert.hpp" //#include "wesconfig.h" #include "serialization/binary_wml.hpp" +#include "serialization/binary_or_text.hpp" #include #include @@ -128,7 +129,59 @@ threading::condition* cond = NULL; std::map threads; std::vector to_clear; +int receive_bytes(TCPsocket s, char* buf, size_t nbytes) +{ +#ifdef NETWORK_USE_RAW_SOCKETS + const _TCPsocket* sock = reinterpret_cast(s); + return recv(sock->channel, buf, nbytes, MSG_DONTWAIT|MSG_ERRQUEUE); +#else + return SDLNet_TCP_Recv(s, buf, nbytes); +#endif +} + +bool receive_with_timeout(TCPsocket s, char* buf, size_t nbytes, bool update_stats=false, int timeout_ms=60000) +{ + int nsleeps = 0; + while(nbytes > 0) { + const int bytes_read = receive_bytes(s, buf, nbytes); + if(bytes_read == 0) { + return false; + } else if(bytes_read < 0) { +#if defined(EAGAIN) && !defined(__BEOS__) && !defined(_WIN32) + if(errno == EAGAIN) +#elif defined(EWOULDBLOCK) + if(errno == EWOULDBLOCK) +#else + if(false) +#endif + { + //TODO: consider replacing this with a select call + if(++nsleeps == timeout_ms) { + return false; + } + usleep(1000); + } else { + return false; + } + } else { + if(bytes_read > static_cast(nbytes)) { + return false; + } + + nbytes -= bytes_read; + buf += bytes_read; + if(update_stats) { + const threading::lock lock(*global_mutex); + transfer_stats[s].second.transfer(static_cast(bytes_read)); + } + } + } + + return true; +} + static SOCKET_STATE send_buf(TCPsocket sock, config& config_in) { + write_possibly_compressed(std::cerr, config_in, false); #ifdef __BEOS__ int timeout = 15000; #endif @@ -223,13 +276,13 @@ static SOCKET_STATE send_buf(TCPsocket sock, config& config_in) { static SOCKET_STATE receive_buf(TCPsocket sock, std::vector& buf) { char num_buf[4]; - int len = SDLNet_TCP_Recv(sock,num_buf,4); + bool res = receive_with_timeout(sock,num_buf,4,false); - if(len != 4) { + if(!res) { return SOCKET_ERRORED; } - len = SDLNet_Read32(num_buf); + const int len = SDLNet_Read32(num_buf); if(len < 1 || len > 100000000) { return SOCKET_ERRORED; @@ -244,61 +297,11 @@ static SOCKET_STATE receive_buf(TCPsocket sock, std::vector& buf) transfer_stats[sock].second.fresh_current(len); } - while(beg != end) { - { - // if we are receiving the socket is in sockets_locked - // check if it is still locked - const threading::lock lock(*global_mutex); - if(sockets_locked[sock] != SOCKET_LOCKED) { - return SOCKET_ERRORED; - } - } - - const int res = SDLNet_TCP_Recv(sock, beg, end - beg); - if(res <= 0) { -#if defined(EAGAIN) && !defined(__BEOS__) && !defined(_WIN32) - if(errno == EAGAIN) -#elif defined(EWOULDBLOCK) - if(errno == EWOULDBLOCK) -#endif - { - -#ifdef USE_POLL - struct pollfd fd = { ((_TCPsocket*)sock)->channel, POLLIN, 0 }; - int poll_res; - do { - poll_res = poll(&fd, 1, 15000); - } while(poll_res == -1 && errno == EINTR); - - if(poll_res > 0) - continue; -#elif defined(USE_SELECT) - fd_set readfds; - FD_ZERO(&readfds); - FD_SET(((_TCPsocket*)sock)->channel, &readfds); - int retval; - struct timeval tv; - tv.tv_sec = 15; - tv.tv_usec = 0; - - do { - retval = select(((_TCPsocket*)sock)->channel + 1, &readfds, NULL, NULL, &tv); - } while(retval == -1 && errno == EINTR); - - if(retval > 0) - continue; -#endif - } - - return SOCKET_ERRORED; - } - - beg += res; - { - const threading::lock lock(*global_mutex); - transfer_stats[sock].second.transfer(static_cast(res)); - } + res = receive_with_timeout(sock, beg, end - beg, true); + if(!res) { + return SOCKET_ERRORED; } + return SOCKET_READY; }