愿知识带给你我勇气,用那温暖的光,驱散冰冷的黑夜。

MENU

【代码札记】从零开始的点对点聊天系统 P1

November 29, 2022 • Read: 140 • 瞎折腾

本文介绍这个点对点聊天系统的整体设计。

其实在决定这一篇文章内容的时候犹豫了好久。周末看了Kotlin中文开发者大会,发现了协程这个东西,有点类似于Java即将实现的纤程——一种轻量化的线程,不需要操作系统来进行调度,因此能够节省线程切换引发的开销。看到了这种性能提升,我也想把他引入到这个系统中。但是这样的话又要等好久才能把代码定形,到时候指不定又看到了什么新东西,不断优化代码,最后文章怎么也写不出来。所以最后思来想去,决定先按照现在的样子写文章,以后再专门说优化。

前言

点对点系统是一种非常棒的框架。像是C/S和B/S这些传统框架中,总会有明确的主从关系。用户作为客户端,是被动的一方,需要向服务器,也就是主动的一方发起请求,然后获得主动方返回的结果。如果出于各种原因,客户端无法连接服务器,或者服务器崩了,那么整套系统也就不能用了。为了防止系统出现问题,领域内的先贤们搞出了诸如高可用、主从备份等各种各样的手段来尽量避免故障。

而点对点系统则用另一种方式解决服务故障:如果把服务器分散给每一个用户,那么只要有用户使用,就有用户提供服务,而这个系统就永远也不会宕机。我想大家既然能点进来这篇文章,那应该都是用过BitTorrent的人。如果你对这个名字不熟悉,那你肯定知道「种子」和「迅雷」。BitTorrent就是一个点对点内容共享系统,系统中的每一个节点(参与者)既作为客户端向别人请求数据,又作为服务端向别人提供自己有的数据,从而实现了一种人人为我,我为人人的境界。

但是点对点系统一般来说都比较难设计,即便设计出来了,使用体验上也不是很好。这就是为什么,现在能使用的大多数APP全部都是C/S结构的——要提升用户体验,只要在服务端投入即可。点对点系统要提升体验,就涉及到节点间互联质量,系统的公平机制,攻击防范等。节点间互联质量主要与网络有关,其中影响最大的就是防火墙。关于公平机制和攻击防范,主要是在设计上考虑如何防止系统被滥用,虽然比较复杂,但好在相对容易实现。

本系列意图实现一个点对点的聊天系统,我想把主要的精力放在协议设计上,数据传输则委托给I2P处理。I2P也是由Java编写的,并提供了一个不错的API来供其他Java程序使用。此外为了防止I2P通讯部分的代码与MC部分的代码耦合在一起,我把这两部分的代码完全分开了。使用I2P处理通讯的部分放在了hurui200320/i2p-p2p-chat这个仓库,而与MC模组有关的代码则放到了hurui200320/MinecraftChatAlternative这个仓库里。

关于I2P本身的内容我就不多说了,网络上有不少关于I2P的介绍。不过这里需要提及的一点是,I2P遵守的是比较纯粹的开源风格,也就是所谓的“As Is - No Warranty”。现在的主要开发者从创始人手中接下项目后,不接受任何来自社区的货币捐助,二十多年来完全靠他们对于隐私与自由的热爱在维护这个庞大的软件。我尝试着阅读过他的源代码,看到了2003年的Java,以及当时使用的构建工具。可以说在我的认知里,我还从来没有见过这样的项目。说回到“As Is - No Warranty”,作为开发者,他们会尽力让软件变得“能用”,但是他们并没有义务为你修复bug。所以如果你因为这个系列也喜欢上了I2P的功能,但在使用的过程中遇到了一些问题,我希望你可以自己动手做一些简单的调试工作,不要在语气上“指责”开发者(这一点经常无意识的发生,使得你在提出issue的时候听起来像是在指责你这个开发者在发布软件之前没有充分测试代码,从而给我带来了麻烦)。

提供帮助,不要指责。

协议设计

关于这一套系统的协议,经过一番修修改改,最终是这个样子的。

同步还是异步

整个系统的消息采用异步的方式处理,从而实现复用同一条TCP链接。

