From dc98a6e1fb4ccdaa080f2576efe2eb0829863e84 Mon Sep 17 00:00:00 2001 From: Pauli Nieminen Date: Mon, 28 Jan 2008 08:37:15 +0000 Subject: [PATCH] Fixed socket locking problem in network_worker --- src/network_worker.cpp | 71 ++++++++++++++++++++++++------------------ 1 file changed, 40 insertions(+), 31 deletions(-) diff --git a/src/network_worker.cpp b/src/network_worker.cpp index f1dd34cff4c..c0a5a1df16a 100644 --- a/src/network_worker.cpp +++ b/src/network_worker.cpp @@ -38,6 +38,7 @@ #include #include #include +#include #include @@ -114,8 +115,10 @@ struct buffer { }; +typedef boost::shared_ptr bufferPtr; + bool managed = false; // management_mutex -typedef std::vector< buffer * > buffer_set; +typedef std::vector< bufferPtr > buffer_set; buffer_set bufs; // management_mutex struct schema_pair @@ -131,7 +134,7 @@ schema_map schemas; //schemas_mutex typedef std::vector receive_list; receive_list pending_receives; // management_mutex -typedef std::deque received_queue; +typedef std::deque received_queue; received_queue received_data_queue; // receive_mutex enum SOCKET_STATE { SOCKET_READY, SOCKET_LOCKED, SOCKET_ERRORED, SOCKET_INTERRUPT }; @@ -196,7 +199,7 @@ bool receive_with_timeout(TCPsocket s, char* buf, size_t nbytes, int poll_res; do { time_used = SDL_GetTicks() - startTicks; - poll_res = poll(&fd, 1, minimum(15000,timeout_ms - time_used)); + poll_res = poll(&fd, 1, timeout_ms - time_used); } while(poll_res == -1 && errno == EINTR); #elif defined(USE_SELECT) @@ -209,7 +212,7 @@ bool receive_with_timeout(TCPsocket s, char* buf, size_t nbytes, do { time_used = SDL_GetTicks() - startTicks; - time_left = minimum(15000, timeout_ms - time_used); + time_left = timeout_ms - time_used; tv.tv_sec = time_left/1000; tv.tv_usec = time_left % 1000; retval = select(((_TCPsocket*)s)->channel + 1, &readfds, NULL, NULL, &tv); @@ -374,6 +377,17 @@ static SOCKET_STATE receive_buf(TCPsocket sock, std::vector& buf) return SOCKET_READY; } +inline void check_socket_result(TCPsocket& sock, SOCKET_STATE& result) +{ + const threading::lock lock(*management_mutex); + socket_state_map::iterator lock_it = sockets_locked.find(sock); + assert(lock_it != sockets_locked.end()); + lock_it->second = result; + if(result == SOCKET_ERRORED) { + ++socket_errors; + } +} + static int process_queue(void*) { DBG_NW << "thread started...\n"; @@ -383,7 +397,7 @@ static int process_queue(void*) //to receive data from, sent_buf will be NULL. 'sock' will always refer to the socket //that data is being sent to/received from TCPsocket sock = NULL; - buffer *sent_buf = NULL; + bufferPtr sent_buf; { const threading::lock lock(*management_mutex); @@ -462,27 +476,21 @@ static int process_queue(void*) SOCKET_STATE result = SOCKET_READY; std::vector buf; - if(sent_buf != NULL) { + if(sent_buf) { result = send_buffer(sent_buf->sock, sent_buf->config_buf, sent_buf->gzipped); - delete sent_buf; - sent_buf = NULL; + sent_buf.reset(); } else { result = receive_buf(sock,buf); } - { - const threading::lock lock(*management_mutex); - socket_state_map::iterator lock_it = sockets_locked.find(sock); - assert(lock_it != sockets_locked.end()); - lock_it->second = result; - if(result == SOCKET_ERRORED) { - ++socket_errors; - } - if(result != SOCKET_READY || buf.empty()) continue; + if(result != SOCKET_READY || buf.empty()) + { + check_socket_result(sock,result); + continue; } //if we received data, add it to the queue - buffer *received_data = new buffer(sock); + bufferPtr received_data(new buffer(sock)); std::string buffer(buf.begin(), buf.end()); std::istringstream stream(buffer); // Binary wml starts with a char < 4, the first char of a gzip header is 31 @@ -508,6 +516,7 @@ static int process_queue(void*) const threading::lock lock_received(*received_mutex); received_data_queue.push_back(received_data); } + check_socket_result(sock,result); } // unreachable } @@ -610,16 +619,12 @@ TCPsocket get_received_data(TCPsocket sock, config& cfg) } else if (!(*itor)->config_error.empty()){ // throw the error in parent thread std::string error = (*itor)->config_error; - buffer *buf = *itor; received_data_queue.erase(itor); - delete buf; throw config::error(error); } else { cfg = (*itor)->config_buf; const TCPsocket res = (*itor)->sock; - buffer *buf = *itor; received_data_queue.erase(itor); - delete buf; return res; } } @@ -628,7 +633,7 @@ void queue_data(TCPsocket sock,const config& buf, const bool gzipped) { DBG_NW << "queuing data...\n"; - buffer *queued_buf = new buffer(sock); + bufferPtr queued_buf(new buffer(sock)); queued_buf->config_buf = buf; queued_buf->gzipped = gzipped; { @@ -649,12 +654,14 @@ namespace static void remove_buffers(TCPsocket sock) { { - for(buffer_set::iterator i = bufs.begin(), i_end = bufs.end(); i != i_end; ++i) { + for(buffer_set::iterator i = bufs.begin(), i_end = bufs.end(); i != i_end;) { if ((*i)->sock == sock) { - buffer *buf = *i; - bufs.erase(i); - delete buf; + i = bufs.erase(i); + } + else + { + ++i; } } } @@ -664,9 +671,7 @@ static void remove_buffers(TCPsocket sock) for(received_queue::iterator j = received_data_queue.begin(); j != received_data_queue.end(); ) { if((*j)->sock == sock) { - buffer *buf = *j; j = received_data_queue.erase(j); - delete buf; } else { ++j; } @@ -717,17 +722,21 @@ TCPsocket detect_error() { const threading::lock lock(*management_mutex); if(socket_errors > 0) { - for(socket_state_map::iterator i = sockets_locked.begin(); i != sockets_locked.end(); ++i) { + for(socket_state_map::iterator i = sockets_locked.begin(); i != sockets_locked.end();) { if(i->second == SOCKET_ERRORED) { --socket_errors; const TCPsocket sock = i->first; - sockets_locked.erase(i); + sockets_locked.erase(++i); pending_receives.erase(std::remove(pending_receives.begin(),pending_receives.end(),sock),pending_receives.end()); remove_buffers(sock); const threading::lock lock_schema(*schemas_mutex); schemas.erase(sock); return sock; } + else + { + ++i; + } } }