moved server to seperate object and thread, eventloop now runs in main thread

Dieser Commit ist enthalten in:
Reinder Feenstra 2022-05-22 11:33:25 +02:00
Ursprung 95b4d88693
Commit cdc5dab129
10 geänderte Dateien mit 463 neuen und 356 gelöschten Zeilen

Datei anzeigen

@ -25,7 +25,7 @@
#include <queue>
#include <mutex>
#include <thread>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <iostream>
@ -34,47 +34,34 @@
class EventLoop
{
private:
inline static EventLoop* s_instance;
std::queue<std::function<void()>> m_queue;
std::mutex m_queueMutex;
std::condition_variable m_condition;
bool m_run;
std::thread m_thread;
inline static std::queue<std::function<void()>> s_queue;
inline static std::mutex s_queueMutex;
inline static std::condition_variable s_condition;
inline static std::atomic<bool> s_run;
EventLoop() :
m_run{true},
m_thread(&EventLoop::run, this)
{
}
EventLoop() = default;
~EventLoop() = default;
EventLoop(const EventLoop&) = delete;
EventLoop& operator =(const EventLoop&) = delete;
~EventLoop()
public:
inline static const std::thread::id threadId = std::this_thread::get_id();
static void exec()
{
}
std::unique_lock<std::mutex> lock(s_queueMutex);
void add(std::function<void()>&& f)
{
std::lock_guard<std::mutex> lock(m_queueMutex);
m_queue.emplace(f);
m_condition.notify_one();
}
void run()
{
setThreadName("eventloop");
std::unique_lock<std::mutex> lock(m_queueMutex);
while(m_run)
s_run = true;
while(s_run)
{
if(m_queue.empty())
m_condition.wait(lock, [this]{ return !m_queue.empty(); });
if(s_queue.empty())
s_condition.wait(lock, []{ return !s_queue.empty(); });
if(m_queue.empty())
if(s_queue.empty())
continue; // a suspisius wakeup may occur
std::function<void()>& f{m_queue.front()};
std::function<void()>& f{s_queue.front()};
lock.unlock();
@ -89,34 +76,27 @@ class EventLoop
lock.lock();
m_queue.pop();
s_queue.pop();
}
}
void exit()
{
add([this](){ m_run = false; });
m_thread.join();
}
public:
static void start()
{
s_instance = new EventLoop();
}
static void stop()
{
s_instance->exit();
delete s_instance;
s_instance = nullptr;
s_run = false;
}
template<typename _Callable, typename... _Args>
inline static void call(_Callable&& __f, _Args&&... __args)
{
s_instance->add(std::bind(__f, __args...));
std::lock_guard<std::mutex> lock(s_queueMutex);
s_queue.emplace(std::bind(__f, __args...));
s_condition.notify_one();
}
};
inline bool isEventLoopThread()
{
return std::this_thread::get_id() == EventLoop::threadId;
}
#endif

Datei anzeigen

@ -25,7 +25,6 @@
#include <csignal>
#endif
#include "options.hpp"
#include "core/eventloop.hpp"
#include "traintastic/traintastic.hpp"
#include "log/log.hpp"
#include <traintastic/locale/locale.hpp>
@ -46,12 +45,8 @@ void signalHandler(int signum)
signal(SIGINT, SIG_DFL);
signal(SIGQUIT, SIG_DFL);
EventLoop::call(
[signum]()
{
Log::log(*Traintastic::instance, LogMessage::N1001_RECEIVED_SIGNAL_X, std::string_view{strsignal(signum)});
Traintastic::instance->exit();
});
Log::log(*Traintastic::instance, LogMessage::N1001_RECEIVED_SIGNAL_X, std::string_view{strsignal(signum)});
Traintastic::instance->exit();
break;
}
}
@ -174,7 +169,6 @@ int main(int argc, char* argv[])
Log::disableFileLogger();
}
EventLoop::start();
#ifdef WIN32
if(options.tray)
Windows::TrayIcon::add();
@ -209,7 +203,6 @@ int main(int argc, char* argv[])
if(options.tray)
Windows::TrayIcon::remove();
#endif
EventLoop::stop();
}
while(restart);