在早期设计中使用同步方式处理消息,这就意味着一个组件发出消息后必须占用这一条TCP直到收到回复,此时不能接收其他消息,也不能发送其他消息。这种情况下节点间的一条TCP连接就变成了半双工的方式,并且客户端和服务端需要轮流征用一条TCP,控制逻辑比较复杂。为了实现简单的全双工,可以让节点之间建立两条TCP连接,每条按单工方式使用。这样写起来非常简单:服务端只需要监听入站消息并处理,客户端使用单独的TCP连接发送出站消息。这种设计的好处是不需要太复杂的同步控制,但坏处也很明显,一对节点需要两条TCP,如果要和N个节点通讯,那就得2N条链接。如果其中一条链接中断了,我们便失去了发送或接收的能力,针对这种情况的错误处理又引入了额外的复杂性。

总的来说,虽然异步设计会带来一定的复杂度,但整体框架建立起来之后的好处就非常可观了:单条链接实现全双工,可以提供类似Redis的单线程消息处理(和性能),整体消息设计也更加灵活。

消息结构

在消息本身的设计上,我采用了相对简单但具有可拓展性的设计:

@Serializable
data class P2PMessage(
    @Serializable(with = UUIDAsStringSerializer::class)
    val id: UUID,
    @Contextual
    val payload: MessagePayload
)

每个消息都具有一个会话内唯一的UUID,和一个消息负载。由于我们是异步处理消息,因此接收到之后不会立刻收到回复,但如果要让对方知道回复的是哪个一个请求,那么这个请求就必须具有一个编号。

关于消息负载,它是一个抽象的接口:

interface MessagePayload {
    val messageType: Type

    enum class Type {
        REQUEST, REPLY, ERROR
    }
}

这里规定消息负载有三种类型:请求、回复和错误。请求如同字面意思,一方想要发起一个操作,例如向你发送一个聊天消息。回复则是表示对先前请求的成功响应,例如我方已经收到并妥当处理了你的聊天消息。而错误则是表示先前请求未能得到正确处理,例如你的聊天消息未经验证,或者你的请求不符合参数要求。

由于这三种请求都有各自的共性,于是设计抽象类来描述这三类请求的大致细节:

abstract class RequestMessagePayload : MessagePayload {
    final override val messageType: MessagePayload.Type = MessagePayload.Type.REQUEST
}

abstract class ReplyMessagePayload : MessagePayload {
    abstract val replyTo: UUID
    final override val messageType: MessagePayload.Type = MessagePayload.Type.REPLY
}

abstract class ErrorMessagePayload : MessagePayload {
    abstract val errorAt: UUID
    abstract val msg: String
    final override val messageType: MessagePayload.Type = MessagePayload.Type.ERROR
}

可以注意到的是,这里我覆盖了消息类型,并限定他们是final,从而阻止他们的子类修改消息类型。因为这些接口要开放给其他代码使用,我当然不想有调皮的开发者继承了RequestPayload,但把消息类型改成Reply,这样不太好。

请求类是最简单的,只要说明消息类型,具体的内容可以根据请求动作的需求自行增添参数。回复也相对简单,只是强制要求了replyTo字段,即你这个回复是回复先前哪个消息的。最后是错误类,除了要求发生错误的消息ID,还要包含一个描述性的字符串来告诉哪里出错了,类似于Exception中的message字段。

这三个类型的时序约束是:

  1. 一个动作必须由Request发起,不可以使用Reply或Error发起一个动作。
  2. Request请求存在两种可能的响应:Reply,成功处理并返回了结果;Error:未能正确处理请求
  3. 收到Reply后,有两种可能的操作:若回复数据正确,则什么都不做,本次动作结束;若回复数据有误,则返回Error。
  4. 收到Error后,不再进行任何回复。Error响应意味着本次动作终止。

身份验证

在具体消息设计上,我想引入一个验证系统。在旧版本的MC中,客户端需要完全信任服务器,如果服务器捏造消息,我们也无从得知。在1.19引入了密码学签名系统后,每个玩家都会有一个临时的、由Mojang签发的证书。玩家发出的消息都会由这个证书进行签名,以确保其没有被修改、捏造,这也是举报系统的依据所在。既然Mojang在游戏里用了这套系统,那我们也可以利用这套系统来保证其他节点并没有捏造聊天消息/冒充别人。

