愿知识带给你我勇气,用那温暖的光,驱散冰冷的黑夜。

MENU

【代码札记】从零开始的点对点聊天系统 P2

December 3, 2022 • Read: 129 • 瞎折腾

本文介绍这个点对点聊天系统的节点设计。

预备

上一篇文章中介绍了这套系统中的消息结构设计,这一篇文章中将介绍一个节点是怎么实现的。在上一篇文章中,我决定采用异步消息的设计,但为了避免收到一个消息就启动一个线程,这样对操作系统调度有压力,在编写同步控制的时候也会有额外的麻烦,所以我决定参考Redis的处理方式。

在正文开始之前,这里想做一个小铺垫,主要是I2PHelper这个类,我在里面写了一些工具方法,最主要的有两个:

    @JvmStatic
    fun runThread(name: String? = null, task: () -> Unit) {
        val t = I2PAppThread {
            try {
                task()
            } catch (t: Throwable) {
                logger.error(t) { "Uncaught exception cause thread terminated" }
            }
        }
        t.isDaemon = true
        name?.let { t.name = it }
        t.start()
    }

    @JvmStatic
    fun createThreadPool(nThreads: Int): ThreadPoolExecutor =
        ThreadPoolExecutor(
            nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            LinkedBlockingQueue(),
            { I2PAppThread(it).apply { isDaemon = true } },
            ThreadPoolExecutor.CallerRunsPolicy()
        )

这两个工具方法应该是用的最多的。I2P自己的库中提供了一个叫做I2PAppThread的类,他的做用与Thread类似,不同之处在于前者提供了I2P提供的OOM监听器,推荐在使用I2P相关资源的时候使用这个线程。所以基于这种推荐,在使用I2P相关资源的时候将待执行的操作包装到I2PAppThread里,以及让线程池构造I2PAppThread实例。

不过据我后来观察,这个应该是作为嵌入式路由时使用的,也就是应用程序自己跑一个I2P路由的时候用。如果使用外部路由的话(就像这个库,它要求使用者自己单独运行一个I2P路由,运行时将通过I2CP连接到这个外部路由),这个OOM应该没有什么用,所以之后使用协程代替这部分的代码,应该不会引发什么问题。

会话

在开始介绍节点实现之前,我想介绍一下这个叫做会话(Session)的设计。

对于一个节点来说,其他节点的信息无外乎两个:与对方节点建立的I2P连接和与这个连接相关的状态。前者是节点通讯的基础,后者则是应用程序正常工作所依赖的基础,例如对方节点的验证信息,前文提到但没有实现的各类计时器,还有来自应用程序的各种状态信息(会话上下文)。

会话上下文

为了尽可能保持通用,与连接相关的状态设计为一个接口:

interface SessionContext {
    val sessionSource: SessionSource
    var peerInfo: PeerInfo?

    val nickname: String?

    fun isAuthed(): Boolean = peerInfo != null

    fun onAuthAccepted()

    fun isAccepted(): Boolean
}

这里的SessionSource是一个枚举:

enum class SessionSource {
    CLIENT,
    SERVER
}

当会话来源为SERVER的时候,就说明这个会话是我们的Server部分创建的,也就是说,这个会话是别人发起的,我们接收的;如果是CLIENT,就意味着这个会话是我们的Client部分创建的,即我们发起,别人接收。这个字段对于这个库和后面MC聊天都没什么用,但是有些应用程序可能会依赖我们自己的角色产生不同的行为,这一点可以通过检查会话来源实现。

考虑到我们在PEX动作中交换节点信息,与其单独维护一个列表来存储PeerInfo,我们不妨直接把PeerInfo存到每一个会话的上下文中。

关于NickName,这个是一个后期引入的字段,主要是调试起来太麻烦。调试时节点的地址通常都是随机生成的,于是打印的日志也全部都是随机地址,虽然日志一开始会打印出来Peer1的地址是AAA,peer2的地址是BBB,但是这看起来还是太令人头大了。所以便引入了NickName字段,在通过认证后,可以从PeerInfo里提取用户名等信息作为这个会话的昵称,这样打印日志的时候就可以采用“昵称(地址)”的方式,即“Peer1(AAA)”,这样我们即看到了节点的地址,又看到了这个节点是Peer1,不会再被随机地址搞得乱七八糟了。