Datei anzeigen

@ -21,21 +21,24 @@
*/
#include "client.hpp"
//#include "console.hpp"
//#include "objectregistry.hpp"
#include "server.hpp"
#include "../traintastic/traintastic.hpp"
#include "../core/eventloop.hpp"
#include "session.hpp"
#include "../log/log.hpp"
Client::Client(Traintastic& server, const std::string& id, boost::asio::ip::tcp::socket socket) :
m_server{server},
m_strand{Traintastic::instance->ioContext()},
m_socket(std::move(socket)),
m_id{id},
m_authenticated{false}//,
//m_lastObjectHandle{0}
#ifndef NDEBUG
#define IS_SERVER_THREAD (std::this_thread::get_id() == m_server.threadId())
#endif
Client::Client(Server& server, std::string id, boost::asio::ip::tcp::socket socket)
: m_server{server}
, m_socket(std::move(socket))
, m_id{std::move(id)}
, m_authenticated{false}
{
assert(IS_SERVER_THREAD);
m_socket.set_option(boost::asio::socket_base::linger(true, 0));
m_socket.set_option(boost::asio::ip::tcp::no_delay(true));
@ -44,20 +47,22 @@ Client::Client(Traintastic& server, const std::string& id, boost::asio::ip::tcp:
Client::~Client()
{
assert(IS_SERVER_THREAD);
stop();
}
void Client::start()
{
m_strand.post(
[this]()
{
doReadHeader();
});
assert(IS_SERVER_THREAD);
doReadHeader();
}
void Client::stop()
{
assert(IS_SERVER_THREAD);
m_session.reset();
if(!m_socket.is_open())
@ -72,47 +77,54 @@ void Client::stop()
void Client::doReadHeader()
{
auto self(shared_from_this());
assert(IS_SERVER_THREAD);
boost::asio::async_read(m_socket,
boost::asio::buffer(&m_readBuffer.header, sizeof(m_readBuffer.header)),
m_strand.wrap(
[this, self](const boost::system::error_code& ec, std::size_t /*bytesReceived*/)
[this, weak=weak_from_this()](const boost::system::error_code& ec, std::size_t /*bytesReceived*/)
{
if(weak.expired())
return;
if(!ec)
{
if(!ec)
m_readBuffer.message.reset(new Message(m_readBuffer.header));
if(m_readBuffer.message->dataSize() == 0)
{
m_readBuffer.message.reset(new Message(m_readBuffer.header));
if(m_readBuffer.message->dataSize() == 0)
{
if(m_readBuffer.message->command() != Message::Command::Ping)
EventLoop::call(&Client::processMessage, this, m_readBuffer.message);
else
{} // TODO: ping hier replyen
m_readBuffer.message.reset();
doReadHeader();
}
if(m_readBuffer.message->command() != Message::Command::Ping)
EventLoop::call(&Client::processMessage, this, m_readBuffer.message);
else
doReadData();
{} // TODO: ping hier replyen
m_readBuffer.message.reset();
doReadHeader();
}
else if(ec == boost::asio::error::eof || ec == boost::asio::error::connection_aborted || ec == boost::asio::error::connection_reset)
EventLoop::call(&Client::connectionLost, this);
else if(ec != boost::asio::error::operation_aborted)
EventLoop::call(
[self, ec]()
{
Log::log(self->m_id, LogMessage::E1007_SOCKET_READ_FAILED_X, ec);
self->disconnect();
});
}));
else
doReadData();
}
else if(ec == boost::asio::error::eof || ec == boost::asio::error::connection_aborted || ec == boost::asio::error::connection_reset)
{
connectionLost();
}
else if(ec != boost::asio::error::operation_aborted)
{
Log::log(m_id, LogMessage::E1007_SOCKET_READ_FAILED_X, ec);
disconnect();
}
});
}
void Client::doReadData()
{
auto self(shared_from_this());
assert(IS_SERVER_THREAD);
boost::asio::async_read(m_socket,
boost::asio::buffer(m_readBuffer.message->data(), m_readBuffer.message->dataSize()),
m_strand.wrap(
[this, self](const boost::system::error_code& ec, std::size_t /*bytesReceived*/)
//m_strand.wrap(
[this, weak=weak_from_this()](const boost::system::error_code& ec, std::size_t /*bytesReceived*/)
{
if(weak.expired())
return;
if(!ec)
{
if(m_readBuffer.message->command() != Message::Command::Ping)
@ -123,42 +135,45 @@ void Client::doReadData()
doReadHeader();
}
else if(ec == boost::asio::error::eof || ec == boost::asio::error::connection_aborted || ec == boost::asio::error::connection_reset)
EventLoop::call(&Client::connectionLost, this);
{
connectionLost();
}
else if(ec != boost::asio::error::operation_aborted)
EventLoop::call(
[self, ec]()
{
Log::log(self->m_id, LogMessage::E1007_SOCKET_READ_FAILED_X, ec);
self->disconnect();
});
}));
{
Log::log(m_id, LogMessage::E1007_SOCKET_READ_FAILED_X, ec);
disconnect();
}
});
}
void Client::doWrite()
{
auto self(shared_from_this());
assert(IS_SERVER_THREAD);
boost::asio::async_write(m_socket, boost::asio::buffer(**m_writeQueue.front(), m_writeQueue.front()->size()),
m_strand.wrap(
[this, self](const boost::system::error_code& ec, std::size_t /*bytesTransferred*/)
[this, weak=weak_from_this()](const boost::system::error_code& ec, std::size_t /*bytesTransferred*/)
{
if(weak.expired())
return;
if(!ec)
{
if(!ec)
{
m_writeQueue.pop();
if(!m_writeQueue.empty())
doWrite();
}
else if(ec != boost::asio::error::operation_aborted)
EventLoop::call(
[self, ec]()
{
Log::log(self->m_id, LogMessage::E1006_SOCKET_WRITE_FAILED_X, ec);
self->disconnect();
});
}));
m_writeQueue.pop();
if(!m_writeQueue.empty())
doWrite();
}
else if(ec != boost::asio::error::operation_aborted)
{
Log::log(m_id, LogMessage::E1006_SOCKET_WRITE_FAILED_X, ec);
disconnect();
}
});
}
void Client::processMessage(const std::shared_ptr<Message> message)
{
assert(isEventLoopThread());
if(m_authenticated && m_session)
{
if(m_session->processMessage(*message))
@ -281,7 +296,9 @@ void Client::processMessage(const std::shared_ptr<Message> message)
void Client::sendMessage(std::unique_ptr<Message> message)
{
m_strand.post(
assert(isEventLoopThread());
m_server.m_ioContext.post(
[this, msg=std::make_shared<std::unique_ptr<Message>>(std::move(message))]()
{
const bool wasEmpty = m_writeQueue.empty();
@ -293,12 +310,16 @@ void Client::sendMessage(std::unique_ptr<Message> message)
void Client::connectionLost()
{
assert(IS_SERVER_THREAD);
Log::log(m_id, LogMessage::I1004_CONNECTION_LOST);
disconnect();
}
void Client::disconnect()
{
assert(IS_SERVER_THREAD);
stop();
m_server.clientGone(shared_from_this());
}

Datei anzeigen

@ -23,23 +23,13 @@
#ifndef TRAINTASTIC_SERVER_NETWORK_CLIENT_HPP
#define TRAINTASTIC_SERVER_NETWORK_CLIENT_HPP
//#include "Message.hpp"
//#include "MessageQueue.hpp"
//#include <thread>
#include <memory>
#include <queue>
//#include <deque>
//#include <list>
//#include <condition_variable>
#include <boost/asio.hpp>
//#include "connection-callback.hpp"
//#include "status.hpp"
//#include <chrono>
//#include <cmath>
#include "../core/objectptr.hpp"
#include <traintastic/network/message.hpp>
class Traintastic;
class Server;
class Session;
class Client : public std::enable_shared_from_this<Client>
@ -49,8 +39,7 @@ class Client : public std::enable_shared_from_this<Client>
protected:
using ObjectHandle = uint32_t;
Traintastic& m_server;
boost::asio::io_service::strand m_strand;
Server& m_server;
boost::asio::ip::tcp::socket m_socket;
const std::string m_id;
struct
@ -62,8 +51,6 @@ class Client : public std::enable_shared_from_this<Client>
std::queue<std::unique_ptr<Message>> m_writeQueue;
bool m_authenticated;
std::shared_ptr<Session> m_session;
//ObjectHandle m_lastObjectHandle;
//std::map<ObjectHandle,ObjectPtr> m_objects;
void doReadHeader();
void doReadData();
@ -76,7 +63,7 @@ class Client : public std::enable_shared_from_this<Client>
void disconnect();
public:
Client(Traintastic& server, const std::string& id, boost::asio::ip::tcp::socket socket);
Client(Server& server, std::string id, boost::asio::ip::tcp::socket socket);
virtual ~Client();
void start();

256
server/src/network/server.cpp Normale Datei
Datei anzeigen

@ -0,0 +1,256 @@
/**
* server/src/network/server.cpp
*
* This file is part of the traintastic source code.
*
* Copyright (C) 2022 Reinder Feenstra
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "server.hpp"
#include <traintastic/network/message.hpp>
#include <version.hpp>
#include "client.hpp"
#include "../core/eventloop.hpp"
#include "../log/log.hpp"
#include "../utils/setthreadname.hpp"
#define IS_SERVER_THREAD (std::this_thread::get_id() == m_thread.get_id())
Server::Server()
: m_ioContext{1}
, m_acceptor{m_ioContext}
, m_socketTCP{m_ioContext}
, m_socketUDP{m_ioContext}
{
assert(isEventLoopThread());
}
Server::~Server()
{
assert(isEventLoopThread());
assert(m_ioContext.stopped());
}
bool Server::start(bool localhostOnly, uint16_t port, bool discoverable)
{
assert(isEventLoopThread());
m_localhostOnly = localhostOnly;
boost::system::error_code ec;
boost::asio::ip::tcp::endpoint endpoint(localhostOnly ? boost::asio::ip::address_v4::loopback() : boost::asio::ip::address_v4::any(), port);
m_acceptor.open(endpoint.protocol(), ec);
if(ec)
{
Log::log(id, LogMessage::F1001_OPENING_TCP_SOCKET_FAILED_X, ec.message());
return false;
}
m_acceptor.set_option(boost::asio::socket_base::reuse_address(true), ec);
if(ec)
{
Log::log(id, LogMessage::F1002_TCP_SOCKET_ADDRESS_REUSE_FAILED_X, ec.message());
return false;
}
m_acceptor.bind(endpoint, ec);
if(ec)
{
Log::log(id, LogMessage::F1003_BINDING_TCP_SOCKET_FAILED_X, ec.message());
return false;
}
m_acceptor.listen(5, ec);
if(ec)
{
Log::log(id, LogMessage::F1004_TCP_SOCKET_LISTEN_FAILED_X, ec.message());
return false;
}
if(discoverable)
{
if(port == defaultPort)
{
m_socketUDP.open(boost::asio::ip::udp::v4(), ec);
if(ec)
{
Log::log(id, LogMessage::F1005_OPENING_UDP_SOCKET_FAILED_X, ec.message());
return false;
}
m_socketUDP.set_option(boost::asio::socket_base::reuse_address(true), ec);
if(ec)
{
Log::log(id, LogMessage::F1006_UDP_SOCKET_ADDRESS_REUSE_FAILED_X, ec.message());
return false;
}
m_socketUDP.bind(boost::asio::ip::udp::endpoint(boost::asio::ip::address_v4::any(), defaultPort), ec);
if(ec)
{
Log::log(id, LogMessage::F1007_BINDING_UDP_SOCKET_FAILED_X, ec.message());
return false;
}
Log::log(id, LogMessage::N1005_DISCOVERY_ENABLED);
}
else
{
Log::log(id, LogMessage::W1001_DISCOVERY_DISABLED_ONLY_ALLOWED_ON_PORT_X, defaultPort);
discoverable = false;
}
}
else
Log::log(id, LogMessage::N1006_DISCOVERY_DISABLED);
Log::log(id, LogMessage::N1007_LISTENING_AT_X_X, m_acceptor.local_endpoint().address().to_string(), m_acceptor.local_endpoint().port());
m_thread = std::thread(
[this]()
{
setThreadName("server");
auto work = std::make_shared<boost::asio::io_context::work>(m_ioContext);
m_ioContext.run();
});
m_ioContext.post(
[this, discoverable]()
{
if(discoverable)
doReceive();
doAccept();
});
return true;
}
void Server::stop()
{
assert(isEventLoopThread());
m_ioContext.post(
[this]()
{
boost::system::error_code ec;
if(m_acceptor.cancel(ec))
Log::log(id, LogMessage::E1008_SOCKET_ACCEPTOR_CANCEL_FAILED_X, ec);
m_acceptor.close();
m_socketUDP.close();
for(auto& client : m_clients)
{
client->stop();
client.reset();
}
});
m_ioContext.stop();
m_thread.join();
}
void Server::clientGone(const std::shared_ptr<Client>& client)
{
assert(IS_SERVER_THREAD);
m_clients.erase(std::find(m_clients.begin(), m_clients.end(), client));
}
void Server::doReceive()
{
assert(IS_SERVER_THREAD);
m_socketUDP.async_receive_from(boost::asio::buffer(m_udpBuffer), m_remoteEndpoint,
[this](const boost::system::error_code& ec, std::size_t bytesReceived)
{
if(!ec)
{
if(bytesReceived == sizeof(Message::Header))
{
Message message(*reinterpret_cast<Message::Header*>(m_udpBuffer.data()));
if(!m_localhostOnly || m_remoteEndpoint.address().is_loopback())
{
if(message.dataSize() == 0)
{
std::unique_ptr<Message> response = processMessage(message);
if(response)
{
m_socketUDP.async_send_to(boost::asio::buffer(**response, response->size()), m_remoteEndpoint,
[this](const boost::system::error_code& /*ec*/, std::size_t /*bytesTransferred*/)
{
doReceive();
});
return;
}
}
}
}
doReceive();
}
else
Log::log(id, LogMessage::E1003_UDP_RECEIVE_ERROR_X, ec.message());
});
}
std::unique_ptr<Message> Server::processMessage(const Message& message)
{
assert(IS_SERVER_THREAD);
if(message.command() == Message::Command::Discover && message.isRequest())
{
std::unique_ptr<Message> response = Message::newResponse(message.command(), message.requestId());
response->write(boost::asio::ip::host_name());
response->write<uint16_t>(TRAINTASTIC_VERSION_MAJOR);
response->write<uint16_t>(TRAINTASTIC_VERSION_MINOR);
response->write<uint16_t>(TRAINTASTIC_VERSION_PATCH);
assert(response->size() <= 1500); // must fit in a UDP packet
return response;
}
return {};
}
void Server::doAccept()
{
assert(IS_SERVER_THREAD);
m_acceptor.async_accept(m_socketTCP,
[this](boost::system::error_code ec)
{
if(!ec)
{
try
{
std::shared_ptr<Client> client = std::make_shared<Client>(*this, "client[" + m_socketTCP.remote_endpoint().address().to_string() + ":" + std::to_string(m_socketTCP.remote_endpoint().port()) + "]", std::move(m_socketTCP));
client->start();
m_clients.push_back(client);
doAccept();
}
catch(const std::exception& e)
{
Log::log(id, LogMessage::C1002_CREATING_CLIENT_FAILED_X, e.what());
}
}
else
Log::log(id, LogMessage::E1004_TCP_ACCEPT_ERROR_X, ec.message());
});
}