但是每条消息都签名,这听起来像是做了一套Mojang之外的审查系统。所以我们并不使用这套机制来签名聊天消息。我们使用这套机制来验证节点的链接。在发送聊天消息之前,节点需要与其他节点建立连接。连接建立后无论是出站方向还是入站方向,都应该立刻发送一个验证请求,并在验证通过之前阻挡/推迟其他消息的发送。这个验证请求的设计如下:

@Serializable
@SerialName("AuthRequest")
data class AuthRequest(
    val applicationName: String,
    val peerInfo: PeerInfo
) : RequestMessagePayload()

这里有两个字段:应用名称和节点信息。应用名称保证验证时使用统一框架的不同应用程序不会意外的认证。例如我用这个框架做了个MC的聊天系统,别人搞了个类似BT下载的东西,都用了我这个i2p-p2p-chat库,他们各自也都有自己的扩展消息,为了避免意外连接两个不同系统的节点,应用名称不同则直接认定验证失败,并断开连接。节点信息则为了适应不同应用程序,设计成了一个可签名的Map:

@Serializable
data class PeerInfo(
    /**
     * Put want ever you want here.
     * It will be encoded as bencode.
     * */
    @Serializable(with = MapAsBencodeSerializer::class)
    val info: Map<String, String>,
    /**
     * The signature of the bencode info.
     * */
    val signatureBase64: String,
    /**
     * The public key of that signature
     * */
    val publicKeyBase64: String
)

这里的info便是Map本身,它会被编码成bencode格式,用于传输和签名。下面的签名则是保证info字段没有被篡改,最后附上用于签名的公钥,以便接收者确认身份。但是这里有一个问题:如果别人收到了这个认证请求,保存下载,再转发给别人,宣称他就是你,这怎么办?为了解决这个问题,我们可以在info中加一个字段peer.dest,用来存储请求者的源地址。与TCP连接不同,I2P的连接地址是用户侧的私钥决定的,而私钥不应该被共享。只要别人没有这个私钥,就不可能伪造你的源地址。私钥是随机生成的,但是发生碰撞的几率非常小(你想想看,整个加密货币的安全性就是建立在随机生成的私钥上的,如果你随便一生成就撞到了别人钱包的私钥,那怎么了得)。通过检查这个字段我们便可以知道对方是不是info中声称的请求者。此外考虑到Mojang签发的证书本身有效期较短,因此这里对签名就不再额外设置有效期限制了。如果你的私钥意外泄漏,别人冒充了你,那你应该及时联系你的好友,告诉他们发生了什么,并告诉他们警惕冒充者。

对应地,验证请求伴有两个响应:

@Serializable
@SerialName("AuthAcceptedReply")
data class AuthAcceptedReply(
    @Serializable(with = UUIDAsStringSerializer::class)
    override val replyTo: UUID
) : ReplyMessagePayload()

@Serializable
@SerialName("AuthenticationFailedError")
data class AuthenticationFailedError(
    val cause: String,
    @Serializable(with = UUIDAsStringSerializer::class)
    override val errorAt: UUID
) : ErrorMessagePayload() {
    override val msg: String = "Failed to authenticate your credential"
}

很明显,一种是明确告诉对方验证成功,这样对方就可以开始向你发送其他请求了;一种是告诉对方验证失败,并告诉对方因何失败(cause字段)。取决于具体实现,你可以给对方一次机会,也可以直接断开连接。在这里,针对验证失败的会话,我选择直接断开连接,以避免DDOS攻击。

目前还没有针对验证进行计时,因此一种潜在的攻击方法就是建立连接后不发出任何消息,通过建立足够多的连接,从而耗尽用户的资源(一条连接就需要一条线程接收入站消息)。最根本的解决方法就是在建立连接后开始计时,但这种攻击也可以通过使用纤程进行缓解(根据官方的例子,10万个纤程可以轻松地被处理,而10万个纤程很可能耗尽JVM的内存)。

节点交换

