diff --git a/server/src/core/client.cpp b/server/src/core/client.cpp index b0b3fd53..a277a6ad 100644 --- a/server/src/core/client.cpp +++ b/server/src/core/client.cpp @@ -29,6 +29,7 @@ Client::Client(Traintastic& server, const std::string& id, boost::asio::ip::tcp::socket socket) : m_server{server}, + m_strand{Traintastic::instance->ioContext()}, m_socket(std::move(socket)), m_id{id}, m_authenticated{false}//, @@ -54,7 +55,11 @@ Client::~Client() void Client::start() { - doReadHeader(); + m_strand.post( + [this]() + { + doReadHeader(); + }); } void Client::doReadHeader() @@ -62,28 +67,29 @@ void Client::doReadHeader() auto self(shared_from_this()); boost::asio::async_read(m_socket, boost::asio::buffer(&m_readBuffer.header, sizeof(m_readBuffer.header)), - [this, self](const boost::system::error_code& e, std::size_t) - { - if(!e) - { - m_readBuffer.message.reset(new Message(m_readBuffer.header)); - if(m_readBuffer.message->dataSize() == 0) + m_strand.wrap( + [this, self](const boost::system::error_code& e, std::size_t) { - if(m_readBuffer.message->command() != Message::Command::Ping) - EventLoop::call(&Client::processMessage, this, m_readBuffer.message); + if(!e) + { + m_readBuffer.message.reset(new Message(m_readBuffer.header)); + if(m_readBuffer.message->dataSize() == 0) + { + if(m_readBuffer.message->command() != Message::Command::Ping) + EventLoop::call(&Client::processMessage, this, m_readBuffer.message); + else + {} // TODO: ping hier replyen + m_readBuffer.message.reset(); + doReadHeader(); + } + else + doReadData(); + } + else if(e == boost::asio::error::eof || e == boost::asio::error::connection_aborted || e == boost::asio::error::connection_reset) + connectionLost(); else - {} // TODO: ping hier replyen - m_readBuffer.message.reset(); - doReadHeader(); - } - else - doReadData(); - } - else if(e == boost::asio::error::eof || e == boost::asio::error::connection_aborted || e == boost::asio::error::connection_reset) - connectionLost(); - else - connectionError("readheader", e.message()); - }); + connectionError("readheader", e.message()); + })); } void Client::doReadData() @@ -91,39 +97,41 @@ void Client::doReadData() auto self(shared_from_this()); boost::asio::async_read(m_socket, boost::asio::buffer(m_readBuffer.message->data(), m_readBuffer.message->dataSize()), - [this, self](const boost::system::error_code& ec, std::size_t) - { - if(!ec) + m_strand.wrap( + [this, self](const boost::system::error_code& ec, std::size_t) { - if(m_readBuffer.message->command() != Message::Command::Ping) - EventLoop::call(&Client::processMessage, this, m_readBuffer.message); + if(!ec) + { + if(m_readBuffer.message->command() != Message::Command::Ping) + EventLoop::call(&Client::processMessage, this, m_readBuffer.message); + else + {} // TODO: ping hier replyen + m_readBuffer.message.reset(); + doReadHeader(); + } + else if(ec == boost::asio::error::eof || ec == boost::asio::error::connection_aborted || ec == boost::asio::error::connection_reset) + connectionLost(); else - {} // TODO: ping hier replyen - m_readBuffer.message.reset(); - doReadHeader(); - } - else if(ec == boost::asio::error::eof || ec == boost::asio::error::connection_aborted || ec == boost::asio::error::connection_reset) - connectionLost(); - else - connectionError("readdata", ec.message()); - }); + connectionError("readdata", ec.message()); + })); } void Client::doWrite() { auto self(shared_from_this()); boost::asio::async_write(m_socket, boost::asio::buffer(**m_writeQueue.front(), m_writeQueue.front()->size()), - [this, self](const boost::system::error_code& ec, std::size_t) - { - if(!ec) + m_strand.wrap( + [this, self](const boost::system::error_code& ec, std::size_t) { - m_writeQueue.pop(); - if(!m_writeQueue.empty()) - doWrite(); - } - else - connectionError("write", ec.message()); - }); + if(!ec) + { + m_writeQueue.pop(); + if(!m_writeQueue.empty()) + doWrite(); + } + else + connectionError("write", ec.message()); + })); } void Client::processMessage(const std::shared_ptr message) @@ -258,10 +266,14 @@ void Client::processMessage(const std::shared_ptr message) void Client::sendMessage(std::unique_ptr message) { - bool wasEmpty = m_writeQueue.empty(); - m_writeQueue.emplace(std::move(message)); - if(wasEmpty) - doWrite(); + m_strand.post( + [this, msg=std::make_shared>(std::move(message))]() + { + const bool wasEmpty = m_writeQueue.empty(); + m_writeQueue.emplace(std::move(*msg)); + if(wasEmpty) + doWrite(); + }); } void Client::connectionLost() diff --git a/server/src/core/client.hpp b/server/src/core/client.hpp index da10def1..2eb468ed 100644 --- a/server/src/core/client.hpp +++ b/server/src/core/client.hpp @@ -50,6 +50,7 @@ class Client : public std::enable_shared_from_this using ObjectHandle = uint32_t; Traintastic& m_server; + boost::asio::io_service::strand m_strand; boost::asio::ip::tcp::socket m_socket; const std::string m_id; struct @@ -57,6 +58,7 @@ class Client : public std::enable_shared_from_this Message::Header header; std::shared_ptr message; } m_readBuffer; + std::mutex m_writeQueueMutex; std::queue> m_writeQueue; bool m_authenticated; std::shared_ptr m_session;