Datei anzeigen

@ -0,0 +1,73 @@
/**
* server/src/network/server.hpp
*
* This file is part of the traintastic source code.
*
* Copyright (C) 2022 Reinder Feenstra
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#ifndef TRAINTASTIC_SERVER_NETWORK_SERVER_HPP
#define TRAINTASTIC_SERVER_NETWORK_SERVER_HPP
#include <memory>
#include <array>
#include <list>
#include <thread>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/udp.hpp>
class Client;
class Message;
class Server : public std::enable_shared_from_this<Server>
{
friend class Client;
private:
static constexpr std::string_view id{"server"};
boost::asio::io_context m_ioContext;
std::thread m_thread;
boost::asio::ip::tcp::acceptor m_acceptor;
boost::asio::ip::tcp::socket m_socketTCP;
boost::asio::ip::udp::socket m_socketUDP;
std::array<char, 8> m_udpBuffer;
boost::asio::ip::udp::endpoint m_remoteEndpoint;
bool m_localhostOnly;
std::list<std::shared_ptr<Client>> m_clients;
void doReceive();
std::unique_ptr<Message> processMessage(const Message& message);
void doAccept();
void clientGone(const std::shared_ptr<Client>& client);
public:
static constexpr uint16_t defaultPort = 5740; //!< unoffical, not (yet) assigned by IANA
Server();
~Server();
#ifndef NDEBUG
inline auto threadId() const { return m_thread.get_id(); }
#endif
bool start(bool localhostOnly, uint16_t port, bool discoverable);
void stop();
};
#endif

Datei anzeigen

@ -25,6 +25,7 @@
#include <iomanip>
#include "../core/attributes.hpp"
#include "traintastic.hpp"
#include "../network/server.hpp"
#include "../log/log.hpp"
#include "../utils/category.hpp"
@ -47,7 +48,7 @@ Settings::PreStart Settings::getPreStartSettings(const std::filesystem::path& pa
Settings::Settings(const std::filesystem::path& path)
: m_filename{path / filename}
, localhostOnly{this, "localhost_only", true, PropertyFlags::ReadWrite, [this](const bool& /*value*/){ saveToFile(); }}
, port{this, "port", defaultPort, PropertyFlags::ReadWrite, [this](const uint16_t& /*value*/){ saveToFile(); }}
, port{this, "port", Server::defaultPort, PropertyFlags::ReadWrite, [this](const uint16_t& /*value*/){ saveToFile(); }}
, discoverable{this, "discoverable", true, PropertyFlags::ReadWrite, [this](const bool& /*value*/){ saveToFile(); }}
, lastWorld{this, "last_world", "", PropertyFlags::ReadWrite | PropertyFlags::Internal, [this](const std::string& /*value*/){ saveToFile(); }}
, loadLastWorldOnStartup{this, "load_last_world_on_startup", true, PropertyFlags::ReadWrite, [this](const bool& /*value*/){ saveToFile(); }}