验证通过后我们便可以相信这条连接就是来自它所声称的用户。但问题是,我们怎么知道别人的地址并发起连接?最早我想使用Tracker。所谓Tracker,是BT下载技术中的一个角色,它负责追踪和记录不同种子的客户端情况。正在下载的节点可以向Tracker宣告自己的存在,而新来的节点也可以向Tracker询问现有的节点信息,从而与他们建立连接。但是吧,I2P上的Tracker我怎么也调不通,于是我就放弃了。在经过一番研究后,我发现BT下载中还有一种叫做PEX的技术,它是对BT协议的扩展,全称叫Peer Exchange,允许节点之间直接交换他们已知的节点信息,从而绕过Tracker。换言之,实现一种连接一个节点便能得知其他节点的效果。在这里,我们加入一个Minecraft服务器之后,只要想办法连上一个节点,就可以通过这个节点得知他所连接的其他节点,最终我们可以连上这个服务器内的所有玩家。

当然,这只是一种理想情况。现实中的情况可能更加复杂,例如玩家之前形成了相互隔离的网,一部分人互相连通,而另一部分人互相连通,但这两拨人互不知道彼此的存在。

此外,在初期面临一个bootstrap的问题,即大家都不知道彼此的存在,可能会形成好多相互隔绝的、零散的网。这个需要应用程序自己想办法将他们联通起来,例如提供用户之间相互连接的指令。在后面MC的部分,我们提供了用户复制自己地址,并连接他人地址的功能。

节点交换的请求如下:

@Serializable
@SerialName("PEXRequest")
data class PEXRequest(
    val peerInfos: List<PeerInfo>
) : RequestMessagePayload()

内容很简单,直接把已知的PeerInfo转发出去就好了,这个PeerInfo的签名可以保证它没有被人篡改,又由于这个里面包含了节点的地址(节点的源地址也是对方提供服务的地址),我们可以验证后直接发起连接。

当然了,为了减少请求次数,我们可以在回复里放上我们所知的节点信息:

@Serializable
@SerialName("PEXReply")
data class PEXReply(
    @Serializable(with = UUIDAsStringSerializer::class)
    override val replyTo: UUID,
    val peerInfos: List<PeerInfo>
) : ReplyMessagePayload()

这样,完成PEX动作之后,请求者和接收者都成功交换了节点数据。

但在这一版的实现中,大家都只对发起请求计时了,而没有理会接收回复。因此经常会出现节点A发起了PEX,然后没一会儿节点B又向节点A发起PEX,尽管A刚刚才和B交换完。

发送文字消息

其实严格来说这个请求并不应当包含在框架内,因为框架才不理会你要发什么。但是考虑到文字消息好像还是挺常用,因此也就集成在框架里面了。定义如下:

@Serializable
@SerialName("TextMessageRequest")
data class TextMessageRequest(
    val scope: String,
    val content: String
) : RequestMessagePayload()

这个消息包含两个字段,其中scope指的是发送消息的范围,在MC中,我们有公共聊天和私聊,scope用于鉴别这两种聊天类型。然后就是content,它包含了聊天内容本身。回复如下:

@Serializable
@SerialName("NoContentReply")
data class NoContentReply(
    @Serializable(with = UUIDAsStringSerializer::class)
    override val replyTo: UUID
) : ReplyMessagePayload()

这个回复在语义上更像是HTTP 204,也就是你的请求处理成功了,但我没有什么要回复给你的数据。在其他单向动作中也可以使用这个回复。

主动断开连接

由于各种情况,我们需要主动断开连接。例如玩家退出了当前服务器,那我们就应该断开所有节点。如果能够在玩家关闭游戏之前主动告诉其他节点,那么相比让其他节点等待,然后发现连接中断,不如尽早告诉他们,这样对方可以提前关闭资源,回收内存。

@Serializable
@SerialName("ByeRequest")
data class ByeRequest(
    val reason: String = ""
) : RequestMessagePayload()

这个请求比较特殊,它不具备任何回复。一般来说一个请求必须跟着一个响应,要么成功要么失败。但是这个动作导致连接被双方关闭(发送方在发送完成后立即关闭连接,接收方收到后也立即关闭连接),无法继续通讯,因此这是一个有去无回的动作。

消息发送格式