后面还针对验证机制引入了两个字段:一个是判断对方是否通过我方的验证(isAuthed),另一个是我方的验证是否被对方接收(isAccepted),并且在收到AuthAcceptedReply时将会调用onAuthAccepted来进行一些自定义操作,例如允许我方向对方发送消息,或者在认证之后开始一些协商。

这个会话上下文不要求线程安全,这是为了避免后续使用时疏忽大意引发问题。在会话中,所有对上下文的访问将由同步锁保护。虽然在一定程度上会降低性能,但是避免了并发可能带来的问题。

会话

有了上下文,我们就可以设计会话了。在节点运行的过程中,节点需要做的事情就是接收入站连接,或者发起出站连接,总之这两者的操作都是得到一个I2PSocket,接下来节点需要初始化一个会话,然后对于这个Socket的操作则完全委托给会话处理(因为会话里保存了针对这个连接的状态信息,任何会话外的操作都可能引发状态不同步)。会话的构造函数如下:

class PeerSession<ContextType : SessionContext>(
    peerInfo: PeerInfo,
    applicationName: String,
    private val json: Json,
    private val socket: I2PSocket,
    private val sessionContext: ContextType,
    private val incomingMessageQueue: BlockingQueue<MessageQueueEntry<ContextType>>,
    private val logger: KLogger,
    private val onSocketClosed: (PeerSession<ContextType>) -> Unit = {},
)

前两个参数用于创建会话后立刻发送认证请求,并不会持久化保存。后面的json对象则是用来序列化(发送)和反序列化(接收)消息的。Socket则不言而喻,就是前文提到的用于通讯的I2PSocket,上下文也是如此。这个上下文对象是由节点提供的,按照设计,上下文对于不同来源的连接可以有所区别,但不应当针对具体的连接产生区别。

例如服务器部分收到的连接与客户端发出的连接具有不同的初始状态,但同样是服务器的入站连接,所有节点的入站连接都应当得到相同的上下文。

再后面有一个所有会话共享的入站消息队列。会话中只负责解析入站消息,并不处理。解析后的消息由读取线程放入这个消息队列,然后进入下一轮读取。后面的logger实际上是为了保持日志干净设计的,我在设计的时候希望各个类打印日志的时候不要各自为政,而是同一个节点的组件都应该以这个节点的名义打印日志,虽然这会给排查问题带来一些麻烦(现在我们不知道是哪个组件打印的日志了),但通过一些特殊的技巧(例如不同组件打日志时使用唯一的字符串来描述,通过搜索这个字符串便可以找到打印日志的点)可以缓解。这一点在一个进程中运行多个节点进行测试时显得尤为重要。

想想看,你有10个叫做“PeerSession”的logger同时打日志,你知道哪一行是哪个节点打印的吗?

最后有一个简单的回调,将会在I2PSocket结束时被调用。无论是用户主动关闭会话,还是会话底层的套接字中断,这个回调都会被调用,因为在调用时,这个会话已经不再具备通信的功能了。在节点中我们使用了一个HashMap来存储当前活跃的会话,而会话可能会因为各种各样的原因随时中断,因此将清理过期会话的代码交给会话自己来执行,可以降低编码的复杂度。

当然,我们可以拉一个线程定期清理失效的会话,但这样毕竟还是占用了一个线程。不够优雅。

当然,只有这些还不够。第一点我们就会发现,I2PSocket好像直接用不太方便啊。我们想按行收发文本数据,那最方便的还得是BufferedWriter和BufferedReader。所以还要初始化这两个东西:

    private val input = socket.inputStream.bufferedReader()
    private val output = socket.outputStream.bufferedWriter()

有了这些,就可以执行我们的构造函数了(在Kotlin中,构造函数的形参是写在类声明上的,而实际的过程则由init函数体来承载):

