本文介绍这个点对点聊天系统的节点设计。
预备展开目录
上一篇文章中介绍了这套系统中的消息结构设计,这一篇文章中将介绍一个节点是怎么实现的。在上一篇文章中,我决定采用异步消息的设计,但为了避免收到一个消息就启动一个线程,这样对操作系统调度有压力,在编写同步控制的时候也会有额外的麻烦,所以我决定参考 Redis 的处理方式。
在正文开始之前,这里想做一个小铺垫,主要是 I2PHelper
这个类,我在里面写了一些工具方法,最主要的有两个:
- @JvmStatic
- fun runThread(name: String? = null, task: () -> Unit) {
- val t = I2PAppThread {
- try {
- task()
- } catch (t: Throwable) {
- logger.error(t) { "Uncaught exception cause thread terminated" }
- }
- }
- t.isDaemon = true
- name?.let { t.name = it }
- t.start()
- }
-
- @JvmStatic
- fun createThreadPool(nThreads: Int): ThreadPoolExecutor =
- ThreadPoolExecutor(
- nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- LinkedBlockingQueue(),
- { I2PAppThread(it).apply { isDaemon = true } },
- ThreadPoolExecutor.CallerRunsPolicy()
- )
这两个工具方法应该是用的最多的。I2P 自己的库中提供了一个叫做 I2PAppThread
的类,他的做用与 Thread
类似,不同之处在于前者提供了 I2P 提供的 OOM 监听器,推荐在使用 I2P 相关资源的时候使用这个线程。所以基于这种推荐,在使用 I2P 相关资源的时候将待执行的操作包装到 I2PAppThread 里,以及让线程池构造 I2PAppThread 实例。
不过据我后来观察,这个应该是作为嵌入式路由时使用的,也就是应用程序自己跑一个 I2P 路由的时候用。如果使用外部路由的话(就像这个库,它要求使用者自己单独运行一个 I2P 路由,运行时将通过 I2CP 连接到这个外部路由),这个 OOM 应该没有什么用,所以之后使用协程代替这部分的代码,应该不会引发什么问题。
会话展开目录
在开始介绍节点实现之前,我想介绍一下这个叫做会话(Session)的设计。
对于一个节点来说,其他节点的信息无外乎两个:与对方节点建立的 I2P 连接和与这个连接相关的状态。前者是节点通讯的基础,后者则是应用程序正常工作所依赖的基础,例如对方节点的验证信息,前文提到但没有实现的各类计时器,还有来自应用程序的各种状态信息(会话上下文)。
会话上下文展开目录
为了尽可能保持通用,与连接相关的状态设计为一个接口:
- interface SessionContext {
- val sessionSource: SessionSource
- var peerInfo: PeerInfo?
-
- val nickname: String?
-
- fun isAuthed(): Boolean = peerInfo != null
-
- fun onAuthAccepted()
-
- fun isAccepted(): Boolean
- }
这里的 SessionSource
是一个枚举:
- enum class SessionSource {
- CLIENT,
- SERVER
- }
当会话来源为 SERVER 的时候,就说明这个会话是我们的 Server 部分创建的,也就是说,这个会话是别人发起的,我们接收的;如果是 CLIENT,就意味着这个会话是我们的 Client 部分创建的,即我们发起,别人接收。这个字段对于这个库和后面 MC 聊天都没什么用,但是有些应用程序可能会依赖我们自己的角色产生不同的行为,这一点可以通过检查会话来源实现。
考虑到我们在 PEX 动作中交换节点信息,与其单独维护一个列表来存储 PeerInfo,我们不妨直接把 PeerInfo 存到每一个会话的上下文中。
关于 NickName,这个是一个后期引入的字段,主要是调试起来太麻烦。调试时节点的地址通常都是随机生成的,于是打印的日志也全部都是随机地址,虽然日志一开始会打印出来 Peer1 的地址是 AAA,peer2 的地址是 BBB,但是这看起来还是太令人头大了。所以便引入了 NickName 字段,在通过认证后,可以从 PeerInfo 里提取用户名等信息作为这个会话的昵称,这样打印日志的时候就可以采用 “昵称(地址)” 的方式,即 “Peer1(AAA)”,这样我们即看到了节点的地址,又看到了这个节点是 Peer1,不会再被随机地址搞得乱七八糟了。
后面还针对验证机制引入了两个字段:一个是判断对方是否通过我方的验证(isAuthed
),另一个是我方的验证是否被对方接收(isAccepted
),并且在收到 AuthAcceptedReply
时将会调用 onAuthAccepted
来进行一些自定义操作,例如允许我方向对方发送消息,或者在认证之后开始一些协商。
这个会话上下文不要求线程安全,这是为了避免后续使用时疏忽大意引发问题。在会话中,所有对上下文的访问将由同步锁保护。虽然在一定程度上会降低性能,但是避免了并发可能带来的问题。
会话展开目录
有了上下文,我们就可以设计会话了。在节点运行的过程中,节点需要做的事情就是接收入站连接,或者发起出站连接,总之这两者的操作都是得到一个 I2PSocket,接下来节点需要初始化一个会话,然后对于这个 Socket 的操作则完全委托给会话处理(因为会话里保存了针对这个连接的状态信息,任何会话外的操作都可能引发状态不同步)。会话的构造函数如下:
- class PeerSession<ContextType : SessionContext>(
- peerInfo: PeerInfo,
- applicationName: String,
- private val json: Json,
- private val socket: I2PSocket,
- private val sessionContext: ContextType,
- private val incomingMessageQueue: BlockingQueue<MessageQueueEntry<ContextType>>,
- private val logger: KLogger,
- private val onSocketClosed: (PeerSession<ContextType>) -> Unit = {},
- )
前两个参数用于创建会话后立刻发送认证请求,并不会持久化保存。后面的 json 对象则是用来序列化(发送)和反序列化(接收)消息的。Socket 则不言而喻,就是前文提到的用于通讯的 I2PSocket,上下文也是如此。这个上下文对象是由节点提供的,按照设计,上下文对于不同来源的连接可以有所区别,但不应当针对具体的连接产生区别。
例如服务器部分收到的连接与客户端发出的连接具有不同的初始状态,但同样是服务器的入站连接,所有节点的入站连接都应当得到相同的上下文。
再后面有一个所有会话共享的入站消息队列。会话中只负责解析入站消息,并不处理。解析后的消息由读取线程放入这个消息队列,然后进入下一轮读取。后面的 logger 实际上是为了保持日志干净设计的,我在设计的时候希望各个类打印日志的时候不要各自为政,而是同一个节点的组件都应该以这个节点的名义打印日志,虽然这会给排查问题带来一些麻烦(现在我们不知道是哪个组件打印的日志了),但通过一些特殊的技巧(例如不同组件打日志时使用唯一的字符串来描述,通过搜索这个字符串便可以找到打印日志的点)可以缓解。这一点在一个进程中运行多个节点进行测试时显得尤为重要。
想想看,你有 10 个叫做 “PeerSession” 的 logger 同时打日志,你知道哪一行是哪个节点打印的吗?
最后有一个简单的回调,将会在 I2PSocket 结束时被调用。无论是用户主动关闭会话,还是会话底层的套接字中断,这个回调都会被调用,因为在调用时,这个会话已经不再具备通信的功能了。在节点中我们使用了一个 HashMap 来存储当前活跃的会话,而会话可能会因为各种各样的原因随时中断,因此将清理过期会话的代码交给会话自己来执行,可以降低编码的复杂度。
当然,我们可以拉一个线程定期清理失效的会话,但这样毕竟还是占用了一个线程。不够优雅。
当然,只有这些还不够。第一点我们就会发现,I2PSocket 好像直接用不太方便啊。我们想按行收发文本数据,那最方便的还得是 BufferedWriter 和 BufferedReader。所以还要初始化这两个东西:
- private val input = socket.inputStream.bufferedReader()
- private val output = socket.outputStream.bufferedWriter()
有了这些,就可以执行我们的构造函数了(在 Kotlin 中,构造函数的形参是写在类声明上的,而实际的过程则由 init 函数体来承载):
- init {
- runThread {
- try {
- input.useLines { lines ->
- // read line one by one forever
- lines.forEach { base64Line ->
- val jsonText = try {
- Base64.getDecoder().decode(base64Line).decodeToString()
- } catch (e: IllegalArgumentException) {
- sendError(InvalidIncomingDataError("Malformed base64: ${e.message}"))
- return@forEach
- }
- val message = try {
- json.decodeFromString<P2PMessage>(jsonText)
- } catch (e: SerializationException) {
- sendError(InvalidIncomingDataError("Error when deserializing json: ${e.message}"))
- close("Malformed data")
- return@forEach
- } catch (e: IllegalArgumentException) {
- sendError(InvalidIncomingDataError("Malformed json: ${e.message}"))
- close("Malformed data")
- return@forEach
- }
- // parsing ok, add to queue
- incomingMessageQueue.add(MessageQueueEntry(this, message))
- }
- }
- } catch (_: IOException) {
- logger.warn { "Socket from ${getDisplayName()} closed due to IO exception" }
- } catch (t: Throwable) {
- logger.warn(t) { "Socket from ${getDisplayName()} closed!" }
- }
- // something happened, close the socket
- close()
- }
- // send auth
- sendMessage(P2PMessage.createRequest(AuthRequest(applicationName, peerInfo)))
- }
这里我们一上来就拉起一个线程,它的工作很简单:从输入流中读取一行,尝试解析。解析成功就加入消息队列,解析失败则返回一个 Error。这里并没有使用循环,而是利用了 Kotlin 对于 Java 类型的扩展,例如 Reader.useLines
会将输入流转化成一个序列(Sequence
,类似于 Java 8 引入的 Stream),我们对这个序列调用 foreach
,就相当于我们写一个循环,在循环内读一行的操作了。如果底层的套接字被关闭,则 foreach 会抛出异常,我们在最外层捕获异常即可。当然,捕获到异常之后也很好处理:打日志,关闭会话。
这里要介绍一下那个 MessageQueueEntry
:
- data class MessageQueueEntry<ContextType : SessionContext>(
- val session: PeerSession<ContextType>,
- val message: P2PMessage
- )
本质上非常简单:这个消息来自于哪个会话,以及这个消息是什么。
在拉起读取线程之后,我们立即向对方发送一个认证请求。按照前文设计,我们应该在验证被接受之前阻挡其他消息的发送,但是这个逻辑写起来有点麻烦,所以我便寄希望于我们产生其他消息之前,我们的验证请求已经被处理。
如果对方节点有大量消息积压,那这个假设可能会失败,导致我们作为合法的节点被中断连接。这个问题可以通过每个会话一个消息队列来缓解,但是这样就会导致一个会话有两个线程。假设我们在服务器中有 30 个玩家,那这就是 60 个常驻后台的进程。前台还要处理 MC 相关的东西,这对于性能的影响应该不会太小。
当然,你可能注意到这里面用到了一些非常规的函数,例如 close(String)
:
- fun close(reason: String?) {
- synchronized(socket) {
- if (socket.isClosed) return
- reason?.let { sendRequest(ByeRequest(it)) }
- try {
- socket.close()
- input.close()
- output.close()
- } catch (_: IOException) {
- // It won't hurt anything if the close failed.
- // It might be already broken/closed.
- }
- onSocketClosed(this)
- }
- }
-
- override fun close() {
- close(null)
- }
后面的无参数 close 好理解,因为我继承了 AutoCloseable
,但它调用了第一个 close。这个 close 是在关闭前向对方主动发送一个 ByeRequest,还记得上一篇文章吗?我们说,如果能够主动通知对方关闭资源,那就比让对方过一会儿才发现我们已经关闭了要好。对于 MC 来说,服务器告诉客户端 XXX 已经退出了服务器,但我们的 Mod 还认为这个连接依旧可用,这就会给玩家带来困惑。不过话虽这么说,我们的系统并不依赖这个功能。也就是说,如果对方客户端意外崩溃,我们还是会在一段时间后发现问题,继而释放资源,调用关闭回调,清理资源。
当然,你可能还注意到了,这个 sessionContext 是 private 修饰的,应用程序用不到啊,你这上下文不就有跟没有一样了吗?这个也好解决:
- fun <T> useContextSync(block: ContextType.() -> T): T =
- synchronized(sessionContext) {
- block(sessionContext)
- }
只要设计这么一个方法就可以了。方法在执行的时候先获取锁,然后再执行用户的操作,最后把返回值带出来。
最后简单介绍一下写的部分:
- private fun sendMessage(message: P2PMessage) {
- synchronized(socket) {
- if (socket.isClosed) {
- logger.warn { "Sending message to closed socket ${getDisplayName()}" }
- return
- }
- val jsonText = json.encodeToString(message)
- logger.debug { "Send to ${getDisplayName()}: $jsonText" }
- val base64 = Base64.getEncoder().encodeToString(jsonText.encodeToByteArray())
- try {
- output.write(base64)
- if (!base64.endsWith("\n"))
- output.newLine()
- output.flush()
- } catch (e: IOException) {
- logger.warn { "Failed to write socket. Socket ${getDisplayName()} closed!" }
- // failed to send due to I/O errors
- // then close the socket
- close()
- }
- }
- }
发送消息之前先要检查套接字是否已经关闭,关闭的话就直接返回。没有关闭则尝试发送信息,发送成功则万事大吉。发送失败则说明这个套接字已经不可用了,直接调用 close 回收资源。
节点展开目录
那么,会话设计好了,就可以开始看节点了。
入站消息处理展开目录
用户需要通过节点这个对象来实现数据的收发,发当然好办,我们提供一个发送的函数,需要发消息的时候调用就好了,可是收有点麻烦:你不知道消息什么时候来。这个可以通过回调来解决:
- interface IncomingMessageHandler<ContextType : SessionContext> {
- fun handle(
- peer: Peer<ContextType>,
- session: PeerSession<ContextType>,
- messageId: UUID,
- payload: MessagePayload,
- threadPool: ThreadPoolExecutor
- )
- }
这是一个处理器,对于每一个入站消息,节点将调用这个 handle 方法进行处理。前几个参数都好理解,最后一个线程池的作用是允许处理消息时采用异步计算提高性能,例如使用 CompletableFuture 等。在设计中,一个消息应当被精确处理一次,所以注册一大堆 IncomingMessageHandler 不合适,所以我们的 Peer 只接收一个 Handler,由 handler 根据消息类型去处理。当然,可以让用户自己写,但是考虑到我们已经内置了一些消息,我们也应该针对这些消息写一个可拓展的处理器:
- abstract class MessagePayloadHandler<ContextType : SessionContext, PayloadType : MessagePayload>(
- internal val payloadType: KClass<PayloadType>
- ) {
-
- fun isInterface(): Boolean = payloadType.java.isInterface
-
- fun isAbstractClass(): Boolean = payloadType.isAbstract
-
- fun canHandle(value: MessagePayload): Boolean = payloadType.isInstance(value)
-
- private fun cast(value: MessagePayload): PayloadType = payloadType.cast(value)
-
- fun handle(
- peer: Peer<ContextType>,
- session: PeerSession<ContextType>,
- messageId: UUID,
- payload: MessagePayload,
- threadPool: ThreadPoolExecutor
- ): MessagePayload? = handleTyped(peer, session, messageId, cast(payload), threadPool)
-
- protected abstract fun handleTyped(
- peer: Peer<ContextType>,
- session: PeerSession<ContextType>,
- messageId: UUID,
- payload: PayloadType,
- threadPool: ThreadPoolExecutor
- ): MessagePayload?
- }
IncomingMessageHandler 不能注册多个,但是我们可以自己拓展一个 MessagePayloadHandler,针对特定的 PayloadType 进行处理,然后我们把这些 Handler 收集起来,根据消息的运行时类型来分发处理。你会发现上面要求了一个 internal val payloadType: KClass<PayloadType>
,这是因为泛型发生在类声明,而 Kotlin 的 reified 只能用在函数上。所以不得已,为了要后期能够将抽象的 MessagePayload 转化成具体的实现类型,只能在构造时带一个 KClass 用来在运行时判断(canHandle
)、转换(cast
)。当然,不一定非要限制具体类型,用户也可以针对一类消息进行处理,比如说接收 ErrorMessagePayload。所以这里还引入了对于接口和抽象类的判断(isInterface
和 isAbstractClass
)。在分发消息时,先搜索具体的实现(不是接口,不是抽象类),然后搜索抽象类,最后搜索接口。
能够使用上述 MessagePayloadHandler 的 Handler 实现如下:
- class GeneralIncomingMessageHandler<ContextType : SessionContext>(
- /**
- * Search class, if not found then abstract class, if not found then interface.
- * */
- vararg _handlers: MessagePayloadHandler<ContextType, *>
- ) : IncomingMessageHandler<ContextType> {
- // ......
- }
这个 Handler 接收一个列表,这个列表我们不能直接用,在初始化的时候需要进行一些检查:
- init {
- val set = mutableSetOf<KClass<*>>()
- _handlers.forEach {
- require(!set.contains(it.payloadType)) {
- "Duplicated handlers for payload ${it.payloadType.jvmName}. Each type of payload can be only handled by one handler."
- }
- set.add(it.payloadType)
- }
- }
-
- // copy array into a immutable list
- private val handlers = _handlers.toList()
这里要保证针对每一种消息类型,只能有一个 Handler。然后就是重点的 handle 方法:
- override fun handle(
- peer: Peer<ContextType>,
- session: PeerSession<ContextType>,
- messageId: UUID,
- payload: MessagePayload,
- threadPool: ThreadPoolExecutor
- ) {
- // pre-process, for unauthorized messages
- if (session.useContextSync { !isAuthed() }) {
- if (payload !is AuthRequest && payload !is ByeRequest) {
- // reject all messages other than auth request and bye request
- session.sendError(UnauthorizedError(messageId))
- session.close("Unauthorized session")
- return // stop processing
- }
- }
-
- // for all bye request, close session
- if (payload is ByeRequest) {
- session.close()
- }
-
- // perform additional check on auth request
- if (payload is AuthRequest) {
- // check application
- // Different applications might have different custom payloads
- if (peer.applicationName != payload.applicationName) {
- // wrong application name, reject
- session.sendError(AuthenticationFailedError("Different application", messageId))
- session.close("Failed to auth: different application")
- return // stop processing
- }
- // check destination
- try {
- // additional check on auth, make sure the dest is correct
- val dest = Destination(payload.peerInfo.info[PeerInfo.INFO_KEY_DEST])
- require(dest == session.getPeerDestination())
- } catch (t: Throwable) {
- session.sendError(AuthenticationFailedError("Destination not match", messageId))
- session.close("Failed to auth: wrong dest")
- return // stop processing if failed
- }
- }
-
- // find normal class, then abstract class, then interface
- val handler = handlers.filterNot { it.isInterface() || it.isAbstractClass() }.find { it.canHandle(payload) }
- ?: handlers.filter { !it.isInterface() && it.isAbstractClass() }.find { it.canHandle(payload) }
- ?: handlers.filter { it.isInterface() }.find { it.canHandle(payload) }
-
- if (handler == null) {
- logger.warn {
- "Message handler not found for type ${payload::class.jvmName}\n" +
- "Message from ${session.getDisplayName()}: $payload"
- }
- if (payload is RequestMessagePayload || payload is ReplyMessagePayload) {
- // send error if the message is request or reply
- session.sendError(UnsupportedMessageError(messageId, payload::class.jvmName))
- }
- return
- }
-
- // handle message
- var reply = handler.handle(peer, session, messageId, payload, threadPool)
- if (payload is RequestMessagePayload && reply == null) {
- // make sure request always get reply
- reply = NoContentReply(messageId)
- }
- if (reply != null) {
- // decide if we should send reply
- val shouldSendReply = when (payload) {
- is RequestMessagePayload -> reply is ReplyMessagePayload || reply is ErrorMessagePayload
- is ReplyMessagePayload -> reply is ErrorMessagePayload
- else -> false
- }
- if (shouldSendReply) {
- session.sendPayload(reply)
- }
- // post-process for auth
- if (payload is AuthRequest) {
- if (reply is ErrorMessagePayload) {
- // auth failed, disconnect
- session.close("Authentication failed")
- } else {
- // auth ok
- session.useContextSync {
- peerInfo = payload.peerInfo
- onAuthAccepted()
- }
- }
- }
- }
- }
整体流程分三部分:预处理、处理、后处理。预处理部分是硬编码的逻辑,例如拒绝未验证的消息,针对 AuthRequest、ByeRequest 等进行额外的检查、处理等。然后针对消息类型搜索对应的 handler,如果没搜到,打印一条日志,并回复给对方一个类似 HTTP 501 Unimplemented 错误,告诉对方本节点无法处理这类消息。处理完成后得到一个 reply。在回复方面没法很好的限制 Handler,只能在代码里进行检查,根据上一篇文章的规则:
- Request 可以返回 Reply 和 Error
- Reply 可以返回 Error
- Error 不接受任何返回
检查通过后则发送返回的消息。此外对于认证失败的认证请求,直接断开连接。认证成功则自动将 PeerInfo 保存到 Context 中,并调用回调。
节点实现展开目录
解决了处理入站消息的问题,我们可以正经开始实现节点了。
实例化一个节点需要如下信息:
- class Peer<ContextType : SessionContext>(
- private val json: Json,
- private val socketManager: I2PSocketManager,
- val applicationName: String,
- private val sessionContextProvider: (SessionSource) -> ContextType,
- private val selfPeerInfoProvider: (Peer<ContextType>) -> PeerInfo,
- private val messageHandler: IncomingMessageHandler<ContextType>,
- private val threadPool: ThreadPoolExecutor,
- private val pexInterval: Long = 10 * 1000,
- private val logger: KLogger = KotlinLogging.logger { },
- private val onSessionSocketClose: (PeerSession<ContextType>) -> Unit = {}
- ) : AutoCloseable {
- // ......
- }
节点用到的参数有点多,首先是 Json 实例,用于解析消息的;然后是一个 I2PSocketManager,主要用于获取 I2P 的入站连接,以及发起出站连接;applicationName 则用于判断 AuthRequest,不同 applicationName 的节点不能相互通过认证;sessionContextProvider 用于根据会话来源创建会话上下文;selfPeerInfoProvider 用于提供本节点的节点信息,用户可以在这里附加额外的信息,处理签名逻辑;messageHandler 就是前文介绍的消息处理器;threadPool 是用于处理耗时操作的线程池,也就是实际传给 Handler 的线程池;pexInterval 决定了多久执行一次 PEX 操作;logger 可以有用户自定义,默认的话是使用类名;onSessionSocketClose 则为用户提供了观测会话结束的能力,除了节点需要用来清理资源之外,用户可以在这里执行额外的操作,例如在聊天框打印消息告诉用户 XXX 已经断开连接。
除此之外,我们还需要初始化一些东西:
- private val incomingMessageQueue: BlockingQueue<MessageQueueEntry<ContextType>> = LinkedBlockingQueue()
- private val serverSocket: I2PServerSocket = socketManager.serverSocket
- private val sessions = ConcurrentHashMap<Destination, PeerSession<ContextType>>()
- private val closedFlag = AtomicBoolean(false)
-
- private val newPeerQueue: BlockingQueue<Pair<() -> Unit, (Throwable) -> Unit>> = LinkedBlockingQueue()
前四个比较好理解,它们分别是:入站消息队列、从 SocketManager 处获取的服务端 Socket、存储所有 Session 的 Map,以及一个记录当前节点是否关闭的标志。存储会话使用 Map 主要是为了快速查找目的地是否有重复,如果要连接的地址已经是一个已知且存活的会话,那么我们就不必重复连接了。
最后一个可能有些费解,为什么新节点队列是一个 lambda 组成的双元组呢?这一点后面再说。
关于实例化的过程,主要是拉起各个工作线程:
- private fun createPeerSession(
- sessionSource: SessionSource,
- socket: I2PSocket
- ): PeerSession<ContextType> = PeerSession(
- selfPeerInfoProvider(this), applicationName, json, socket,
- sessionContextProvider(sessionSource), incomingMessageQueue, logger
- ) { sessions.remove(it.getPeerDestination()); onSessionSocketClose(it) }
-
- init {
- // handle new peers
- runThread {
- // ......
- }
- }
- // handle incoming message
- runThread {
- // .......
- }
- // handle server part
- runThread {
- // ......
- }
- // exchange peer every X minutes
- runThread {
- // ......
- }
- // print self info
- logger.info { "I'm ${getMyB32Address()}" }
- }
这里启动了四个线程。首先是处理新节点的,这些节点的来源是 PEX,交换得到的节点全部放到上面那个很奇怪的队列中,然后这个线程从队列中获取信息,进行连接。但是问题来了:I2P 的隧道有两种连接方式:一种是使用 Destination 直接连接,相当于发快递时写好了地址,直接派送;还有一种方式是通过哈希,类似于你只提供了对方的电话,快递员需要联络对方才能得直具体地址。在 I2P 中,Destination 保存了对方隧道的公钥等信息,通过 Base64 编码之后有 500 多字符,不利于分享(Minecraft 一行消息只能发一二百个字符);而哈希地址则不超过 96 个字符,我们使用的是 52 个字符的 b32 地址,但代价就是连接之前需要向路由查询 Destination,于是便催生了如下两个连接方法:
- fun connect(b32: String) {
- val nameService = I2PAppContext.getGlobalContext().namingService()
- val dest = nameService.lookup(b32) ?: throw UnknownHostException("Host not found: $b32")
- connect(dest)
- }
-
- fun connect(dest: Destination) {
- if (closedFlag.get()) return
- // do not connect to ourselves
- if (dest == getMyDestination()) return
-
- sessions.compute(dest) { _, oldSession ->
- if (oldSession != null && !oldSession.isClosed()) {
- // old still alive, do nothing and return old one
- return@compute oldSession
- }
- // make new connection
- logger.info { "Connecting to peer ${dest.toBase32()}" }
- val socket = socketManager.connect(dest)
- createPeerSession(SessionSource.CLIENT, socket)
- }
- }
第一个 connect 接收 b32 地址,查询成为 Destination 对象,然后交给第二个 connect 方法。后者检查地址是否重复,并进行连接。鉴于这里有两种连接方法,而且一个是字符串,一个是对象,我们不应该将他们混着存到一个 List<Object>
里,这个是可以实现目的的,但并不安全,也不值得提倡。针对这两种 connect 方法,有两种增加新节点的方法:
- fun addNewPeer(
- b32: String,
- errorHandler: (t: Throwable) -> Unit
- ) {
- newPeerQueue.add({ connect(b32) } to errorHandler)
- }
-
- fun addNewPeer(
- dest: Destination,
- errorHandler: (t: Throwable) -> Unit
- ) {
- newPeerQueue.add({ connect(dest) } to errorHandler)
- }
你看,无论哪种地址类型,我们只要在闭包中处理连接的逻辑,最终得到的都是一个抽象的函数,而那个 errorHandler 则提供了用户观测连接失败的能力。我们只要在处理新节点队列的线程中执行这个闭包就可以了:
- // handle new peers
- runThread {
- while (!closedFlag.get()) {
- val (connect, handleError) = newPeerQueue.take()
- threadPool.execute {
- try {
- connect()
- } catch (t: Throwable) {
- handleError(t)
- }
- }
- }
- }
我们依次从队列中取出连接新节点的 Lambda,在线程池中执行,如果捕获到错误,就调用处理错误的回调。
能够处理新节点之后,我们更进一步,开始拉起消息处理的线程:
- // handle incoming message
- runThread {
- // if peer closed, no need to process messages
- while (!closedFlag.get()) {
- val entry = incomingMessageQueue.take()
- // do things sync, ensure first in first handled
- messageHandler.handle(
- this,
- entry.session,
- entry.message.id,
- entry.message.payload,
- threadPool
- )
- }
- }
这里判断了一下节点是否关闭,如果节点已经关闭了,那么就没有必要处理消息了 —— 处理了你也发不出去回复。每次从队列中拿一个消息,拿到之后便调用 handle 进行处理。
按道理来说,这个 handle 应该包裹在 try catch 中,以防止意外的异常提前结束消息处理进程,导致消息积压。不知道怎么搞的,我当时给忘了,以后一定修。
但很奇怪的是,上面的忘了,下面服务端那块又想起来了。不愧是我。
有了处理入站消息的能力,接下来便可以拉起服务端了:
-
- // handle server part
- runThread {
- while (!closedFlag.get()) {
- try {
- val socket = serverSocket.accept() ?: continue
- sessions.compute(socket.peerDestination) { dest, oldSession ->
- if (oldSession != null && !oldSession.isClosed()) {
- // old still alive, kill new one and return old one
- logger.warn { "Duplicated connection from ${dest.toBase32()}" }
- socket.reset()
- return@compute oldSession
- }
- // accept new socket
- createPeerSession(SessionSource.SERVER, socket)
- }
- } catch (e: ConnectException) {
- logger.warn { "Server socket ${getMyB32Address()} closed or interrupted!" }
- break
- } catch (e: I2PException) {
- logger.warn { "I2CP session for ${getMyB32Address()} closed or broken!" }
- break
- } catch (_: Throwable) {
- }
- }
- logger.warn { "Peer stop handle incoming connections: ${getMyB32Address()}" }
- close()
- }
在服务端接收到入站套接字之后,对来源进行了检查:如果我们已经有了对应的出站连接,那么为了避免重复,应该拒绝这个入站连接。但是有一种比较极端的情况:
- 我们发起出站连接,对方接受了连接(accept 返回了非 null 值)
- 我们知道对方接收了连接(connect 成功返回),因此我们创建了一个会话(称为出站会话)
- 对方节点的服务端线程被暂停,对方的 PEX 开始工作,对方借由 PEX 得知了我们的存在,由于对方节点仅接收了我们的套接字,尚未创建会话,对方向我们发起了入站连接
- 我们接受了对方的入站连接,但发现已经重复了,我们立刻断开连接(
socket.reset()
),但此时对方的 connect 成功返回,对方创建了一个会话(称为入站会话) - 对方节点的服务端线程恢复执行,发现了 PEX 创建的会话,对方认定我们的出站会话与上一步的入站会话冲突,对方也 reset 了我们的套接字
- 最后,我们的出站连接被中断,对方拥有的入站会话已经被关闭。
这个情况导致了双方节点尝试互相连接对方,但谁也没连上谁。不过从现实情况来讲,要达成这种条件也需要运气,我姑且不认为这是个问题。
最后,当然就是拉起那个 PEX 进程:
- // exchange peer every X minutes
- runThread {
- var lastActionTime = System.currentTimeMillis()
- while (!closedFlag.get()) {
- try {
- while (System.currentTimeMillis() - lastActionTime < pexInterval) Thread.sleep(1000)
- val request = PEXRequest(dumpKnownPeer())
- logger.debug { "Doing PEX for peer ${getMyB32Address()}" }
- sessions.filter { it.value.useContextSync { isAuthed() && isAccepted() } && !it.value.isClosed() }
- .forEach { threadPool.execute { it.value.sendRequest(request) } }
- lastActionTime = System.currentTimeMillis()
- } catch (_: InterruptedException) {
- } catch (t: Throwable) {
- logger.warn(t) { "Failed to PEX" }
- lastActionTime = System.currentTimeMillis()
- }
- }
- logger.warn { "Stop PEX due to peer ${getMyB32Address()} closed" }
- }
这里就是一个简单的循环,每隔一段时间对于经过验证且没有关闭的会话发出 PEX 请求。
实例初始化完成之后,下面介绍一些方法。前面介绍过的 addNewPeer 和 connect 就不介绍了,已经说过了。由于我们本身不处理 PEX,所以需要 Handler 通过传入的 peer 参数来获取,这样一来我们就需要对应的方法:
- fun dumpKnownPeer(): List<PeerInfo> =
- sessions.mapNotNull { it.value.useContextSync { peerInfo } }.toList()
-
- fun dumpSessions(): List<PeerSession<ContextType>> = sessions.values.toList()
dumpKnownPeer 将我们已知的所有会话的 PeerInfo 拿出来,而 dumpSessions 则直接列出我们的会话。
以及,除了 connect,我们还需要断开连接的能力:
- fun disconnect(reason: String = "Disconnect by user", filter: (PeerSession<ContextType>) -> Boolean) {
- if (closedFlag.get()) return
- sessions.forEach { if (filter(it.value)) it.value.close(reason) }
- }
还有发送消息的能力:
- fun sendRequest(message: RequestMessagePayload, filter: (PeerSession<ContextType>) -> Boolean) {
- if (closedFlag.get()) return
- sessions.forEach { if (filter(it.value)) threadPool.execute { it.value.sendRequest(message) } }
- }
这里我们只能发送请求,因为上一篇文章中规定,所有动作必须由请求发起。
当然,最后还有关闭:
- fun isClosed(): Boolean = closedFlag.get()
-
- override fun close() {
- closedFlag.set(true)
- sessions.forEach { it.value.close("Peer closing") }
- serverSocket.close()
- }
总结展开目录
本文介绍了一大堆东西,他们是 i2p-p2p-chat 的核心内容。写起来我就觉得比较乱,读起来可能就会更乱吧。
总之这一篇文章先到此为止。在下一篇文章中我将介绍一些常用的消息处理器,比如针对 PEX 请求的处理,以及由此产生的工具方法,还有 Peer 的使用方法展示,以及最重要的,将这个系列过渡到 Minecraft 部分。
感谢大家的耐心阅读,我们下一篇文章见。
- 全文完 -

【代码札记】从零开始的点对点聊天系统 P2 由 天空 Blond 采用 知识共享 署名 - 非商业性使用 - 相同方式共享 4.0 国际 许可协议进行许可。
本许可协议授权之外的使用权限可以从 https://skyblond.info/about.html 处获得。