以上是消息的内容,但是我们怎么编码、发送呢?最简单的方法还是一行一行发文本。虽然使用二进制编码比较高效,例如Protocol Buffer,但是它也是最难用的,你需要提前给每一种消息设计格式。而Json,相比之下则要灵活很多。通过配置序列化器(Serializer),可以将json输出为一行,这样我们读取的时候就可以直接readLine,而不必考虑消息长度不同。

在消息的序列化和反序列化上,我选择使用kotlinx-serialization,它与Gson、Jackson等库不一样,它的所有代码都是编译时期处理的,因此不需要反射这些乱七八糟的东西。用起来有一些限制,但我们也能获得相应的好处(例如摆脱了烦人的反射,获得了更好的性能等)。对于原生类型,Kotlinx提供了对应的serializer,但是从上面的代码中可以看到,诸如UUID这些类型需要我们手动处理一下:

object UUIDAsStringSerializer : KSerializer<UUID> {
    override val descriptor = PrimitiveSerialDescriptor("UUID", PrimitiveKind.STRING)

    override fun deserialize(decoder: Decoder): UUID =
        UUID.fromString(decoder.decodeString())

    override fun serialize(encoder: Encoder, value: UUID) =
        encoder.encodeString(value.toString())
}

object MapAsBencodeSerializer : KSerializer<Map<String, String>> {
    override val descriptor = PrimitiveSerialDescriptor("Map<String, *>", PrimitiveKind.STRING)

    override fun deserialize(decoder: Decoder): Map<String, String> =
        Bencode().decode(decoder.decodeString().encodeToByteArray(), Type.DICTIONARY)
            .mapValues {
                if (it.value !is String) {
                    throw IllegalArgumentException("Expecting String as value, but got ${it.value::class.jvmName}")
                }
                it.value as String
            }

    override fun serialize(encoder: Encoder, value: Map<String, String>) =
        encoder.encodeString(Bencode().encode(value).decodeToString())
}

这里我们将UUID和字符串类型的Map都编码成字符串。UUID本身就可以用字符串表达和解析,问题不大。Map的话可以使用bencode编码,这个编码是源自于BT协议的,非常好用。

而对于反序列化时的多态,即P2PMessage的Payload字段是一个接口,而我们有许多已知和未知的实现,怎么在编译期保证这些子类都能被正确的序列化、反序列化呢?

fun getSerializersModule(
    customMessagePayload: PolymorphicModuleBuilder<MessagePayload>.() -> Unit = {},
    customSerializersModule: SerializersModuleBuilder.() -> Unit = {},
): SerializersModule = SerializersModule {
    polymorphic(MessagePayload::class) {
        subclass(AuthRequest::class)
        subclass(TextMessageRequest::class)
        subclass(PEXRequest::class)
        subclass(ByeRequest::class)
        subclass(NoContentReply::class)
        subclass(AuthAcceptedReply::class)
        subclass(PEXReply::class)
        subclass(InvalidIncomingDataError::class)
        subclass(AuthenticationFailedError::class)
        subclass(UnauthorizedError::class)
        subclass(InvalidScopeError::class)
        subclass(UnsupportedMessageError::class)
        customMessagePayload()
    }
    customSerializersModule()
}

我们可以在构造Json实例的时候使用自定义的SerializersModule,在其中注册我们的多态。这里默认注册了上述消息,如果用户拓展了消息,可以通过lambda的方式一并注册进来。使用的时候如下所示:

val json = Json {
            serializersModule = getSerializersModule(
                customMessagePayload = {
                    // register here
                    subclass(SomeTypeOfRequest::class)
                }
            )
        }

这时构造的json就可以解析我们自定义的SomeTypeOfRequest请求了。

总结

本篇介绍了这个系统中最基础的消息设计与大体交互流程。下一篇中将介绍如何利用I2P实现节点。

-全文完-


知识共享许可协议
【代码札记】从零开始的点对点聊天系统 P1天空 Blond 采用 知识共享 署名 - 非商业性使用 - 相同方式共享 4.0 国际 许可协议进行许可。
本许可协议授权之外的使用权限可以从 https://www.skyblond.info/about.html 处获得。

Archives QR Code
QR Code for this page
Tipping QR Code