init {
        runThread {
            try {
                input.useLines { lines ->
                    // read line one by one forever
                    lines.forEach { base64Line ->
                        val jsonText = try {
                            Base64.getDecoder().decode(base64Line).decodeToString()
                        } catch (e: IllegalArgumentException) {
                            sendError(InvalidIncomingDataError("Malformed base64: ${e.message}"))
                            return@forEach
                        }
                        val message = try {
                            json.decodeFromString<P2PMessage>(jsonText)
                        } catch (e: SerializationException) {
                            sendError(InvalidIncomingDataError("Error when deserializing json: ${e.message}"))
                            close("Malformed data")
                            return@forEach
                        } catch (e: IllegalArgumentException) {
                            sendError(InvalidIncomingDataError("Malformed json: ${e.message}"))
                            close("Malformed data")
                            return@forEach
                        }
                        // parsing ok, add to queue
                        incomingMessageQueue.add(MessageQueueEntry(this, message))
                    }
                }
            } catch (_: IOException) {
                logger.warn { "Socket from ${getDisplayName()} closed due to IO exception" }
            } catch (t: Throwable) {
                logger.warn(t) { "Socket from ${getDisplayName()} closed!" }
            }
            // something happened, close the socket
            close()
        }
        // send auth
        sendMessage(P2PMessage.createRequest(AuthRequest(applicationName, peerInfo)))
    }

这里我们一上来就拉起一个线程,它的工作很简单:从输入流中读取一行,尝试解析。解析成功就加入消息队列,解析失败则返回一个Error。这里并没有使用循环,而是利用了Kotlin对于Java类型的扩展,例如Reader.useLines会将输入流转化成一个序列(Sequence,类似于Java 8引入的Stream),我们对这个序列调用foreach,就相当于我们写一个循环,在循环内读一行的操作了。如果底层的套接字被关闭,则foreach会抛出异常,我们在最外层捕获异常即可。当然,捕获到异常之后也很好处理:打日志,关闭会话。

这里要介绍一下那个MessageQueueEntry

data class MessageQueueEntry<ContextType : SessionContext>(
    val session: PeerSession<ContextType>,
    val message: P2PMessage
)

本质上非常简单:这个消息来自于哪个会话,以及这个消息是什么。

在拉起读取线程之后,我们立即向对方发送一个认证请求。按照前文设计,我们应该在验证被接受之前阻挡其他消息的发送,但是这个逻辑写起来有点麻烦,所以我便寄希望于我们产生其他消息之前,我们的验证请求已经被处理。

如果对方节点有大量消息积压,那这个假设可能会失败,导致我们作为合法的节点被中断连接。这个问题可以通过每个会话一个消息队列来缓解,但是这样就会导致一个会话有两个线程。假设我们在服务器中有30个玩家,那这就是60个常驻后台的进程。前台还要处理MC相关的东西,这对于性能的影响应该不会太小。

当然,你可能注意到这里面用到了一些非常规的函数,例如close(String)

    fun close(reason: String?) {
        synchronized(socket) {
            if (socket.isClosed) return
            reason?.let { sendRequest(ByeRequest(it)) }
            try {
                socket.close()
                input.close()
                output.close()
            } catch (_: IOException) {
                // It won't hurt anything if the close failed.
                // It might be already broken/closed.
            }
            onSocketClosed(this)
        }
    }

    override fun close() {
        close(null)
    }

后面的无参数close好理解,因为我继承了AutoCloseable,但它调用了第一个close。这个close是在关闭前向对方主动发送一个ByeRequest,还记得上一篇文章吗?我们说,如果能够主动通知对方关闭资源,那就比让对方过一会儿才发现我们已经关闭了要好。对于MC来说,服务器告诉客户端XXX已经退出了服务器,但我们的Mod还认为这个连接依旧可用,这就会给玩家带来困惑。不过话虽这么说,我们的系统并不依赖这个功能。也就是说,如果对方客户端意外崩溃,我们还是会在一段时间后发现问题,继而释放资源,调用关闭回调,清理资源。

当然,你可能还注意到了,这个sessionContext是private修饰的,应用程序用不到啊,你这上下文不就有跟没有一样了吗?这个也好解决:

    fun <T> useContextSync(block: ContextType.() -> T): T =
        synchronized(sessionContext) {
            block(sessionContext)
        }

只要设计这么一个方法就可以了。方法在执行的时候先获取锁,然后再执行用户的操作,最后把返回值带出来。

