Wesnothd Connection: formatting cleanup

This commit is contained in:
Charles Dang 2017-11-22 20:20:49 +11:00
parent 722f319c50
commit 111ca03510
2 changed files with 179 additions and 140 deletions

View File

@ -19,9 +19,10 @@
#include "serialization/parser.hpp"
#include "utils/functional.hpp"
#include <boost/thread.hpp>
#include <SDL_timer.h>
#include <boost/thread.hpp>
#include <cstdint>
#include <deque>
@ -45,14 +46,13 @@ struct mptest_log
};
}
#define MPTEST_LOG mptest_log mptest_log__( __func__)
#define MPTEST_LOG mptest_log mptest_log__(__func__)
#else
#define MPTEST_LOG ((void)0)
#endif
using boost::system::system_error;
using boost::system::error_code;
using boost::system::system_error;
// main thread
wesnothd_connection::wesnothd_connection(const std::string& host, const std::string& service)
@ -74,21 +74,26 @@ wesnothd_connection::wesnothd_connection(const std::string& host, const std::str
, bytes_read_(0)
{
MPTEST_LOG;
resolver_.async_resolve(
boost::asio::ip::tcp::resolver::query(host, service),
std::bind(&wesnothd_connection::handle_resolve, this, _1, _2)
);
resolver_.async_resolve(boost::asio::ip::tcp::resolver::query(host, service),
std::bind(&wesnothd_connection::handle_resolve, this, _1, _2));
LOG_NW << "Resolving hostname: " << host << '\n';
}
wesnothd_connection::~wesnothd_connection()
{
MPTEST_LOG;
}
// main thread
void wesnothd_connection::handle_resolve(const error_code& ec, resolver::iterator iterator)
{
MPTEST_LOG;
if (ec) {
if(ec) {
LOG_NW << __func__ << " Throwing: " << ec << "\n";
throw system_error(ec);
}
connect(iterator);
}
@ -96,29 +101,22 @@ void wesnothd_connection::handle_resolve(const error_code& ec, resolver::iterato
void wesnothd_connection::connect(resolver::iterator iterator)
{
MPTEST_LOG;
socket_.async_connect(*iterator, std::bind(
&wesnothd_connection::handle_connect, this, _1, iterator)
);
socket_.async_connect(*iterator, std::bind(&wesnothd_connection::handle_connect, this, _1, iterator));
LOG_NW << "Connecting to " << iterator->endpoint().address() << '\n';
}
// main thread
void wesnothd_connection::handle_connect(
const boost::system::error_code& ec,
resolver::iterator iterator
)
void wesnothd_connection::handle_connect(const boost::system::error_code& ec, resolver::iterator iterator)
{
MPTEST_LOG;
if(ec) {
WRN_NW << "Failed to connect to " <<
iterator->endpoint().address() << ": " <<
ec.message() << '\n';
WRN_NW << "Failed to connect to " << iterator->endpoint().address() << ": " << ec.message() << '\n';
socket_.close();
if(++iterator == resolver::iterator()) {
ERR_NW << "Tried all IPs. Giving up" << std::endl;
throw system_error(ec);
}
else {
} else {
connect(iterator);
}
} else {
@ -126,54 +124,58 @@ void wesnothd_connection::handle_connect(
handshake();
}
}
// main thread
void wesnothd_connection::handshake()
{
MPTEST_LOG;
static const uint32_t handshake = 0;
boost::asio::async_write(socket_,
boost::asio::buffer(reinterpret_cast<const char*>(&handshake), 4),
[](const error_code& ec, std::size_t) { if (ec) throw system_error(ec); }
);
boost::asio::async_read(socket_,
boost::asio::buffer(&handshake_response_.binary, 4),
std::bind(&wesnothd_connection::handle_handshake, this, _1)
);
boost::asio::async_write(socket_, boost::asio::buffer(reinterpret_cast<const char*>(&handshake), 4),
[](const error_code& ec, std::size_t) { if(ec) { throw system_error(ec); } });
boost::asio::async_read(socket_, boost::asio::buffer(&handshake_response_.binary, 4),
std::bind(&wesnothd_connection::handle_handshake, this, _1));
}
// main thread
void wesnothd_connection::handle_handshake(const error_code& ec)
{
MPTEST_LOG;
if (ec) {
if(ec) {
LOG_NW << __func__ << " Throwing: " << ec << "\n";
throw system_error(ec);
}
handshake_finished_ = true;
handshake_finished_ = true;
recv();
worker_thread_.reset(new boost::thread( [this](){
worker_thread_.reset(new boost::thread([this]() {
// worker thread
std::shared_ptr<wesnothd_connection> this_ptr = this->shared_from_this();
io_service_.run();
LOG_NW << "wesnothd_connection::io_service::run() returned\n";
} ));
}));
}
// main thread
void wesnothd_connection::send_data(const configr_of& request)
{
MPTEST_LOG;
//C++11 doesnt allow lambda captuting by moving, this could maybe use std::unique_ptr in c++14;
std::shared_ptr<boost::asio::streambuf> buf_ptr( new boost::asio::streambuf());
// C++11 doesn't allow lambda captuting by moving. This could maybe use std::unique_ptr in c++14;
std::shared_ptr<boost::asio::streambuf> buf_ptr(new boost::asio::streambuf());
std::ostream os(buf_ptr.get());
write_gz(os, request);
//TODO: shoudl i capturea shared_ptr for this?
io_service_.post([this, buf_ptr](){
// TODO: should I capture a shared_ptr for this?
io_service_.post([this, buf_ptr]() {
DBG_NW << "In wesnothd_connection::send_data::lambda\n";
send_queue_.push_back(buf_ptr);
if (send_queue_.size() == 1) {
if(send_queue_.size() == 1) {
send();
}
});
@ -186,6 +188,7 @@ void wesnothd_connection::cancel()
if(socket_.is_open()) {
boost::system::error_code ec;
socket_.cancel(ec);
if(ec) {
WRN_NW << "Failed to cancel network operations: " << ec.message() << std::endl;
}
@ -200,99 +203,105 @@ void wesnothd_connection::stop()
io_service_.stop();
}
//worker thread
// worker thread
std::size_t wesnothd_connection::is_write_complete(const boost::system::error_code& ec, size_t bytes_transferred)
{
MPTEST_LOG;
if(ec)
{
if(ec) {
{
std::lock_guard<std::mutex> lock(last_error_mutex_);
last_error_ = ec;
}
LOG_NW << __func__ << " Error: " << ec << "\n";
io_service_.stop();
return bytes_to_write_ - bytes_transferred;
}
bytes_written_ = bytes_transferred;
return bytes_to_write_ - bytes_transferred;
}
//worker thread
void wesnothd_connection::handle_write(
const boost::system::error_code& ec,
std::size_t bytes_transferred
)
// worker thread
void wesnothd_connection::handle_write(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
MPTEST_LOG;
DBG_NW << "Written " << bytes_transferred << " bytes.\n";
send_queue_.pop_front();
if(ec)
{
if(ec) {
{
std::lock_guard<std::mutex> lock(last_error_mutex_);
last_error_ = ec;
}
LOG_NW << __func__ << " Error: " << ec << "\n";
io_service_.stop();
return;
}
if (!send_queue_.empty()) {
if(!send_queue_.empty()) {
send();
}
}
//worker thread
std::size_t wesnothd_connection::is_read_complete(
const boost::system::error_code& ec,
std::size_t bytes_transferred
)
// worker thread
std::size_t wesnothd_connection::is_read_complete(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
//We use custom is_write/read_complete function to be able to see the current progress of the upload/download
// We use custom is_write/read_complete function to be able to see the current progress of the upload/download
MPTEST_LOG;
if(ec)
{
if(ec) {
{
std::lock_guard<std::mutex> lock(last_error_mutex_);
last_error_ = ec;
}
LOG_NW << __func__ << " Error: " << ec << "\n";
io_service_.stop();
return bytes_to_read_ - bytes_transferred;
}
bytes_read_ = bytes_transferred;
if(bytes_transferred < 4) {
return 4;
} else {
if(!bytes_to_read_) {
std::istream is(&read_buf_);
union { char binary[4]; uint32_t num; } data_size;
is.read(data_size.binary, 4);
bytes_to_read_ = ntohl(data_size.num) + 4;
//Close immediately if we receive an invalid length
if (bytes_to_read_ < 4)
bytes_to_read_ = bytes_transferred;
}
return bytes_to_read_ - bytes_transferred;
}
if(!bytes_to_read_) {
std::istream is(&read_buf_);
data_union data_size;
is.read(data_size.binary, 4);
bytes_to_read_ = ntohl(data_size.num) + 4;
// Close immediately if we receive an invalid length
if(bytes_to_read_ < 4) {
bytes_to_read_ = bytes_transferred;
}
}
return bytes_to_read_ - bytes_transferred;
}
//worker thread
void wesnothd_connection::handle_read(
const boost::system::error_code& ec,
std::size_t bytes_transferred
)
// worker thread
void wesnothd_connection::handle_read(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
MPTEST_LOG;
DBG_NW << "Read " << bytes_transferred << " bytes.\n";
bytes_to_read_ = 0;
if(last_error_ && ec != boost::asio::error::eof)
{
if(last_error_ && ec != boost::asio::error::eof) {
{
std::lock_guard<std::mutex> lock(last_error_mutex_);
last_error_ = ec;
}
LOG_NW << __func__ << " Error: " << ec << "\n";
io_service_.stop();
return;
}
@ -304,16 +313,17 @@ void wesnothd_connection::handle_read(
std::lock_guard<std::mutex> lock(recv_queue_mutex_);
recv_queue_.emplace_back(std::move(data));
}
DBG_NW << "Received " << recv_queue_.back() << " bytes.\n";
DBG_NW << "Received " << recv_queue_.back() << " bytes.\n";
recv();
}
//worker thread
// worker thread
void wesnothd_connection::send()
{
MPTEST_LOG;
auto& buf = *send_queue_.front();
size_t buf_size = buf.size();
bytes_to_write_ = buf_size + 4;
bytes_written_ = 0;
@ -321,20 +331,22 @@ void wesnothd_connection::send()
boost::asio::streambuf::const_buffers_type gzipped_data = buf.data();
std::deque<boost::asio::const_buffer> bufs(gzipped_data.begin(), gzipped_data.end());
bufs.push_front(boost::asio::buffer(reinterpret_cast<const char*>(&payload_size_), 4));
boost::asio::async_write(socket_, bufs,
std::bind(&wesnothd_connection::is_write_complete, this, _1, _2),
std::bind(&wesnothd_connection::handle_write, this, _1, _2)
);
std::bind(&wesnothd_connection::handle_write, this, _1, _2));
}
//worker thread
// worker thread
void wesnothd_connection::recv()
{
MPTEST_LOG;
boost::asio::async_read(socket_, read_buf_,
std::bind(&wesnothd_connection::is_read_complete, this, _1, _2),
std::bind(&wesnothd_connection::handle_read, this, _1, _2)
);
std::bind(&wesnothd_connection::handle_read, this, _1, _2));
}
// main thread, during handshake
@ -342,14 +354,15 @@ std::size_t wesnothd_connection::poll()
{
MPTEST_LOG;
assert(!worker_thread_);
try {
return io_service_.poll();
}
catch (const boost::system::system_error& err) {
if( err.code() == boost::asio::error::operation_aborted || err.code() == boost::asio::error::eof) {
} catch(const boost::system::system_error& err) {
if(err.code() == boost::asio::error::operation_aborted || err.code() == boost::asio::error::eof) {
return 1;
}
WRN_NW << __func__ << " Rehrowing: " << err.code() << "\n";
WRN_NW << __func__ << " Rethrowing: " << err.code() << "\n";
throw error(err.code());
}
}
@ -361,7 +374,7 @@ bool wesnothd_connection::receive_data(config& result)
{
std::lock_guard<std::mutex> lock(recv_queue_mutex_);
if (!recv_queue_.empty()) {
if(!recv_queue_.empty()) {
result.swap(recv_queue_.front());
recv_queue_.pop_front();
return true;
@ -370,7 +383,7 @@ bool wesnothd_connection::receive_data(config& result)
{
std::lock_guard<std::mutex> lock(last_error_mutex_);
if (last_error_) {
if(last_error_) {
throw error(last_error_);
}
}
@ -387,7 +400,6 @@ bool wesnothd_connection::wait_and_receive_data(config& data)
return receive_data(data);
};
bool wesnothd_connection::fetch_data_with_loading_screen(config& cfg, loading_stage stage)
{
assert(stage != loading_stage::none);
@ -402,14 +414,8 @@ bool wesnothd_connection::fetch_data_with_loading_screen(config& cfg, loading_st
return res;
}
wesnothd_connection::~wesnothd_connection()
{
MPTEST_LOG;
}
// wesnothd_connection_ptr
wesnothd_connection_ptr& wesnothd_connection_ptr::operator=(wesnothd_connection_ptr&& other)
{
MPTEST_LOG;
@ -417,13 +423,14 @@ wesnothd_connection_ptr& wesnothd_connection_ptr::operator=(wesnothd_connection_
ptr_->stop();
ptr_.reset();
}
ptr_ = std::move(other.ptr_);
return *this;
}
wesnothd_connection_ptr wesnothd_connection::create(const std::string& host, const std::string& service)
{
//can't use make_shared becasue the ctor is private
// Can't use make_shared because the ctor is private
return wesnothd_connection_ptr(std::shared_ptr<wesnothd_connection>(new wesnothd_connection(host, service)));
}

View File

@ -16,7 +16,7 @@
#ifdef _WIN32
// MSVC compilation throws deprecation warnings on boost's use of gethostbyaddr and
// MSVC compilation throws deprecation warnings on boost's use of gethostbyaddr and
// gethostbyname in socket_ops.ipp. This define silences that.
#define _WINSOCK_DEPRECATED_NO_WARNINGS
@ -38,23 +38,30 @@
#endif // endif _WIN32
#include "configr_assign.hpp"
#include "exceptions.hpp"
#include "wesnothd_connection_error.hpp"
#include <boost/asio.hpp>
#include <deque>
#include <list>
#include <mutex>
#include "exceptions.hpp"
#include "wesnothd_connection_error.hpp"
#include "configr_assign.hpp"
namespace boost
{
class thread;
class thread;
}
class config;
class wesnothd_connection_ptr;
enum class loading_stage;
union data_union {
char binary[4];
uint32_t num;
};
/** A class that represents a TCP/IP connection to the wesnothd server. */
class wesnothd_connection : public std::enable_shared_from_this<wesnothd_connection>
{
@ -63,16 +70,20 @@ public:
wesnothd_connection(const wesnothd_connection&) = delete;
wesnothd_connection& operator=(const wesnothd_connection&) = delete;
~wesnothd_connection();
private:
/**
* Constructor.
*
* May only be called via wesnothd_connection_ptr
*
* @param host Name of the host to connect to
* @param service Service identifier such as "80" or "http"
*/
wesnothd_connection(const std::string& host, const std::string& service);
public:
static wesnothd_connection_ptr create(const std::string& host, const std::string& service);
@ -88,48 +99,55 @@ public:
*/
bool wait_and_receive_data(config& data);
/** Handle all pending asynchonous events and return */
/** Handles all pending asynchornous events and returns. */
std::size_t poll();
/** Run asio's event loop
*
* Handle asynchronous events blocking until all asynchronous
* operations have finished
*/
void cancel();
// Destroys this object.
void stop();
/** True if connected and no high-level operation is in progress */
bool handshake_finished() const { return handshake_finished_; }
bool handshake_finished() const
{
return handshake_finished_;
}
std::size_t bytes_to_write() const
{
return bytes_to_write_;
}
std::size_t bytes_written() const
{
return bytes_written_;
}
std::size_t bytes_to_read() const
{
return bytes_to_read_;
}
std::size_t bytes_read() const
{
return bytes_read_;
}
bool has_data_received() const
{
return !recv_queue_.empty();
}
bool is_sending_data() const
{
return !send_queue_.empty();
}
private:
std::unique_ptr<boost::thread> worker_thread_;
boost::asio::io_service io_service_;
typedef boost::asio::ip::tcp::resolver resolver;
resolver resolver_;
@ -137,32 +155,26 @@ private:
socket socket_;
boost::system::error_code last_error_;
std::mutex last_error_mutex_;
bool handshake_finished_;
boost::asio::streambuf read_buf_;
void handle_resolve(
const boost::system::error_code& ec,
resolver::iterator iterator
);
void handle_resolve(const boost::system::error_code& ec, resolver::iterator iterator);
void connect(resolver::iterator iterator);
void handle_connect(
const boost::system::error_code& ec,
resolver::iterator iterator
);
void handle_connect(const boost::system::error_code& ec, resolver::iterator iterator);
void handshake();
void handle_handshake(
const boost::system::error_code& ec
);
union {
char binary[4];
uint32_t num;
} handshake_response_;
void handle_handshake(const boost::system::error_code& ec);
data_union handshake_response_;
std::size_t is_write_complete(const boost::system::error_code& error, std::size_t bytes_transferred);
void handle_write(const boost::system::error_code& ec, std::size_t bytes_transferred);
std::size_t is_read_complete(const boost::system::error_code& error, std::size_t bytes_transferred);
void handle_read(const boost::system::error_code& ec, std::size_t bytes_transferred);
@ -171,38 +183,47 @@ private:
std::list<std::shared_ptr<boost::asio::streambuf>> send_queue_;
std::list<config> recv_queue_;
std::mutex recv_queue_mutex_;
uint32_t payload_size_;
// TODO: do i need to guard the follwing 4 values with a mutex?
std::size_t bytes_to_write_;
std::size_t bytes_written_;
std::size_t bytes_to_read_;
std::size_t bytes_read_;
};
/*
* This class acts like a unique_ptr<wesnothd_connection>, wesnothd_connection objects may only be owned though this pointer.
* The reason why we need this is that wesnothd_connection runs a workerthread so we use a shared_ptr to make sure
* the wesnothd_connection isn't destroyed before the worker thread has finished. When this object is destroed, it calls
* wesnothd_connection::stop() which stops the worker thread which will then destroy the other shared_ptr<wesnothd_connection>
* which destroys the wesnothd_connection object.
/**
* This class acts like a unique_ptr<wesnothd_connection>, wesnothd_connection objects may only be owned though this
* pointer. The reason why we need this is that wesnothd_connection runs a workerthread so we use a shared_ptr to make
* sure the wesnothd_connection isn't destroyed before the worker thread has finished. When this object is destroyed, it
* calls wesnothd_connection::stop() which stops the worker thread which will then destroy the other
* shared_ptr<wesnothd_connection> which destroys the wesnothd_connection object.
*/
class wesnothd_connection_ptr
{
private:
friend class wesnothd_connection;
wesnothd_connection_ptr(std::shared_ptr<wesnothd_connection>&& ptr)
: ptr_(std::move(ptr))
{}
public:
{
}
public:
wesnothd_connection_ptr() = default;
wesnothd_connection_ptr(const wesnothd_connection_ptr&) = delete;
wesnothd_connection_ptr& operator=(const wesnothd_connection_ptr&) = delete;
#if defined(_MSC_VER) &&_MSC_VER == 1800
wesnothd_connection_ptr(wesnothd_connection_ptr&& other) : ptr_(std::move(other.ptr_)) {}
#if defined(_MSC_VER) && _MSC_VER == 1800
wesnothd_connection_ptr(wesnothd_connection_ptr&& other)
: ptr_(std::move(other.ptr_))
{
}
#else
wesnothd_connection_ptr(wesnothd_connection_ptr&&) = default;
#endif
@ -210,22 +231,33 @@ public:
~wesnothd_connection_ptr();
explicit operator bool() const {
explicit operator bool() const
{
return !!ptr_;
}
wesnothd_connection& operator*() {
wesnothd_connection& operator*()
{
return *ptr_;
}
const wesnothd_connection& operator*() const {
const wesnothd_connection& operator*() const
{
return *ptr_;
}
wesnothd_connection* operator->() {
wesnothd_connection* operator->()
{
return ptr_.get();
}
const wesnothd_connection* operator->() const {
const wesnothd_connection* operator->() const
{
return ptr_.get();
}
wesnothd_connection* get() const {
wesnothd_connection* get() const
{
return ptr_.get();
}