client: implemend strand to sync io

Dieser Commit ist enthalten in:
Reinder Feenstra 2020-08-25 23:23:12 +02:00
Ursprung 14a0c25d11
Commit c041bc3d6a
2 geänderte Dateien mit 63 neuen und 49 gelöschten Zeilen

Datei anzeigen

@ -29,6 +29,7 @@
Client::Client(Traintastic& server, const std::string& id, boost::asio::ip::tcp::socket socket) :
m_server{server},
m_strand{Traintastic::instance->ioContext()},
m_socket(std::move(socket)),
m_id{id},
m_authenticated{false}//,
@ -54,7 +55,11 @@ Client::~Client()
void Client::start()
{
doReadHeader();
m_strand.post(
[this]()
{
doReadHeader();
});
}
void Client::doReadHeader()
@ -62,28 +67,29 @@ void Client::doReadHeader()
auto self(shared_from_this());
boost::asio::async_read(m_socket,
boost::asio::buffer(&m_readBuffer.header, sizeof(m_readBuffer.header)),
[this, self](const boost::system::error_code& e, std::size_t)
{
if(!e)
{
m_readBuffer.message.reset(new Message(m_readBuffer.header));
if(m_readBuffer.message->dataSize() == 0)
m_strand.wrap(
[this, self](const boost::system::error_code& e, std::size_t)
{
if(m_readBuffer.message->command() != Message::Command::Ping)
EventLoop::call(&Client::processMessage, this, m_readBuffer.message);
if(!e)
{
m_readBuffer.message.reset(new Message(m_readBuffer.header));
if(m_readBuffer.message->dataSize() == 0)
{
if(m_readBuffer.message->command() != Message::Command::Ping)
EventLoop::call(&Client::processMessage, this, m_readBuffer.message);
else
{} // TODO: ping hier replyen
m_readBuffer.message.reset();
doReadHeader();
}
else
doReadData();
}
else if(e == boost::asio::error::eof || e == boost::asio::error::connection_aborted || e == boost::asio::error::connection_reset)
connectionLost();
else
{} // TODO: ping hier replyen
m_readBuffer.message.reset();
doReadHeader();
}
else
doReadData();
}
else if(e == boost::asio::error::eof || e == boost::asio::error::connection_aborted || e == boost::asio::error::connection_reset)
connectionLost();
else
connectionError("readheader", e.message());
});
connectionError("readheader", e.message());
}));
}
void Client::doReadData()
@ -91,39 +97,41 @@ void Client::doReadData()
auto self(shared_from_this());
boost::asio::async_read(m_socket,
boost::asio::buffer(m_readBuffer.message->data(), m_readBuffer.message->dataSize()),
[this, self](const boost::system::error_code& ec, std::size_t)
{
if(!ec)
m_strand.wrap(
[this, self](const boost::system::error_code& ec, std::size_t)
{
if(m_readBuffer.message->command() != Message::Command::Ping)
EventLoop::call(&Client::processMessage, this, m_readBuffer.message);
if(!ec)
{
if(m_readBuffer.message->command() != Message::Command::Ping)
EventLoop::call(&Client::processMessage, this, m_readBuffer.message);
else
{} // TODO: ping hier replyen
m_readBuffer.message.reset();
doReadHeader();
}
else if(ec == boost::asio::error::eof || ec == boost::asio::error::connection_aborted || ec == boost::asio::error::connection_reset)
connectionLost();
else
{} // TODO: ping hier replyen
m_readBuffer.message.reset();
doReadHeader();
}
else if(ec == boost::asio::error::eof || ec == boost::asio::error::connection_aborted || ec == boost::asio::error::connection_reset)
connectionLost();
else
connectionError("readdata", ec.message());
});
connectionError("readdata", ec.message());
}));
}
void Client::doWrite()
{
auto self(shared_from_this());
boost::asio::async_write(m_socket, boost::asio::buffer(**m_writeQueue.front(), m_writeQueue.front()->size()),
[this, self](const boost::system::error_code& ec, std::size_t)
{
if(!ec)
m_strand.wrap(
[this, self](const boost::system::error_code& ec, std::size_t)
{
m_writeQueue.pop();
if(!m_writeQueue.empty())
doWrite();
}
else
connectionError("write", ec.message());
});
if(!ec)
{
m_writeQueue.pop();
if(!m_writeQueue.empty())
doWrite();
}
else
connectionError("write", ec.message());
}));
}
void Client::processMessage(const std::shared_ptr<Message> message)
@ -258,10 +266,14 @@ void Client::processMessage(const std::shared_ptr<Message> message)
void Client::sendMessage(std::unique_ptr<Message> message)
{
bool wasEmpty = m_writeQueue.empty();
m_writeQueue.emplace(std::move(message));
if(wasEmpty)
doWrite();
m_strand.post(
[this, msg=std::make_shared<std::unique_ptr<Message>>(std::move(message))]()
{
const bool wasEmpty = m_writeQueue.empty();
m_writeQueue.emplace(std::move(*msg));
if(wasEmpty)
doWrite();
});
}
void Client::connectionLost()

Datei anzeigen

@ -50,6 +50,7 @@ class Client : public std::enable_shared_from_this<Client>
using ObjectHandle = uint32_t;
Traintastic& m_server;
boost::asio::io_service::strand m_strand;
boost::asio::ip::tcp::socket m_socket;
const std::string m_id;
struct
@ -57,6 +58,7 @@ class Client : public std::enable_shared_from_this<Client>
Message::Header header;
std::shared_ptr<Message> message;
} m_readBuffer;
std::mutex m_writeQueueMutex;
std::queue<std::unique_ptr<Message>> m_writeQueue;
bool m_authenticated;
std::shared_ptr<Session> m_session;