Datei anzeigen

@ -3,7 +3,7 @@
*
* This file is part of the traintastic source code.
*
* Copyright (C) 2019-2021 Reinder Feenstra
* Copyright (C) 2019-2022 Reinder Feenstra
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
@ -60,7 +60,6 @@ class Settings : public Object
};
static constexpr std::string_view id = classId;
static constexpr uint16_t defaultPort = 5740; //!< unoffical, not (yet) assigned by IANA
static PreStart getPreStartSettings(const std::filesystem::path& path);

Datei anzeigen

@ -29,7 +29,7 @@
#include <traintastic/codename.hpp>
#include <traintastic/utils/str.hpp>
#include "../core/eventloop.hpp"
#include "../network/client.hpp"
#include "../network/server.hpp"
#include "../core/attributes.hpp"
#include "../world/world.hpp"
#include "../world/worldlist.hpp"
@ -43,10 +43,7 @@ std::shared_ptr<Traintastic> Traintastic::instance;
Traintastic::Traintastic(const std::filesystem::path& dataDir) :
m_restart{false},
m_dataDir{std::filesystem::absolute(dataDir)},
m_ioContext{},
m_acceptor{m_ioContext},
m_socketTCP{m_ioContext},
m_socketUDP{m_ioContext},
m_server{std::make_shared<Server>()},
settings{this, "settings", nullptr, PropertyFlags::ReadWrite/*ReadOnly*/},
world{this, "world", nullptr, PropertyFlags::ReadWrite,
[this](const std::shared_ptr<World>& /*newWorld*/)
@ -130,11 +127,6 @@ Traintastic::Traintastic(const std::filesystem::path& dataDir) :
m_interfaceItems.add(shutdown);
}
Traintastic::~Traintastic()
{
assert(m_ioContext.stopped());
}
bool Traintastic::importWorld(const std::vector<std::byte>& worldData)
{
try
@ -169,11 +161,10 @@ Traintastic::RunStatus Traintastic::run()
if(settings->loadLastWorldOnStartup && !settings->lastWorld.value().empty())
loadWorld(settings->lastWorld.value());
if(!start())
if(!m_server->start(settings->localhostOnly, settings->port, settings->discoverable))
return ExitFailure;
auto work = std::make_shared<boost::asio::io_service::work>(m_ioContext);
m_ioContext.run();
EventLoop::exec();
return m_restart ? Restart : ExitSuccess;
}
@ -188,99 +179,7 @@ void Traintastic::exit()
if(settings->autoSaveWorldOnExit && world)
world->save();
stop();
m_ioContext.stop();
}
bool Traintastic::start()
{
boost::system::error_code ec;
boost::asio::ip::tcp::endpoint endpoint(settings->localhostOnly ? boost::asio::ip::address_v4::loopback() : boost::asio::ip::address_v4::any(), settings->port);
m_acceptor.open(endpoint.protocol(), ec);
if(ec)
{
Log::log(*this, LogMessage::F1001_OPENING_TCP_SOCKET_FAILED_X, ec.message());
return false;
}
m_acceptor.set_option(boost::asio::socket_base::reuse_address(true), ec);
if(ec)
{
Log::log(*this, LogMessage::F1002_TCP_SOCKET_ADDRESS_REUSE_FAILED_X, ec.message());
return false;
}
m_acceptor.bind(endpoint, ec);
if(ec)
{
Log::log(*this, LogMessage::F1003_BINDING_TCP_SOCKET_FAILED_X, ec.message());
return false;
}
m_acceptor.listen(5, ec);
if(ec)
{
Log::log(*this, LogMessage::F1004_TCP_SOCKET_LISTEN_FAILED_X, ec.message());
return false;
}
if(settings->discoverable)
{
if(settings->port == Settings::defaultPort)
{
m_socketUDP.open(boost::asio::ip::udp::v4(), ec);
if(ec)
{
Log::log(*this, LogMessage::F1005_OPENING_UDP_SOCKET_FAILED_X, ec.message());
return false;
}
m_socketUDP.set_option(boost::asio::socket_base::reuse_address(true), ec);
if(ec)
{
Log::log(*this, LogMessage::F1006_UDP_SOCKET_ADDRESS_REUSE_FAILED_X, ec.message());
return false;
}
m_socketUDP.bind(boost::asio::ip::udp::endpoint(boost::asio::ip::address_v4::any(), Settings::defaultPort), ec);
if(ec)
{
Log::log(*this, LogMessage::F1007_BINDING_UDP_SOCKET_FAILED_X, ec.message());
return false;
}
Log::log(*this, LogMessage::N1005_DISCOVERY_ENABLED);
doReceive();
}
else
Log::log(*this, LogMessage::W1001_DISCOVERY_DISABLED_ONLY_ALLOWED_ON_PORT_X, Settings::defaultPort);
}
else
Log::log(*this, LogMessage::N1006_DISCOVERY_DISABLED);
Log::log(*this, LogMessage::N1007_LISTENING_AT_X_X, m_acceptor.local_endpoint().address().to_string(), m_acceptor.local_endpoint().port());
doAccept();
return true;
}
void Traintastic::stop()
{
for(auto& client : m_clients)
{
client->stop();
client.reset();
}
boost::system::error_code ec;
m_acceptor.cancel(ec);
if(ec)
Log::log(*this, LogMessage::E1008_SOCKET_ACCEPTOR_CANCEL_FAILED_X, ec);
m_acceptor.close();
m_socketUDP.close();
EventLoop::stop();
}
void Traintastic::loadWorldUUID(const boost::uuids::uuid& uuid)
@ -309,87 +208,3 @@ void Traintastic::loadWorldPath(const std::filesystem::path& path)
Log::log(*this, LogMessage::C1001_LOADING_WORLD_FAILED_X, e.what());
}
}
void Traintastic::clientGone(const std::shared_ptr<Client>& client)
{
m_clients.erase(std::find(m_clients.begin(), m_clients.end(), client));
}
void Traintastic::doReceive()
{
m_socketUDP.async_receive_from(boost::asio::buffer(m_udpBuffer), m_remoteEndpoint,
[this](const boost::system::error_code& ec, std::size_t bytesReceived)
{
if(!ec)
{
if(bytesReceived == sizeof(Message::Header))
{
Message message(*reinterpret_cast<Message::Header*>(m_udpBuffer.data()));
if(!settings->localhostOnly || m_remoteEndpoint.address().is_loopback())
{
if(message.dataSize() == 0)
{
std::unique_ptr<Message> response = processMessage(message);
if(response)
{
m_socketUDP.async_send_to(boost::asio::buffer(**response, response->size()), m_remoteEndpoint,
[this](const boost::system::error_code& /*ec*/, std::size_t /*bytesTransferred*/)
{
doReceive();
});
return;
}
}
}
}
doReceive();
}
else
Log::log(*this, LogMessage::E1003_UDP_RECEIVE_ERROR_X, ec.message());
});
}
std::unique_ptr<Message> Traintastic::processMessage(const Message& message)
{
if(message.command() == Message::Command::Discover && message.isRequest())
{
std::unique_ptr<Message> response = Message::newResponse(message.command(), message.requestId());
response->write(boost::asio::ip::host_name());
response->write<uint16_t>(TRAINTASTIC_VERSION_MAJOR);
response->write<uint16_t>(TRAINTASTIC_VERSION_MINOR);
response->write<uint16_t>(TRAINTASTIC_VERSION_PATCH);
assert(response->size() <= 1500); // must fit in a UDP packet
return response;
}
return nullptr;
}
void Traintastic::doAccept()
{
m_acceptor.async_accept(m_socketTCP,
[this](boost::system::error_code ec)
{
EventLoop::call(
[this, ec]()
{
if(!ec)
{
try
{
std::shared_ptr<Client> client = std::make_shared<Client>(*this, "client[" + m_socketTCP.remote_endpoint().address().to_string() + ":" + std::to_string(m_socketTCP.remote_endpoint().port()) + "]", std::move(m_socketTCP));
client->start();
m_clients.push_back(client);
doAccept();
}
catch(const std::exception& e)
{
Log::log(*this, LogMessage::C1002_CREATING_CLIENT_FAILED_X, e.what());
}
}
else
Log::log(*this, LogMessage::E1004_TCP_ACCEPT_ERROR_X, ec.message());
});
});
}

