From c107f2e5002dc63da9451086fbd536c54a9e2142 Mon Sep 17 00:00:00 2001
From: Bensong Liu <bensl@microsoft.com>
Date: Tue, 28 Jul 2020 16:51:54 +0800
Subject: [PATCH] framework design mostly done

---
 src/common.hpp          |  3 +++
 src/protocols/base.hpp  | 52 ++++++++++++++++++++++++++++++++---------
 src/protocols/plain.hpp | 10 ++++----
 src/utils.hpp           | 28 ++++++++++++++++++++++
 4 files changed, 78 insertions(+), 15 deletions(-)

diff --git a/src/common.hpp b/src/common.hpp
index 02f3427..d3db0c6 100644
--- a/src/common.hpp
+++ b/src/common.hpp
@@ -16,5 +16,8 @@ constexpr size_t DGRAM_BUFFER_SIZE = 20480;
 //   to the real openvpn server.
 constexpr size_t SERVER_ENCRYPT_CONNECTION_TIMEOUT_SECONDS = 60;
 
+// MAGIC PORT NUMBER! Warning!
+constexpr uint16_t TCP_TMP_PORT_NUMBER = 50999;
+
 #endif
 
diff --git a/src/protocols/base.hpp b/src/protocols/base.hpp
index 9aa17f7..e65f0c9 100644
--- a/src/protocols/base.hpp
+++ b/src/protocols/base.hpp
@@ -5,34 +5,64 @@
 #include <string>
 using std::string;
 