最后简单介绍一下写的部分:

    private fun sendMessage(message: P2PMessage) {
        synchronized(socket) {
            if (socket.isClosed) {
                logger.warn { "Sending message to closed socket ${getDisplayName()}" }
                return
            }
            val jsonText = json.encodeToString(message)
            logger.debug { "Send to ${getDisplayName()}: $jsonText" }
            val base64 = Base64.getEncoder().encodeToString(jsonText.encodeToByteArray())
            try {
                output.write(base64)
                if (!base64.endsWith("\n"))
                    output.newLine()
                output.flush()
            } catch (e: IOException) {
                logger.warn { "Failed to write socket. Socket ${getDisplayName()} closed!" }
                // failed to send due to I/O errors
                // then close the socket
                close()
            }
        }
    }

发送消息之前先要检查套接字是否已经关闭,关闭的话就直接返回。没有关闭则尝试发送信息,发送成功则万事大吉。发送失败则说明这个套接字已经不可用了,直接调用close回收资源。

节点

那么,会话设计好了,就可以开始看节点了。

入站消息处理

用户需要通过节点这个对象来实现数据的收发,发当然好办,我们提供一个发送的函数,需要发消息的时候调用就好了,可是收有点麻烦:你不知道消息什么时候来。这个可以通过回调来解决:

interface IncomingMessageHandler<ContextType : SessionContext> {
    fun handle(
        peer: Peer<ContextType>,
        session: PeerSession<ContextType>,
        messageId: UUID,
        payload: MessagePayload,
        threadPool: ThreadPoolExecutor
    )
}

这是一个处理器,对于每一个入站消息,节点将调用这个handle方法进行处理。前几个参数都好理解,最后一个线程池的作用是允许处理消息时采用异步计算提高性能,例如使用CompletableFuture等。在设计中,一个消息应当被精确处理一次,所以注册一大堆IncomingMessageHandler不合适,所以我们的Peer只接收一个Handler,由handler根据消息类型去处理。当然,可以让用户自己写,但是考虑到我们已经内置了一些消息,我们也应该针对这些消息写一个可拓展的处理器:

abstract class MessagePayloadHandler<ContextType : SessionContext, PayloadType : MessagePayload>(
    internal val payloadType: KClass<PayloadType>
) {

    fun isInterface(): Boolean = payloadType.java.isInterface

    fun isAbstractClass(): Boolean = payloadType.isAbstract

    fun canHandle(value: MessagePayload): Boolean = payloadType.isInstance(value)

    private fun cast(value: MessagePayload): PayloadType = payloadType.cast(value)

    fun handle(
        peer: Peer<ContextType>,
        session: PeerSession<ContextType>,
        messageId: UUID,
        payload: MessagePayload,
        threadPool: ThreadPoolExecutor
    ): MessagePayload? = handleTyped(peer, session, messageId, cast(payload), threadPool)

    protected abstract fun handleTyped(
        peer: Peer<ContextType>,
        session: PeerSession<ContextType>,
        messageId: UUID,
        payload: PayloadType,
        threadPool: ThreadPoolExecutor
    ): MessagePayload?
}

IncomingMessageHandler不能注册多个,但是我们可以自己拓展一个MessagePayloadHandler,针对特定的PayloadType进行处理,然后我们把这些Handler收集起来,根据消息的运行时类型来分发处理。你会发现上面要求了一个internal val payloadType: KClass<PayloadType>,这是因为泛型发生在类声明,而Kotlin的reified只能用在函数上。所以不得已,为了要后期能够将抽象的MessagePayload转化成具体的实现类型,只能在构造时带一个KClass用来在运行时判断(canHandle)、转换(cast)。当然,不一定非要限制具体类型,用户也可以针对一类消息进行处理,比如说接收ErrorMessagePayload。所以这里还引入了对于接口和抽象类的判断(isInterfaceisAbstractClass)。在分发消息时,先搜索具体的实现(不是接口,不是抽象类),然后搜索抽象类,最后搜索接口。

能够使用上述MessagePayloadHandler的Handler实现如下:

class GeneralIncomingMessageHandler<ContextType : SessionContext>(
    /**
     * Search class, if not found then abstract class, if not found then interface.
     * */
    vararg _handlers: MessagePayloadHandler<ContextType, *>
) : IncomingMessageHandler<ContextType> {
    // ......
}

这个Handler接收一个列表,这个列表我们不能直接用,在初始化的时候需要进行一些检查:

    init {
        val set = mutableSetOf<KClass<*>>()
        _handlers.forEach {
            require(!set.contains(it.payloadType)) {
                "Duplicated handlers for payload ${it.payloadType.jvmName}. Each type of payload can be only handled by one handler."
            }
            set.add(it.payloadType)
        }
    }

    // copy array into a immutable list
    private val handlers = _handlers.toList()