Datei anzeigen

@ -35,13 +35,10 @@
#include "../world/world.hpp"
#include "../world/worldlist.hpp"
class Client;
class Message;
class Server;
class Traintastic final : public Object
{
friend class Client;
public:
enum RunStatus
{
@ -53,13 +50,7 @@ class Traintastic final : public Object
private:
bool m_restart;
const std::filesystem::path m_dataDir;
boost::asio::io_context m_ioContext;
boost::asio::ip::tcp::acceptor m_acceptor;
boost::asio::ip::tcp::socket m_socketTCP;
boost::asio::ip::udp::socket m_socketUDP;
std::array<char, 8> m_udpBuffer;
boost::asio::ip::udp::endpoint m_remoteEndpoint;
std::list<std::shared_ptr<Client>> m_clients;
std::shared_ptr<Server> m_server;
bool start();
void stop();
@ -67,13 +58,6 @@ class Traintastic final : public Object
void loadWorldUUID(const boost::uuids::uuid& uuid);
void loadWorldPath(const std::filesystem::path& path);
void doReceive();
static std::unique_ptr<Message> processMessage(const Message& message);
void doAccept();
protected:
void clientGone(const std::shared_ptr<Client>& client);
public:
CLASS_ID("traintastic");
@ -91,12 +75,10 @@ class Traintastic final : public Object
Method<void()> shutdown;
Traintastic(const std::filesystem::path& dataDir);
~Traintastic() final;
~Traintastic() final = default;
std::string getObjectId() const final { return std::string(id); }
boost::asio::io_context& ioContext() { return m_ioContext; }
const std::filesystem::path& dataDir() const { return m_dataDir; }
std::filesystem::path dataBackupDir() const { return m_dataDir / ".backup"; }