From da502b5692f6167b1d659ab2cb2e78fe4244557c Mon Sep 17 00:00:00 2001
From: Bensong Liu <bensl@microsoft.com>
Date: Wed, 29 Jul 2020 15:21:31 +0800
Subject: [PATCH] fix some syntax error

---
 src/main.cc             |  3 ++-
 src/protocols/base.hpp  | 12 ++++++------
 src/protocols/plain.hpp | 27 +++++++++++++++++----------
 src/utils.hpp           | 12 ++++++------
 4 files changed, 31 insertions(+), 23 deletions(-)

diff --git a/src/main.cc b/src/main.cc
index 65169f2..f088caa 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -3,6 +3,7 @@
 #include <rlib/sys/os.hpp>
 #include <thread>
 #include "common.hpp"
+#include "forwarder.hpp"
 
 rlib::logger rlog(std::cerr);
 using namespace rlib::literals;
@@ -38,7 +39,7 @@ int real_main(int argc, char **argv) {
     else
         throw std::runtime_error("Unknown log level: " + log_level);
 
-    // Forwarder(inboundConfig, outboundConfig).run_forever();
+    Forwarder(inboundConfig, outboundConfig).run_forever();
 
     return 0;
 }
diff --git a/src/protocols/base.hpp b/src/protocols/base.hpp
index 75d75f8..ddf3af3 100644
--- a/src/protocols/base.hpp
+++ b/src/protocols/base.hpp
@@ -23,40 +23,40 @@ namespace Protocols {
 		BaseOutbound(string outboundConfig) {
 			loadConfig(outboundConfig);
 		}
-		virtual ~BaseOutbound = default;
+		virtual ~BaseOutbound() = default;
 
 		// Init data structures.
 		virtual void loadConfig(string config) = 0;
 
 		// InboundThread calls this function. Check the mapping between senderId and serverConn, wake up listenThread, and deliver the msg. 
-		virtual void handleMessage(string binaryMessage, string senderId) = 0;
+		virtual void forwardMessageToInbound(string binaryMessage, string senderId) = 0;
 
 		// Listen the PIPE. handleMessage will wake up this thread from epoll.
 		// Also listen the connection fileDescriptors.
 		virtual void listenForever(BaseInbound *previousHop) = 0;
 
 		// Inbound.listenForever MUST initialize this field. 
-		fd_t ipcPipeOutboundEnd = -1;
+		sockfd_t ipcPipe = -1;
 	};
 
 	struct BaseInbound : rlib::noncopyable {
 		BaseInbound(string inboundConfig) {
 			loadConfig(inboundConfig);
 		}
-		virtual ~BaseInbound = default;
+		virtual ~BaseInbound() = default;
 
 		// Init data structures.
 		virtual void loadConfig(string config) = 0;
 
 		// OutboundThread calls this function. Wake up 'listenForever' thread, and send back a message. Outbound provides the senderId.
-		virtual void handleMessage(string binaryMessage, string senderId) = 0;
+		virtual void forwardMessageToOutbound(string binaryMessage, string senderId) = 0;
 
 		// Listen the addr:port in config, for inbound connection.
 		// Also listen the accepted connection fileDescriptors, and listen the PIPE.
 		virtual void listenForever(BaseOutbound *nextHop) = 0;
 
 		// Inbound.listenForever MUST initialize this field. 
-		fd_t ipcPipeInboundEnd = -1;
+		sockfd_t ipcPipe = -1;
 	};
 
 	// TODO: PIPE only works on linux epoll. The windows epoll only works on SOCKET. 