这里要保证针对每一种消息类型,只能有一个Handler。然后就是重点的handle方法:

    override fun handle(
        peer: Peer<ContextType>,
        session: PeerSession<ContextType>,
        messageId: UUID,
        payload: MessagePayload,
        threadPool: ThreadPoolExecutor
    ) {
        // pre-process, for unauthorized messages
        if (session.useContextSync { !isAuthed() }) {
            if (payload !is AuthRequest && payload !is ByeRequest) {
                // reject all messages other than auth request and bye request
                session.sendError(UnauthorizedError(messageId))
                session.close("Unauthorized session")
                return // stop processing
            }
        }

        // for all bye request, close session
        if (payload is ByeRequest) {
            session.close()
        }

        // perform additional check on auth request
        if (payload is AuthRequest) {
            // check application
            // Different applications might have different custom payloads
            if (peer.applicationName != payload.applicationName) {
                // wrong application name, reject
                session.sendError(AuthenticationFailedError("Different application", messageId))
                session.close("Failed to auth: different application")
                return // stop processing
            }
            // check destination
            try {
                // additional check on auth, make sure the dest is correct
                val dest = Destination(payload.peerInfo.info[PeerInfo.INFO_KEY_DEST])
                require(dest == session.getPeerDestination())
            } catch (t: Throwable) {
                session.sendError(AuthenticationFailedError("Destination not match", messageId))
                session.close("Failed to auth: wrong dest")
                return // stop processing if failed
            }
        }

        // find normal class, then abstract class, then interface
        val handler = handlers.filterNot { it.isInterface() || it.isAbstractClass() }.find { it.canHandle(payload) }
            ?: handlers.filter { !it.isInterface() && it.isAbstractClass() }.find { it.canHandle(payload) }
            ?: handlers.filter { it.isInterface() }.find { it.canHandle(payload) }

        if (handler == null) {
            logger.warn {
                "Message handler not found for type ${payload::class.jvmName}\n" +
                        "Message from ${session.getDisplayName()}: $payload"
            }
            if (payload is RequestMessagePayload || payload is ReplyMessagePayload) {
                // send error if the message is request or reply
                session.sendError(UnsupportedMessageError(messageId, payload::class.jvmName))
            }
            return
        }

        // handle message
        var reply = handler.handle(peer, session, messageId, payload, threadPool)
        if (payload is RequestMessagePayload && reply == null) {
            // make sure request always get reply
            reply = NoContentReply(messageId)
        }
        if (reply != null) {
            // decide if we should send reply
            val shouldSendReply = when (payload) {
                is RequestMessagePayload -> reply is ReplyMessagePayload || reply is ErrorMessagePayload
                is ReplyMessagePayload -> reply is ErrorMessagePayload
                else -> false
            }
            if (shouldSendReply) {
                session.sendPayload(reply)
            }
            // post-process for auth
            if (payload is AuthRequest) {
                if (reply is ErrorMessagePayload) {
                    // auth failed, disconnect
                    session.close("Authentication failed")
                } else {
                    // auth ok
                    session.useContextSync {
                        peerInfo = payload.peerInfo
                        onAuthAccepted()
                    }
                }
            }
        }
    }

整体流程分三部分:预处理、处理、后处理。预处理部分是硬编码的逻辑,例如拒绝未验证的消息,针对AuthRequest、ByeRequest等进行额外的检查、处理等。然后针对消息类型搜索对应的handler,如果没搜到,打印一条日志,并回复给对方一个类似HTTP 501 Unimplemented错误,告诉对方本节点无法处理这类消息。处理完成后得到一个reply。在回复方面没法很好的限制Handler,只能在代码里进行检查,根据上一篇文章的规则:

  • Request可以返回Reply和Error
  • Reply可以返回Error
  • Error不接受任何返回

检查通过后则发送返回的消息。此外对于认证失败的认证请求,直接断开连接。认证成功则自动将PeerInfo保存到Context中,并调用回调。

节点实现

解决了处理入站消息的问题,我们可以正经开始实现节点了。

实例化一个节点需要如下信息:

class Peer<ContextType : SessionContext>(
    private val json: Json,
    private val socketManager: I2PSocketManager,
    val applicationName: String,
    private val sessionContextProvider: (SessionSource) -> ContextType,
    private val selfPeerInfoProvider: (Peer<ContextType>) -> PeerInfo,
    private val messageHandler: IncomingMessageHandler<ContextType>,
    private val threadPool: ThreadPoolExecutor,
    private val pexInterval: Long = 10 * 1000,
    private val logger: KLogger = KotlinLogging.logger { },
    private val onSessionSocketClose: (PeerSession<ContextType>) -> Unit = {}
) : AutoCloseable {
    // ......
}

节点用到的参数有点多,首先是Json实例,用于解析消息的;然后是一个I2PSocketManager,主要用于获取I2P的入站连接,以及发起出站连接;applicationName则用于判断AuthRequest,不同applicationName的节点不能相互通过认证;sessionContextProvider用于根据会话来源创建会话上下文;selfPeerInfoProvider用于提供本节点的节点信息,用户可以在这里附加额外的信息,处理签名逻辑;messageHandler就是前文介绍的消息处理器;threadPool是用于处理耗时操作的线程池,也就是实际传给Handler的线程池;pexInterval决定了多久执行一次PEX操作;logger可以有用户自定义,默认的话是使用类名;onSessionSocketClose则为用户提供了观测会话结束的能力,除了节点需要用来清理资源之外,用户可以在这里执行额外的操作,例如在聊天框打印消息告诉用户XXX已经断开连接。

除此之外,我们还需要初始化一些东西:

    private val incomingMessageQueue: BlockingQueue<MessageQueueEntry<ContextType>> = LinkedBlockingQueue()
    private val serverSocket: I2PServerSocket = socketManager.serverSocket
    private val sessions = ConcurrentHashMap<Destination, PeerSession<ContextType>>()
    private val closedFlag = AtomicBoolean(false)

    private val newPeerQueue: BlockingQueue<Pair<() -> Unit, (Throwable) -> Unit>> = LinkedBlockingQueue()

前四个比较好理解,它们分别是:入站消息队列、从SocketManager处获取的服务端Socket、存储所有Session的Map,以及一个记录当前节点是否关闭的标志。存储会话使用Map主要是为了快速查找目的地是否有重复,如果要连接的地址已经是一个已知且存活的会话,那么我们就不必重复连接了。

最后一个可能有些费解,为什么新节点队列是一个lambda组成的双元组呢?这一点后面再说。

关于实例化的过程,主要是拉起各个工作线程:

    private fun createPeerSession(
        sessionSource: SessionSource,
        socket: I2PSocket
    ): PeerSession<ContextType> = PeerSession(
        selfPeerInfoProvider(this), applicationName, json, socket,
        sessionContextProvider(sessionSource), incomingMessageQueue, logger
    ) { sessions.remove(it.getPeerDestination()); onSessionSocketClose(it) }

    init {
        // handle new peers
        runThread {
            // ......
            }
        }
        // handle incoming message
        runThread {
            // .......
        }
        // handle server part
        runThread {
            // ......
        }
        // exchange peer every X minutes
        runThread {
            // ......
        }
        // print self info
        logger.info { "I'm ${getMyB32Address()}" }
    }

这里启动了四个线程。首先是处理新节点的,这些节点的来源是PEX,交换得到的节点全部放到上面那个很奇怪的队列中,然后这个线程从队列中获取信息,进行连接。但是问题来了:I2P的隧道有两种连接方式:一种是使用Destination直接连接,相当于发快递时写好了地址,直接派送;还有一种方式是通过哈希,类似于你只提供了对方的电话,快递员需要联络对方才能得直具体地址。在I2P中,Destination保存了对方隧道的公钥等信息,通过Base64编码之后有500多字符,不利于分享(Minecraft一行消息只能发一二百个字符);而哈希地址则不超过96个字符,我们使用的是52个字符的b32地址,但代价就是连接之前需要向路由查询Destination,于是便催生了如下两个连接方法:

    fun connect(b32: String) {
        val nameService = I2PAppContext.getGlobalContext().namingService()
        val dest = nameService.lookup(b32) ?: throw UnknownHostException("Host not found: $b32")
        connect(dest)
    }

    fun connect(dest: Destination) {
        if (closedFlag.get()) return
        // do not connect to ourselves
        if (dest == getMyDestination()) return

        sessions.compute(dest) { _, oldSession ->
            if (oldSession != null && !oldSession.isClosed()) {
                // old still alive, do nothing and return old one
                return@compute oldSession
            }
            // make new connection
            logger.info { "Connecting to peer ${dest.toBase32()}" }
            val socket = socketManager.connect(dest)
            createPeerSession(SessionSource.CLIENT, socket)
        }
    }

