another experiment, see #33

create connection in server thread and strart reading without using the eventloop
Dieser Commit ist enthalten in:
Reinder Feenstra 2023-11-16 23:27:33 +01:00
Ursprung e4666e3fae
Commit 0337ac6a4d
4 geänderte Dateien mit 52 neuen und 57 gelöschten Zeilen

Datei anzeigen

@ -31,24 +31,16 @@
#define IS_SERVER_THREAD (std::this_thread::get_id() == m_server.threadId())
#endif
Connection::Connection(Server& server, std::unique_ptr<boost::asio::ip::tcp::socket> socket, std::string id)
Connection::Connection(Server& server, boost::asio::ip::tcp::socket socket, std::string id_)
: m_server{server}
, m_socket(std::move(socket))
, m_id{std::move(id)}
, m_authenticated{false}
, id{std::move(id_)}
{
assert(isEventLoopThread());
assert(IS_SERVER_THREAD);
Log::log(m_id, LogMessage::I1003_NEW_CONNECTION);
m_server.m_ioContext.post(
[this]()
{
m_socket->set_option(boost::asio::socket_base::linger(true, 0));
m_socket->set_option(boost::asio::ip::tcp::no_delay(true));
doReadHeader();
});
m_socket.set_option(boost::asio::socket_base::linger(true, 0));
m_socket.set_option(boost::asio::ip::tcp::no_delay(true));
}
Connection::~Connection()
@ -58,13 +50,18 @@ Connection::~Connection()
assert(!m_socket->is_open());
}
void Connection::start()
{
doReadHeader();
}
void Connection::doReadHeader()
{
assert(IS_SERVER_THREAD);
boost::asio::async_read(*m_socket,
boost::asio::async_read(m_socket,
boost::asio::buffer(&m_readBuffer.header, sizeof(m_readBuffer.header)),
[this, weak=weak_from_this()](const boost::system::error_code& ec, std::size_t /*bytesReceived*/)
[this, weak=weak_from_this()](const boost::system::error_code& ec, std::size_t bytesReceived)
{
if(weak.expired())
return;
@ -90,7 +87,7 @@ void Connection::doReadHeader()
}
else if(ec != boost::asio::error::operation_aborted)
{
Log::log(m_id, LogMessage::E1007_SOCKET_READ_FAILED_X, ec);
Log::log(id, LogMessage::E1007_SOCKET_READ_FAILED_X, ec);
EventLoop::call(std::bind(&Connection::disconnect, this));
}
});
@ -100,7 +97,7 @@ void Connection::doReadData()
{
assert(IS_SERVER_THREAD);
boost::asio::async_read(*m_socket,
boost::asio::async_read(m_socket,
boost::asio::buffer(m_readBuffer.message->data(), m_readBuffer.message->dataSize()),
//m_strand.wrap(
[this, weak=weak_from_this()](const boost::system::error_code& ec, std::size_t /*bytesReceived*/)
@ -123,7 +120,7 @@ void Connection::doReadData()
}
else if(ec != boost::asio::error::operation_aborted)
{
Log::log(m_id, LogMessage::E1007_SOCKET_READ_FAILED_X, ec);
Log::log(id, LogMessage::E1007_SOCKET_READ_FAILED_X, ec);
EventLoop::call(std::bind(&Connection::disconnect, this));
}
});
@ -133,7 +130,7 @@ void Connection::doWrite()
{
assert(IS_SERVER_THREAD);
boost::asio::async_write(*m_socket, boost::asio::buffer(**m_writeQueue.front(), m_writeQueue.front()->size()),
boost::asio::async_write(m_socket, boost::asio::buffer(**m_writeQueue.front(), m_writeQueue.front()->size()),
[this, weak=weak_from_this()](const boost::system::error_code& ec, std::size_t /*bytesTransferred*/)
{
if(weak.expired())
@ -147,7 +144,7 @@ void Connection::doWrite()
}
else if(ec != boost::asio::error::operation_aborted)
{
Log::log(m_id, LogMessage::E1006_SOCKET_WRITE_FAILED_X, ec);
Log::log(id, LogMessage::E1006_SOCKET_WRITE_FAILED_X, ec);
EventLoop::call(std::bind(&Connection::disconnect, this));
}
});
@ -209,7 +206,7 @@ void Connection::connectionLost()
{
assert(isEventLoopThread());
Log::log(m_id, LogMessage::I1004_CONNECTION_LOST);
Log::log(id, LogMessage::I1004_CONNECTION_LOST);
disconnect();
}
@ -222,13 +219,13 @@ void Connection::disconnect()
m_server.m_ioContext.post(
[this]()
{
if(m_socket->is_open())
if(m_socket.is_open())
{
boost::system::error_code ec;
m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
if(ec && ec != boost::asio::error::not_connected)
Log::log(m_id, LogMessage::E1005_SOCKET_SHUTDOWN_FAILED_X, ec);
m_socket->close();
Log::log(id, LogMessage::E1005_SOCKET_SHUTDOWN_FAILED_X, ec);
m_socket.close();
}
EventLoop::call(

Datei anzeigen

@ -40,8 +40,7 @@ class Connection : public std::enable_shared_from_this<Connection>
using ObjectHandle = uint32_t;
Server& m_server;
std::unique_ptr<boost::asio::ip::tcp::socket> m_socket;
const std::string m_id;
boost::asio::ip::tcp::socket m_socket;
struct
{
Message::Header header;
@ -62,9 +61,13 @@ class Connection : public std::enable_shared_from_this<Connection>
void connectionLost();
public:
Connection(Server& server, std::unique_ptr<boost::asio::ip::tcp::socket> socket, std::string id);
const std::string id;
Connection(Server& server, boost::asio::ip::tcp::socket socket, std::string id_);
virtual ~Connection();
void start();
void disconnect();
};

Datei anzeigen

@ -199,43 +199,39 @@ void Server::doAccept()
{
assert(IS_SERVER_THREAD);
assert(!m_socketTCP);
m_socketTCP = std::make_unique<boost::asio::ip::tcp::socket>(m_ioContext);
m_acceptor.async_accept(*m_socketTCP,
[this](boost::system::error_code ec)
m_acceptor.async_accept(
[this](boost::system::error_code ec, boost::asio::ip::tcp::socket socket)
{
if(!ec)
{
const auto connectionId = std::string("connection[")
.append(m_socketTCP->remote_endpoint().address().to_string())
.append(":")
.append(std::to_string(m_socketTCP->remote_endpoint().port()))
.append("]");
try
{
const auto connectionId = std::string("connection[")
.append(socket.remote_endpoint().address().to_string())
.append(":")
.append(std::to_string(socket.remote_endpoint().port()))
.append("]");
EventLoop::call(
[this, connectionId]()
{
try
{
m_connections.emplace_back(std::make_shared<Connection>(*this, std::move(m_socketTCP), connectionId));
}
catch(const std::exception& e)
{
Log::log(id, LogMessage::C1002_CREATING_CONNECTION_FAILED_X, e.what());
}
auto connection = std::make_shared<Connection>(*this, std::move(socket), connectionId);
connection->start();
m_ioContext.post(
[this]()
{
doAccept();
});
});
EventLoop::call(
[this, connection]()
{
Log::log(connection->id, LogMessage::I1003_NEW_CONNECTION);
m_connections.push_back(connection);
});
}
catch(const std::exception& e)
{
Log::log(id, LogMessage::C1002_CREATING_CONNECTION_FAILED_X, e.what());
}
doAccept();
}
else
{
Log::log(id, LogMessage::E1004_TCP_ACCEPT_ERROR_X, ec.message());
m_socketTCP.reset();
}
});
}

Datei anzeigen

@ -42,7 +42,6 @@ class Server : public std::enable_shared_from_this<Server>
boost::asio::io_context m_ioContext;
std::thread m_thread;
boost::asio::ip::tcp::acceptor m_acceptor;
std::unique_ptr<boost::asio::ip::tcp::socket> m_socketTCP;
boost::asio::ip::udp::socket m_socketUDP;
std::array<char, 8> m_udpBuffer;
boost::asio::ip::udp::endpoint m_remoteEndpoint;