Fixes a couple of bugs in the ana implementation.

Testing looks very promising. A lot of code stays for testing,
i.e. all operations copy the buffers (commented the ZERO_COPY
parameters.) and I add the new implementation of compress_config which
is bugged but looks cleaner, you can look at reduced code of the
problem here: http://wesnoth.pastebin.com/MRTiqadK. A ran a few tests
without getting any error.
This commit is contained in:
Guillermo Biset 2010-07-20 22:20:25 +00:00
parent cf7efc91e8
commit 4915512f62
2 changed files with 71 additions and 36 deletions

View File

@ -140,15 +140,15 @@ void ana_receive_handler::handle_message(ana::error_code error_c,
{
boost::mutex::scoped_lock lock( handler_mutex_);
delete receive_timer_;
receive_timer_ = NULL;
(*iterator_)->add_buffer( read_buffer, client );
error_code_ = error_c;
if (! finished_ )
{
delete receive_timer_;
receive_timer_ = NULL;
(*iterator_)->add_buffer( read_buffer, client );
error_code_ = error_c;
finished_ = true;
mutex_.unlock();
}
@ -158,11 +158,11 @@ void ana_receive_handler::handle_disconnect(ana::error_code error_c, ana::net_id
{
boost::mutex::scoped_lock lock( handler_mutex_);
delete receive_timer_;
receive_timer_ = NULL;
if (! finished_ )
{
delete receive_timer_;
receive_timer_ = NULL;
error_code_ = error_c;
finished_ = true;
@ -174,12 +174,13 @@ void ana_receive_handler::handle_timeout(ana::error_code error_code)
{
boost::mutex::scoped_lock lock( handler_mutex_ );
delete receive_timer_;
receive_timer_ = NULL;
if (! finished_ )
{
delete receive_timer_;
receive_timer_ = NULL;
error_code_ = error_code;
finished_ = true;
mutex_.unlock();
}
@ -664,7 +665,7 @@ network::connection ana_network_manager::create_client_and_connect(std::string h
ana_send_handler send_handler;
client->send( ana::buffer( bos.str()), &send_handler, ana::ZERO_COPY );
client->send( ana::buffer( bos.str()), &send_handler); //, ana::ZERO_COPY );
send_handler.wait_completion();
@ -810,14 +811,29 @@ size_t ana_network_manager::number_of_connections() const
return total;
}
/*
std::string ana_network_manager::compress_config( const config& cfg)
{
std::stringstream out;
boost::iostreams::filtering_stream<boost::iostreams::output> filter;
filter.push(boost::iostreams::gzip_compressor());
filter.push(out);
write(filter, cfg);
out.flush();
return out.str( );
}
*/
void ana_network_manager::compress_config( const config& cfg, std::ostringstream& out)
{
boost::iostreams::filtering_stream<boost::iostreams::output> filter;
filter.push(boost::iostreams::gzip_compressor());
filter.push(out);
write(filter, cfg);
out.flush();
}
void ana_network_manager::read_config( const ana::detail::read_buffer& buffer, config& cfg)
{
std::istringstream input( buffer->string() );
@ -829,9 +845,11 @@ size_t ana_network_manager::send_all( const config& cfg, bool zipped )
{
std::cout << "DEBUG: Sending to everybody. " << (zipped ? "Zipped":"Raw") << "\n";
std::ostringstream out;
compress_config(cfg,out);
compress_config( cfg, out);
const std::string output_string = out.str();
// const std::string output_string = compress_config(cfg);
std::set<ana_component*>::iterator it;
@ -842,29 +860,36 @@ size_t ana_network_manager::send_all( const config& cfg, bool zipped )
const size_t necessary_calls = server_manager_[ (*it)->server() ]->client_amount();
ana_send_handler handler( necessary_calls );
(*it)->server()->send_all( ana::buffer( out.str() ), &handler, ana::ZERO_COPY);
(*it)->server()->send_all( ana::buffer( output_string ), &handler); //, ana::ZERO_COPY);
handler.wait_completion(); // the handler will release the mutex after necessary_calls calls
}
else
{
ana_send_handler handler;
(*it)->client()->send( ana::buffer( out.str() ), &handler, ana::ZERO_COPY );
(*it)->client()->send( ana::buffer( output_string ), &handler); //, ana::ZERO_COPY );
handler.wait_completion();
}
}
std::cout << "Sent data.\n";
return out.str().size();
return output_string.size();
}
size_t ana_network_manager::send( network::connection connection_num , const config& cfg, bool /*zipped*/ )
size_t ana_network_manager::send( network::connection connection_num , const config& cfg, bool zipped )
{
if ( ! zipped )
throw std::runtime_error("All send operations should be zipped");
std::cout << "DEBUG: Single send...\n";
std::ostringstream out;
compress_config(cfg, out );
compress_config( cfg, out);
return send_raw_data( out.str().c_str(), out.str().size(), connection_num );
const std::string output_string = out.str();
// const std::string output_string = compress_config(cfg);
return send_raw_data( output_string.c_str(), output_string.size(), connection_num );
}
size_t ana_network_manager::send_raw_data( const char* base_char, size_t size, network::connection connection_num )
@ -881,7 +906,7 @@ size_t ana_network_manager::send_raw_data( const char* base_char, size_t size, n
throw std::runtime_error("Can't send to the server itself.");
ana_send_handler handler;
(*it)->client()->send( ana::buffer( base_char, size ), &handler, ana::ZERO_COPY);
(*it)->client()->send( ana::buffer( base_char, size ), &handler); //, ana::ZERO_COPY);
handler.wait_completion();
if ( handler.error() )
@ -896,7 +921,7 @@ size_t ana_network_manager::send_raw_data( const char* base_char, size_t size, n
if ((*it)->is_server())
{
ana_send_handler handler;
(*it)->server()->send_one( id, ana::buffer( base_char, size ), &handler, ana::ZERO_COPY);
(*it)->server()->send_one( id, ana::buffer( base_char, size ), &handler); //, ana::ZERO_COPY);
handler.wait_completion();
if ( handler.error() )
return 0;
@ -914,7 +939,11 @@ void ana_network_manager::send_all_except(const config& cfg, network::connection
std::cout << "DEBUG: send_all_except " << connection_num << "\n";
std::ostringstream out;
compress_config(cfg, out );
compress_config( cfg, out);
const std::string output_string = out.str();
// const std::string output_string = compress_config(cfg);
ana_component_set::iterator it;
@ -930,7 +959,9 @@ void ana_network_manager::send_all_except(const config& cfg, network::connection
{
const size_t clients_receiving_number = server_manager_[ (*it)->server() ]->client_amount() - 1;
ana_send_handler handler( clients_receiving_number );
(*it)->server()->send_all_except( id_to_avoid, ana::buffer( out.str() ), &handler, ana::ZERO_COPY);
(*it)->server()->send_all_except( id_to_avoid,
ana::buffer( output_string ),
&handler); //, ana::ZERO_COPY);
handler.wait_completion();
}
}
@ -940,7 +971,7 @@ void ana_network_manager::send_all_except(const config& cfg, network::connection
if ( (*it)->get_wesnoth_id() != connection_num )
{
ana_send_handler handler;
(*it)->client()->send( ana::buffer( out.str() ), &handler, ana::ZERO_COPY);
(*it)->client()->send( ana::buffer( output_string ), &handler); //, ana::ZERO_COPY);
handler.wait_completion();
}
}
@ -967,20 +998,22 @@ network::connection ana_network_manager::read_from( const ana_component_set::ite
ana_receive_handler handler(it);
(*it)->listener()->set_listener_handler( &handler );
if ( (*it)->is_server() )
handler.wait_completion( (*it)->server(), timeout_ms );
if ( (*it)->new_buffer_ready() )
return read_from_ready_buffer( it, cfg );
else
handler.wait_completion( (*it)->client(), timeout_ms );
{
if ( (*it)->is_server() )
handler.wait_completion( (*it)->server(), timeout_ms );
else
handler.wait_completion( (*it)->client(), timeout_ms );
}
(*it)->listener()->set_listener_handler( this );
if ( handler.error() )
return 0;
else
{
read_config( (*it)->wait_for_element(), cfg );
return (*it)->get_wesnoth_id();
}
return read_from_ready_buffer( it, cfg );
}
}
@ -1232,8 +1265,9 @@ void ana_network_manager::handle_disconnect(ana::error_code /*error_code*/, ana:
if ( it != components_.end() )
{
std::cout << "DEBUG: Removing bad component.\n";
delete *it;
components_.erase(it);
close_connections_and_cleanup();
// delete *it;
// components_.erase(it);
}
else
{

View File

@ -485,6 +485,7 @@ class ana_network_manager : public ana::listener_handler,
* @param out : The output stream as output.
*/
void compress_config( const config& cfg, std::ostringstream& out);
// std::string compress_config( const config& cfg);
/**
* Read a config object from an input buffer.