diff --git a/src/protocols/plain.hpp b/src/protocols/plain.hpp
index 85b42ac..4159459 100644
--- a/src/protocols/plain.hpp
+++ b/src/protocols/plain.hpp
@@ -11,16 +11,20 @@ namespace Protocols {
 	class PlainInbound : public BaseInbound {
 	public:
 		using BaseInbound::BaseInbound;
-		virtual loadConfig(string config) override {
+		virtual void loadConfig(string config) override {
 			auto ar = rlib::string(config).split('@'); // Also works for ipv6.
 			if (ar.size() != 3)
 				throw std::invalid_argument("Wrong parameter string for protocol 'plain'. Example:    plain@fe00:1e10:ce95:1@10809");
 			listenAddr = ar[1];
 			listenPort = ar[2].as<uint16_t>();
+		}
+		virtual void forwardMessageToOutbound(string binaryMessage, string senderId) override {
+			// Outbound calls this function, to alert the inbound listener thread, for the new msg.
+
 
 		}
-		virtual listenForever(BaseOutbound* nextHop) override {
-			std::tie(this->ipcPipeInboundEnd, nextHop->ipcPipeOutboundEnd) = mk_tcp_pipe();
+		virtual void listenForever(BaseOutbound* nextHop) override {
+			std::tie(this->ipcPipe, nextHop->ipcPipe) = mk_tcp_pipe();
 
 			auto listenFd = rlib::quick_listen(listenAddr, listenPort, true);
 			rlib_defer([&] {close(listenFd);});
@@ -28,15 +32,15 @@ namespace Protocols {
 			auto epollFd = epoll_create1(0);
 			dynamic_assert(epollFd != -1, "epoll_create1 failed");
 			epoll_add_fd(epollFd, listenFd);
-			epoll_add_fd(epollFd, ipcPipeInboundEnd);
+			epoll_add_fd(epollFd, ipcPipe);
 
 			// ----------------------- Process an event ------------------------------
 			auto udpSenderSocket = socket(AF_INET, SOCK_DGRAM, 0);
 			dynamic_assert(udpSenderSocket > 0, "socket create failed.");
-			std::string msgBuffer(DGRAM_BUFFER_SIZE, "\0");
+			std::string msgBuffer(DGRAM_BUFFER_SIZE, '\0');
 			// WARN: If you want to modify this program to work for TCP, PLEASE use rlib::sockIO::recv instead of fixed buffer.
 			auto onEvent = [&](auto activeFd) {
-				if (activeFd == ipcPipeInboundEnd) {
+				if (activeFd == ipcPipe) {
 					// Outbound gave me a message to forward! Send it. 
 					auto targetClientId = rlib::sockIO::recv_msg(activeFd);
 					auto msg = rlib::sockIO::recv_msg(activeFd);
@@ -47,18 +51,18 @@ namespace Protocols {
 				}
 				else if (activeFd == listenFd) {
 					SockAddr clientAddr;
-					auto msgLength = recvfrom(activeFd, buffer, sizeof(buffer), 0, &clientAddr.addr, &clientAddr.len);
+					auto msgLength = recvfrom(activeFd, msgBuffer.data(), msgBuffer.size(), 0, &clientAddr.addr, &clientAddr.len);
 					dynamic_assert(msgLength != -1, "recvfrom failed");
 
-					nextHop->handleMessage(msgBuffer.substr(msgLength), ConnectionMapping::makeClientId(clientAddr));
+					forwardMessageToOutbound(msgBuffer.substr(msgLength), ConnectionMapping::makeClientId(clientAddr));
 				}
 			};
 
 			// ----------------------- listener main loop ------------------------------
-			epoll_event events[MAX_EVENTS];
+			epoll_event events[EPOLL_MAX_EVENTS];
 			rlog.info("PlainListener listening InboundPort [{}]:{} ...", listenAddr, listenPort);
 			while (true) {
-				auto nfds = epoll_wait(epollFd, events, MAX_EVENTS, -1);
+				auto nfds = epoll_wait(epollFd, events, EPOLL_MAX_EVENTS, -1);
 				dynamic_assert(nfds != -1, "epoll_wait failed");
 				
 				for (auto cter = 0; cter < nfds; ++cter) {
@@ -76,6 +80,9 @@ namespace Protocols {
 	class PlainOutbound : public BaseOutbound {
 	public:
 		using BaseOutbound::BaseOutbound;
+		virtual void loadConfig(string config) override {
+
+		}
 
 	};
 }
diff --git a/src/utils.hpp b/src/utils.hpp
index c59efa6..8faa49f 100644
--- a/src/utils.hpp
+++ b/src/utils.hpp
@@ -26,13 +26,13 @@ struct SockAddr {
 struct ConnectionMapping {
     std::unordered_map<string, fd_t> client2server;
     std::unordered_multimap<fd_t, string> server2client;
-    static string makeClientId(const SockAddr &osStrust) const {
+    static string makeClientId(const SockAddr &osStruct) {
         // ClientId is a binary string.
         string result(sizeof(osStruct), '\0');
-        std::memcpy(result.data(), &osStruct, sizeof(osStrust));
+        std::memcpy(result.data(), &osStruct, sizeof(osStruct));
         return result;
     }
-    static void parseClientId(const string &clientId, SockAddr &output) const {
+    static void parseClientId(const string &clientId, SockAddr &output) {
         static_assert(sizeof(output) == sizeof(SockAddr), "error: programming error detected.");
         if (clientId.size() != sizeof(output))
             throw std::invalid_argument("parseClientId, invalid input binary string length.");
@@ -40,7 +40,7 @@ struct ConnectionMapping {
     }
 };
 
-inline void epoll_add_fd(fd_t epollFd, fd_t fd) {
+inline void epoll_add_fd(fd_t epollFd, sockfd_t fd) {
     epoll_event event {
         .events = EPOLLIN,
         .data = {
@@ -51,7 +51,7 @@ inline void epoll_add_fd(fd_t epollFd, fd_t fd) {
     if(ret1 == -1)
         throw std::runtime_error("epoll_ctl failed.");
 }
-inline void epoll_del_fd(fd_t epollFd, fd_t fd) {
+inline void epoll_del_fd(fd_t epollFd, sockfd_t fd) {
     epoll_event event {
         .events = EPOLLIN,
         .data = {
@@ -79,7 +79,7 @@ inline auto mkpipe() {
 */
 
 inline auto mk_tcp_pipe() {
-    fd_t connfd_cli_side, connfd_srv_side;
+    sockfd_t connfd_cli_side, connfd_srv_side;
     auto listenfd = rlib::quick_listen("::1", TCP_TMP_PORT_NUMBER);
     auto serverThread = std::thread([&] {
         connfd_srv_side = rlib::quick_accept(listenfd);
-- 
GitLab