diff --git a/src/main.cc b/src/main.cc index 65169f2f7f7719e3993f51a5b795c3edc3b1dbcf..f088caa075206e7a158acfbc5fc230bcc36a47ec 100644 --- a/src/main.cc +++ b/src/main.cc @@ -3,6 +3,7 @@ #include <rlib/sys/os.hpp> #include <thread> #include "common.hpp" +#include "forwarder.hpp" rlib::logger rlog(std::cerr); using namespace rlib::literals; @@ -38,7 +39,7 @@ int real_main(int argc, char **argv) { else throw std::runtime_error("Unknown log level: " + log_level); - // Forwarder(inboundConfig, outboundConfig).run_forever(); + Forwarder(inboundConfig, outboundConfig).run_forever(); return 0; } diff --git a/src/protocols/base.hpp b/src/protocols/base.hpp index 75d75f86bb4434f9aedb1aa0f0e43dae12b0ef8a..ddf3af3adc674b4a400e3488b4cbe0e6ea4dbc5c 100644 --- a/src/protocols/base.hpp +++ b/src/protocols/base.hpp @@ -23,40 +23,40 @@ namespace Protocols { BaseOutbound(string outboundConfig) { loadConfig(outboundConfig); } - virtual ~BaseOutbound = default; + virtual ~BaseOutbound() = default; // Init data structures. virtual void loadConfig(string config) = 0; // InboundThread calls this function. Check the mapping between senderId and serverConn, wake up listenThread, and deliver the msg. - virtual void handleMessage(string binaryMessage, string senderId) = 0; + virtual void forwardMessageToInbound(string binaryMessage, string senderId) = 0; // Listen the PIPE. handleMessage will wake up this thread from epoll. // Also listen the connection fileDescriptors. virtual void listenForever(BaseInbound *previousHop) = 0; // Inbound.listenForever MUST initialize this field. - fd_t ipcPipeOutboundEnd = -1; + sockfd_t ipcPipe = -1; }; struct BaseInbound : rlib::noncopyable { BaseInbound(string inboundConfig) { loadConfig(inboundConfig); } - virtual ~BaseInbound = default; + virtual ~BaseInbound() = default; // Init data structures. virtual void loadConfig(string config) = 0; // OutboundThread calls this function. Wake up 'listenForever' thread, and send back a message. Outbound provides the senderId. - virtual void handleMessage(string binaryMessage, string senderId) = 0; + virtual void forwardMessageToOutbound(string binaryMessage, string senderId) = 0; // Listen the addr:port in config, for inbound connection. // Also listen the accepted connection fileDescriptors, and listen the PIPE. virtual void listenForever(BaseOutbound *nextHop) = 0; // Inbound.listenForever MUST initialize this field. - fd_t ipcPipeInboundEnd = -1; + sockfd_t ipcPipe = -1; }; // TODO: PIPE only works on linux epoll. The windows epoll only works on SOCKET. diff --git a/src/protocols/plain.hpp b/src/protocols/plain.hpp index 85b42ac8bf7cb685b094bc9137b7e3788a537465..4159459f15bb173726360c655ae5ab31d10e5879 100644 --- a/src/protocols/plain.hpp +++ b/src/protocols/plain.hpp @@ -11,16 +11,20 @@ namespace Protocols { class PlainInbound : public BaseInbound { public: using BaseInbound::BaseInbound; - virtual loadConfig(string config) override { + virtual void loadConfig(string config) override { auto ar = rlib::string(config).split('@'); // Also works for ipv6. if (ar.size() != 3) throw std::invalid_argument("Wrong parameter string for protocol 'plain'. Example: plain@fe00:1e10:ce95:1@10809"); listenAddr = ar[1]; listenPort = ar[2].as<uint16_t>(); + } + virtual void forwardMessageToOutbound(string binaryMessage, string senderId) override { + // Outbound calls this function, to alert the inbound listener thread, for the new msg. + } - virtual listenForever(BaseOutbound* nextHop) override { - std::tie(this->ipcPipeInboundEnd, nextHop->ipcPipeOutboundEnd) = mk_tcp_pipe(); + virtual void listenForever(BaseOutbound* nextHop) override { + std::tie(this->ipcPipe, nextHop->ipcPipe) = mk_tcp_pipe(); auto listenFd = rlib::quick_listen(listenAddr, listenPort, true); rlib_defer([&] {close(listenFd);}); @@ -28,15 +32,15 @@ namespace Protocols { auto epollFd = epoll_create1(0); dynamic_assert(epollFd != -1, "epoll_create1 failed"); epoll_add_fd(epollFd, listenFd); - epoll_add_fd(epollFd, ipcPipeInboundEnd); + epoll_add_fd(epollFd, ipcPipe); // ----------------------- Process an event ------------------------------ auto udpSenderSocket = socket(AF_INET, SOCK_DGRAM, 0); dynamic_assert(udpSenderSocket > 0, "socket create failed."); - std::string msgBuffer(DGRAM_BUFFER_SIZE, "\0"); + std::string msgBuffer(DGRAM_BUFFER_SIZE, '\0'); // WARN: If you want to modify this program to work for TCP, PLEASE use rlib::sockIO::recv instead of fixed buffer. auto onEvent = [&](auto activeFd) { - if (activeFd == ipcPipeInboundEnd) { + if (activeFd == ipcPipe) { // Outbound gave me a message to forward! Send it. auto targetClientId = rlib::sockIO::recv_msg(activeFd); auto msg = rlib::sockIO::recv_msg(activeFd); @@ -47,18 +51,18 @@ namespace Protocols { } else if (activeFd == listenFd) { SockAddr clientAddr; - auto msgLength = recvfrom(activeFd, buffer, sizeof(buffer), 0, &clientAddr.addr, &clientAddr.len); + auto msgLength = recvfrom(activeFd, msgBuffer.data(), msgBuffer.size(), 0, &clientAddr.addr, &clientAddr.len); dynamic_assert(msgLength != -1, "recvfrom failed"); - nextHop->handleMessage(msgBuffer.substr(msgLength), ConnectionMapping::makeClientId(clientAddr)); + forwardMessageToOutbound(msgBuffer.substr(msgLength), ConnectionMapping::makeClientId(clientAddr)); } }; // ----------------------- listener main loop ------------------------------ - epoll_event events[MAX_EVENTS]; + epoll_event events[EPOLL_MAX_EVENTS]; rlog.info("PlainListener listening InboundPort [{}]:{} ...", listenAddr, listenPort); while (true) { - auto nfds = epoll_wait(epollFd, events, MAX_EVENTS, -1); + auto nfds = epoll_wait(epollFd, events, EPOLL_MAX_EVENTS, -1); dynamic_assert(nfds != -1, "epoll_wait failed"); for (auto cter = 0; cter < nfds; ++cter) { @@ -76,6 +80,9 @@ namespace Protocols { class PlainOutbound : public BaseOutbound { public: using BaseOutbound::BaseOutbound; + virtual void loadConfig(string config) override { + + } }; } diff --git a/src/utils.hpp b/src/utils.hpp index c59efa6412e2c81f316cd61e271b0681ea115eaf..8faa49f1751c3a569d1ae40e565ab617acc8384b 100644 --- a/src/utils.hpp +++ b/src/utils.hpp @@ -26,13 +26,13 @@ struct SockAddr { struct ConnectionMapping { std::unordered_map<string, fd_t> client2server; std::unordered_multimap<fd_t, string> server2client; - static string makeClientId(const SockAddr &osStrust) const { + static string makeClientId(const SockAddr &osStruct) { // ClientId is a binary string. string result(sizeof(osStruct), '\0'); - std::memcpy(result.data(), &osStruct, sizeof(osStrust)); + std::memcpy(result.data(), &osStruct, sizeof(osStruct)); return result; } - static void parseClientId(const string &clientId, SockAddr &output) const { + static void parseClientId(const string &clientId, SockAddr &output) { static_assert(sizeof(output) == sizeof(SockAddr), "error: programming error detected."); if (clientId.size() != sizeof(output)) throw std::invalid_argument("parseClientId, invalid input binary string length."); @@ -40,7 +40,7 @@ struct ConnectionMapping { } }; -inline void epoll_add_fd(fd_t epollFd, fd_t fd) { +inline void epoll_add_fd(fd_t epollFd, sockfd_t fd) { epoll_event event { .events = EPOLLIN, .data = { @@ -51,7 +51,7 @@ inline void epoll_add_fd(fd_t epollFd, fd_t fd) { if(ret1 == -1) throw std::runtime_error("epoll_ctl failed."); } -inline void epoll_del_fd(fd_t epollFd, fd_t fd) { +inline void epoll_del_fd(fd_t epollFd, sockfd_t fd) { epoll_event event { .events = EPOLLIN, .data = { @@ -79,7 +79,7 @@ inline auto mkpipe() { */ inline auto mk_tcp_pipe() { - fd_t connfd_cli_side, connfd_srv_side; + sockfd_t connfd_cli_side, connfd_srv_side; auto listenfd = rlib::quick_listen("::1", TCP_TMP_PORT_NUMBER); auto serverThread = std::thread([&] { connfd_srv_side = rlib::quick_accept(listenfd);