第一个connect接收b32地址,查询成为Destination对象,然后交给第二个connect方法。后者检查地址是否重复,并进行连接。鉴于这里有两种连接方法,而且一个是字符串,一个是对象,我们不应该将他们混着存到一个List<Object>里,这个是可以实现目的的,但并不安全,也不值得提倡。针对这两种connect方法,有两种增加新节点的方法:

    fun addNewPeer(
        b32: String,
        errorHandler: (t: Throwable) -> Unit
    ) {
        newPeerQueue.add({ connect(b32) } to errorHandler)
    }

    fun addNewPeer(
        dest: Destination,
        errorHandler: (t: Throwable) -> Unit
    ) {
        newPeerQueue.add({ connect(dest) } to errorHandler)
    }

你看,无论哪种地址类型,我们只要在闭包中处理连接的逻辑,最终得到的都是一个抽象的函数,而那个errorHandler则提供了用户观测连接失败的能力。我们只要在处理新节点队列的线程中执行这个闭包就可以了:

        // handle new peers
        runThread {
            while (!closedFlag.get()) {
                val (connect, handleError) = newPeerQueue.take()
                threadPool.execute {
                    try {
                        connect()
                    } catch (t: Throwable) {
                        handleError(t)
                    }
                }
            }
        }

我们依次从队列中取出连接新节点的Lambda,在线程池中执行,如果捕获到错误,就调用处理错误的回调。

能够处理新节点之后,我们更进一步,开始拉起消息处理的线程:

        // handle incoming message
        runThread {
            // if peer closed, no need to process messages
            while (!closedFlag.get()) {
                val entry = incomingMessageQueue.take()
                // do things sync, ensure first in first handled
                messageHandler.handle(
                    this,
                    entry.session,
                    entry.message.id,
                    entry.message.payload,
                    threadPool
                )
            }
        }

这里判断了一下节点是否关闭,如果节点已经关闭了,那么就没有必要处理消息了——处理了你也发不出去回复。每次从队列中拿一个消息,拿到之后便调用handle进行处理。

按道理来说,这个handle应该包裹在try catch中,以防止意外的异常提前结束消息处理进程,导致消息积压。不知道怎么搞的,我当时给忘了,以后一定修。

但很奇怪的是,上面的忘了,下面服务端那块又想起来了。不愧是我。

有了处理入站消息的能力,接下来便可以拉起服务端了:


        // handle server part
        runThread {
            while (!closedFlag.get()) {
                try {
                    val socket = serverSocket.accept() ?: continue
                    sessions.compute(socket.peerDestination) { dest, oldSession ->
                        if (oldSession != null && !oldSession.isClosed()) {
                            // old still alive, kill new one and return old one
                            logger.warn { "Duplicated connection from ${dest.toBase32()}" }
                            socket.reset()
                            return@compute oldSession
                        }
                        // accept new socket
                        createPeerSession(SessionSource.SERVER, socket)
                    }
                } catch (e: ConnectException) {
                    logger.warn { "Server socket ${getMyB32Address()} closed or interrupted!" }
                    break
                } catch (e: I2PException) {
                    logger.warn { "I2CP session for ${getMyB32Address()} closed or broken!" }
                    break
                } catch (_: Throwable) {
                }
            }
            logger.warn { "Peer stop handle incoming connections: ${getMyB32Address()}" }
            close()
        }