+/*
+User
+      |----------------------|       |----------------------|       
+ ---> |PlainInbound          |  /==> |MiscInbound --PIPE-\  |
+      |       \--PIPE---\    |  |    |                    | |
+      |          MiscOutbound| =/    |         PlainOutbound| ----> UDP App
+      |----------------------|       |----------------------|       
+         UDP Forwarder Client          UDP Forwarder Server
+*/
+
 namespace Protocols {
-	// Handler holds the senderId=>nextHopFd mapping.
+	// Outbound holds the senderId=>nextHopFd mapping.
 	// senderId is "$ip@$port", for example, `fe80:8100::1@1080`. 
 	// Misc protocol may use duplicateSenderId to work on port migration.
 	// Any listener may use removeSenderId to disconnect a sender.
 	// Note: this interface works for both TCP and UDP.
-	struct BaseHandler : rlib::noncopyable {
-		BaseHandler(string outboundConfig) {
+	struct BaseOutbound : rlib::noncopyable {
+		BaseOutbound(string outboundConfig) {
 			loadConfig(outboundConfig);
 		}
-		virtual ~BaseHandler = default;
+		virtual ~BaseOutbound = default;
 
-		// Interfaces
+		// Init data structures.
 		virtual void loadConfig(string config) = 0;
+
+		// InboundThread calls this function. Check the mapping between senderId and serverConn, wake up listenThread, and deliver the msg. 
 		virtual void handleMessage(string binaryMessage, string senderId) = 0;
-		virtual void duplicateSenderId(string newSenderId, string oldSenderId) = 0;
-		virtual void removeSenderId(string senderId) = 0;
+
+		// Listen the PIPE. handleMessage will wake up this thread from epoll.
+		// Also listen the connection fileDescriptors.
+		virtual void listenForever(BaseInbound *previousHop) = 0;
+
+		// Inbound.listenForever MUST initialize this field. 
+		fd_t ipcPipeOutboundEnd = -1;
 	};
 
-	struct BaseListener : rlib::noncopyable {
-		BaseListener(string inboundConfig) {
+	struct BaseInbound : rlib::noncopyable {
+		BaseInbound(string inboundConfig) {
 			loadConfig(inboundConfig);
 		}
-		virtual ~BaseListener = default;
+		virtual ~BaseInbound = default;
 
+		// Init data structures.
 		virtual void loadConfig(string config) = 0;
-		virtual void listenForever(BaseHandler *nextHop) = 0;
+
+		// OutboundThread calls this function. Wake up 'listenForever' thread, and send back a message. Outbound provides the senderId.
+		virtual void handleMessage(string binaryMessage, string senderId) = 0;
+
+		// Listen the addr:port in config, for inbound connection.
+		// Also listen the accepted connection fileDescriptors, and listen the PIPE.
+		virtual void listenForever(BaseOutbound *nextHop) = 0;
+
+		// Inbound.listenForever MUST initialize this field. 
+		fd_t ipcPipeInboundEnd = -1;
 	};
+
+	// TODO: PIPE only works on linux epoll. The windows epoll only works on SOCKET. 
+	//       Do this if you would like to support windows. 
 }
 
 #endif
diff --git a/src/protocols/plain.hpp b/src/protocols/plain.hpp
index c2f6b63..b9caa8b 100644
--- a/src/protocols/plain.hpp
+++ b/src/protocols/plain.hpp
@@ -8,7 +8,7 @@
 #include <common.hpp>
 
 namespace Protocols {
-	class PlainInboundListener : public BaseListener {
+	class PlainInbound : public BaseInbound {
 	public:
 		virtual loadConfig(string config) override {
 			auto ar = rlib::string(config).split('@'); // Also works for ipv6.
@@ -18,7 +18,9 @@ namespace Protocols {
 			listenPort = ar[2].as<uint16_t>();
 
 		}
-		virtual listenForever(BaseHandler* nextHop) override {
+		virtual listenForever(BaseOutbound* nextHop) override {
+			std::tie(this->ipcPipeInboundEnd, nextHop->ipcPipeOutboundEnd) = mk_tcp_pipe();
+
 			auto listenFd = rlib::quick_listen(listenAddr, listenPort, true);
 			rlib_defer([&] {close(listenFd);});
 
@@ -34,6 +36,7 @@ namespace Protocols {
 			rlog.info("PlainListener listening [{}]:{} ...", listenAddr, listenPort);
 			while (true) {
 				// ...
+				// epoll
 			}
 
 		}
@@ -43,9 +46,8 @@ namespace Protocols {
 		uint16_t listenPort;
 	};
 
-	using PlainOutboundListener = PlainInboundListener;
 
-	class PlainOutboundHandler {
+	class PlainOutbound : public BaseOutbound {
 
 	};
 }
diff --git a/src/utils.hpp b/src/utils.hpp
index 97313e1..e21938c 100644
--- a/src/utils.hpp
+++ b/src/utils.hpp
@@ -2,6 +2,9 @@
 #define UDP_FORWARDER_DYN_UTILs_HPP_ 1
 
 #include <rlib/sys/os.hpp>
+#include <rlib/sys/sio.hpp>
+#include <thread>
+#include "common.hpp"
 
 #if RLIB_OS_ID == OS_LINUX
 #include <sys/epoll.h>
@@ -33,6 +36,31 @@ inline void epoll_del_fd(fd_t epollFd, fd_t fd) {
         throw std::runtime_error("epoll_ctl failed.");
 }
 
+/*
+#if RLIB_OS_ID == OS_WINDOWS
+#error This code "mkpipe" is POSIX only. You may disable this error while developing on windows. 
+// Even you make pipe working on windows, this code may still crash. because wepoll doesn't support pipe on windows!
+#else
+#include <unistd.h>
+inline auto mkpipe() {
+    int pipefd[2];
+    if(0 != pipe(pipefd))
+        throw std::runtime_error("mkpipe failed.");
+    return std::make_pair(pipefd[0], pipefd[1]);
+}
+#endif
+*/
+
+inline auto mk_tcp_pipe() {
+    fd_t connfd_cli_side, connfd_srv_side;
+    auto listenfd = rlib::quick_listen("::1", TCP_TMP_PORT_NUMBER);
+    auto serverThread = std::thread([&] {
+        connfd_srv_side = rlib::quick_accept(listenfd);
+    });
+    connfd_cli_side = rlib::quick_connect("::1", TCP_TMP_PORT_NUMBER);
+    serverThread.join();
+    return std::make_pair(connfd_cli_side, connfd_srv_side);
+}
 
 
 #endif
-- 
GitLab