在服务端接收到入站套接字之后,对来源进行了检查:如果我们已经有了对应的出站连接,那么为了避免重复,应该拒绝这个入站连接。但是有一种比较极端的情况:

  1. 我们发起出站连接,对方接受了连接(accept返回了非null值)
  2. 我们知道对方接收了连接(connect成功返回),因此我们创建了一个会话(称为出站会话)
  3. 对方节点的服务端线程被暂停,对方的PEX开始工作,对方借由PEX得知了我们的存在,由于对方节点仅接收了我们的套接字,尚未创建会话,对方向我们发起了入站连接
  4. 我们接受了对方的入站连接,但发现已经重复了,我们立刻断开连接(socket.reset()),但此时对方的connect成功返回,对方创建了一个会话(称为入站会话)
  5. 对方节点的服务端线程恢复执行,发现了PEX创建的会话,对方认定我们的出站会话与上一步的入站会话冲突,对方也reset了我们的套接字
  6. 最后,我们的出站连接被中断,对方拥有的入站会话已经被关闭。

这个情况导致了双方节点尝试互相连接对方,但谁也没连上谁。不过从现实情况来讲,要达成这种条件也需要运气,我姑且不认为这是个问题。

最后,当然就是拉起那个PEX进程:

        // exchange peer every X minutes
        runThread {
            var lastActionTime = System.currentTimeMillis()
            while (!closedFlag.get()) {
                try {
                    while (System.currentTimeMillis() - lastActionTime < pexInterval) Thread.sleep(1000)
                    val request = PEXRequest(dumpKnownPeer())
                    logger.debug { "Doing PEX for peer ${getMyB32Address()}" }
                    sessions.filter { it.value.useContextSync { isAuthed() && isAccepted() } && !it.value.isClosed() }
                        .forEach { threadPool.execute { it.value.sendRequest(request) } }
                    lastActionTime = System.currentTimeMillis()
                } catch (_: InterruptedException) {
                } catch (t: Throwable) {
                    logger.warn(t) { "Failed to PEX" }
                    lastActionTime = System.currentTimeMillis()
                }
            }
            logger.warn { "Stop PEX due to peer ${getMyB32Address()} closed" }
        }

这里就是一个简单的循环,每隔一段时间对于经过验证且没有关闭的会话发出PEX请求。

实例初始化完成之后,下面介绍一些方法。前面介绍过的addNewPeer和connect就不介绍了,已经说过了。由于我们本身不处理PEX,所以需要Handler通过传入的peer参数来获取,这样一来我们就需要对应的方法:

    fun dumpKnownPeer(): List<PeerInfo> =
        sessions.mapNotNull { it.value.useContextSync { peerInfo } }.toList()

    fun dumpSessions(): List<PeerSession<ContextType>> = sessions.values.toList()

dumpKnownPeer将我们已知的所有会话的PeerInfo拿出来,而dumpSessions则直接列出我们的会话。

以及,除了connect,我们还需要断开连接的能力:

    fun disconnect(reason: String = "Disconnect by user", filter: (PeerSession<ContextType>) -> Boolean) {
        if (closedFlag.get()) return
        sessions.forEach { if (filter(it.value)) it.value.close(reason) }
    }

还有发送消息的能力:

    fun sendRequest(message: RequestMessagePayload, filter: (PeerSession<ContextType>) -> Boolean) {
        if (closedFlag.get()) return
        sessions.forEach { if (filter(it.value)) threadPool.execute { it.value.sendRequest(message) } }
    }

这里我们只能发送请求,因为上一篇文章中规定,所有动作必须由请求发起。

当然,最后还有关闭:

    fun isClosed(): Boolean = closedFlag.get()

    override fun close() {
        closedFlag.set(true)
        sessions.forEach { it.value.close("Peer closing") }
        serverSocket.close()
    }

总结

本文介绍了一大堆东西,他们是i2p-p2p-chat的核心内容。写起来我就觉得比较乱,读起来可能就会更乱吧。

总之这一篇文章先到此为止。在下一篇文章中我将介绍一些常用的消息处理器,比如针对PEX请求的处理,以及由此产生的工具方法,还有Peer的使用方法展示,以及最重要的,将这个系列过渡到Minecraft部分。

感谢大家的耐心阅读,我们下一篇文章见。

-全文完-


知识共享许可协议
【代码札记】从零开始的点对点聊天系统 P2天空 Blond 采用 知识共享 署名 - 非商业性使用 - 相同方式共享 4.0 国际 许可协议进行许可。
本许可协议授权之外的使用权限可以从 https://www.skyblond.info/about.html 处获得。

Archives QR Code
QR Code for this page
Tipping QR Code