MENU

【代码札记】基于 Merkle DAG 的文件存储服务 P3 概念验证

September 30, 2021 • 瞎折腾

上文中讨论了关于本系列的数据结构设计,本文将实现一个基于磁盘存储的原型用于概念验证。

结构设计

想起来前几日网易笔试问设计模式,我想来想去,设计模式我是一个也叫不上名字,但我又一想,设计模式一开始也不是凭空造出来的理论,还是先贤们在实践中不断尝试,发现了一些可以使用的最佳实践,总结而来的。所以后文中我也叫不出来哪些设计对应了哪些设计模式,有知道的读者可以在评论区说一下,我要是哪里设计的不好,也欢迎在评论其或者 GitHub 仓库提 Issue,本系列的所有代码将存放于此。

目前想的是将余下的元素分成三块:

  • MultihashProvider:提供统一的接口用于计算哈希并以 Multihash 格式返回
  • StorageClient:面向存储系统实现的通用存储接口,该接口只关心如何存储和读取 Protobuf 对象
  • StorageLayer:面向用户的存储接口,该接口响应用户请求,并使用 StorageClient 来存储或读取 Protobuf 对象以完成用户请求

MultihashProvider

由于 JVM 提供的哈希函数有限,如果将来要使用其他哈希函数的实现,不同的提供商给出的接口都各不相同,所以提出这个接口来保证无论使用何种哈希函数,都可以转换为 Multihash 格式并被程序使用:

  • package info.skyblond.ariteg.multihash
  • import io.ipfs.multihash.Multihash
  • import java.io.InputStream
  • interface MultihashProvider {
  • fun getType(): Multihash.Type
  • fun digest(byteArray: ByteArray): Multihash
  • fun digest(inputStream: InputStream, bufferSize: Int = 4096): Multihash
  • }

这个接口十分简单,一个 MultihashProvider 要告知外界自己是计算哪一种哈希的,这个我直接使用了 Multihash.Type。随后便是两种计算哈希的接口,一种是直接给一个字节数组进行计算,另一个是给一个输入流进行计算,默认的读缓冲区是 4KB,结果都以 Multihash 返回。为了方便使用,我还实现了两个具体的 Provider:MultihashJavaProviderMultihashBouncyCastleProvider。前者用于调用 JVM 提供的哈希函数,主要是面向 java.security.MessageDigest,后者面向 org.bouncycastle.crypto.Digest 的子类。两者的实现都很简单,同时为了避免多线程引发问题,每次计算哈希时使用的 digest 都是随用随生成,因此这两个类在实例化时不需要一个创建好的 Digest 对象,而是一个 () -> Digest 类型的生产者,计算哈希时首先调用这个生产者产生一个 Digest 对象,该对象可以复用,但需要由传入的生产者保证复用的线程安全,否则推荐每次创建一个新的。代码可以详见 GitHub。

目前默认提供 SHA3-512SHA3-256blake2b-512blake2b-256,本来想用 blake3,但遗憾的是 Multihash 对于 blake3 的支持还处于 draft 阶段,所以只好暂时使用 blake2b 了。

StorageClient

有了统一的哈希计算接口,我们就不需要关心我们的底层到底需要面向何种哈希计算而苦恼如何将其转换成 Multihash 形式了。但是我们确实需要苦恼一下底层到底要以何种形式存储 Protobuf 对象,一般来说不建议使用磁盘存储,因为 Protobuf 对象都比较小,而过多的零碎文件显然不适合传统的磁盘,SSD 勉强,更关键的是文件系统。反过来说,AWS S3 等云对象存储倒更适合存放大量的零碎文件。但文件太多并且访问频次比较低的时候,可能放到磁带或者其他冷存储介质中会获得更好的成本,但之后读取的话又要实现对文件进行一系列处理才能读取。这些面向存储介质的问题正式 StorageClient 接口要考虑的问题。

接口

这个接口的目的在于兼容所有可能的存储设备,并且尽可能少的使用特定设备才能提供的特性。但遗憾的是本人才疏学浅,见识也不多,所以很可能没法在设计时涵盖所有可能的存储介质,同时这个接口可能也会因为日后兼容其他介质而作出修改,因此以 GitHub 仓库的代码为准,目前的设计如下:

  • package info.skyblond.ariteg.storage.client
  • import com.google.protobuf.ByteString
  • import info.skyblond.ariteg.AritegLink
  • import info.skyblond.ariteg.AritegObject
  • import info.skyblond.ariteg.storage.ObjectNotFoundException
  • import info.skyblond.ariteg.storage.ObjectNotReadyException
  • import io.ipfs.multihash.Multihash
  • interface StorageClient : AutoCloseable {
  • fun parse(multihashString: String): AritegLink? {
  • val bytestring = ByteString.copyFrom(Multihash.fromBase58(multihashString).toBytes())
  • return AritegLink.newBuilder()
  • .setMultihash(bytestring)
  • .build()
  • }
  • fun storeProto(name: String, proto: AritegObject): AritegLink
  • fun storeProto(proto: AritegObject): AritegLink {
  • return storeProto("", proto)
  • }
  • fun deleteProto(proto: AritegObject): Boolean
  • fun deleteProto(link: AritegLink): Boolean
  • fun linkExists(link: AritegLink): Boolean
  • fun prepareLink(link: AritegLink, option: PrepareOption)
  • @Throws(ObjectNotFoundException::class, ObjectNotReadyException::class)
  • fun linkAvailable(link: AritegLink): Boolean
  • @Throws(ObjectNotFoundException::class, ObjectNotReadyException::class)
  • fun loadProto(link: AritegLink): AritegObject
  • }
  • interface PrepareOption

这个接口的设计相对有些复杂,首先是 parse,它将一个 Base58 编码的字符串转换成 AritegLink,默认实现就是简单的把字符串解析成 Multihash 然后构造一个 Link 对象,并没有考虑这个 base58 字符串使用的哈希函数是否与具体实现所使用的哈希函数相符,也没有考虑这个哈希对应的对象是否存在。总之,如果子类想要在解析字符串时做出保证,比如保证返回的 AritegLink 一定有效(形式正确),或者解析时该对象一定在存储系统上存在等事宜,可以覆盖这个方法,并加入额外的检查。一般情况下默认实现也不是不能用。

随后是两个存储对象的方法,接受一个 Protobuf 对象并返回一个指向该对象的 Link,Link 的 name 域可以为指定的字符串,默认为空。子类唯一需要保证的就是重复写同一个 proto 不应该有坏的副作用,即重复写同一个 proto 可以浪费时间,因为同一个文件被写了多次,但并不应该因此而抛出异常,或导致数据损毁等坏的影响。这个接口也推荐作为将 proto 转换为 Link 使用。

随后是删除对象。实际上在设计时并没有考虑删除对象,因为 Merkle DAG 就决定了删除对象是没有意义的,IPFS 的设计中也没有删除对象。但是我们这个是一个中心化的存储系统,假如文件只多不减,大概存储成本会越来越高,所以设计了一个删除对象的方法,因为考虑到子类可能会根据对象的类型不同而分别存储,或者有索引优化什么的,这个方法只保证如果返回成功,那么该对象一定被妥当的删除了:即物理上的文件被删除了,同时子类中记录的元数据也被妥当删除,不会出现数据不一致的问题。但是这个接口不保证多线程环境下子类会如何反应 —— 一个客户端在读 / 写,而另一个客户端请求删除,这个行为的结果是未定义的。因此不推荐在使用时删除,最理想的情况是停止其他客户端的访问,只允许执行删除操作的客户端,同时尽可能保证单线程操作。相当于停机维护的那种,目前对这个接口还没有实际使用,之后可能会根据元数据中记录的文件找出所有被使用的 proto 对象,然后删掉没有被使用的 proto 对象。不过比起删除对象,我更倾向于按文件将所有系统中已知的文件读出来,写入到新的存储系统中,这样旧系统中的无用文件将不会被拷贝到新系统中,同时这个操作还可以并发操作(因为是只读)。

随后是四个面向 Link 的操作。分别是:检查 Link 是否存在,准备 Link,检查 Link 是否可读,以及将 Link 读成 proto 对象。检查 Link 是否存在不必多说。准备 Link 在本文中不会被使用到,这个接口是考虑到其他存储系统,比如 AWS S3 提供归档存储,好处是节省成本,坏处时读取之前必须申请恢复文件,恢复所需的时间从几分钟倒数十个小时不等。如果有高端人士使用磁带库,同样也需要先索引磁带,读到缓存中将数据准备好,然后再度(当然也可以读的时候再捯磁带,但是这样效率会很低)。检查 Link 是否可读则是检测上述操作是否完成,一旦 Link 可读,那么应该尽快读取。诸如 AWS S3 等存储系统对恢复的文件有生命周期设置,一旦逾期,恢复出来的文件将被删除。这些接口并不保证当已知可读时文件将在多长时间内保持可读,所以应当尽快读取。

最后还有一个用于定义类型的接口 PrepareOption,这个是配合准备 Link 而设计的。不同的存储系统可能需要不同的参数,因此子类可以实现这个类型,将自己需要的参数放进这个类型里面。

磁盘存储实现

基于磁盘存储是出于概念验证的目的而实现的,但再实现的过程中不断修改,意图在日后实现基于 AWS 的存储时能够有所帮助。

存储设计如下:

  • baseDir
  • │ client.db
  • ├─blob
  • │ 8tU4a5988LBnJG5mUPBGv4rAjTjcm3HRwzeCotv...
  • │ 8tU4ajGuHEdLo9yZG7tmJiFCabzDGxE1my5omMx7yDK...
  • │ 8tU4apuSwmZuz541DkadBvCTwb8UG1DonMMedJ5JR8...
  • ├─list
  • │ 8tU4iRLfDDLJwjUvJdeeiJZgKmcuwvUdycz5cfimjj5Rko9q...
  • │ 8tU5DXRZZPocLaU2XBDJC68QPPdVFrqCxhrXBRSciJV...
  • │ tU5QHkFuddDbexRSMZNtE8fqCGrg1i1so6UHou3Cceft2C...
  • ├─tree
  • │ 8tU4a5988LBnJG5mUPBGv4rAjTjcm3HRwzeCotvUBzT8PF...
  • │ 8tZBJFq3APeUTvoKYNrytPtFtPBgrhQknGZGiQuhZ4pHQktjK...
  • │ 8tZBJGT4arJzn2Xn5aUTzwepuUibJzemitoVNDNe2527u5eTs...
  • └─commit
  • 8tU4iRLfDDLJwjUvJdeeiJZgKmcuwvUdycz5cfimjj5Rko9q123...
  • 8tZ25FfFuusRVZyQsos8WatprQocU35J2QgVY9dxTg2Er8bHa...
  • 8tZ2TN86GEpeRCmpDCbBhVgxMMS8zQWzfceV2S52XBcN26...

根目录为 baseDir,其下有四个文件夹,分别用于存储四种类型的 proto 对象。其中的 client.db 是一个简单的基于文件的键值对数据库,用于存储每一个 proto 对象的元数据。采用类似的结构,可以在 AWS S3 上使用储存桶存放数据,使用 DynamoDB 存储对象的元数据。对象分结构设计的目的在于 AWS S3 上的归档存储。其中 blob 对象可以被归档存储,因为其存储的数据最多。而其他数据则最好不使用归档存储,例如当我想要遍历一个文件夹下的内容,而不读取其中数据时,不归档的话可以直接拿到 tree 对象,但如果归档了,我还要申请恢复 tree 对象所在的文件,假如其中还有其他涉及到的对象,由于我在恢复归档前无法读取内容,因此也就无法一次性确认我到底需要恢复哪些文件。通过采用文件夹的形式(对应 S3 的 key 前缀),可以方便的决定哪些对象可以归档,哪些对象不必归档。需要读取数据时,可以一次性找出所有需要恢复的 blob,批量提交,这样既能享受到归档存储低廉的价格,还能较为方便的管理数据。

数据库中存储的元数据主要有三个:

  • 每一个已提交的 Multihash 对应的 proto 类型,用于按类别找到文件夹中的数据
  • 每一个已提交的 Multihash 对应的次要 Multihash,用于检测哈希碰撞
  • 每一个待写入的 Multihash 对应的次要 Multihash,用于保存写入队列

这在 AWS DynamoDB 中可以实现为一张表,主键为主要 Multihash,随后是对应的类型、次要 Multihash 和提交状态(null 表示已写入,时间戳表示待写入及入队时间)。

关于哈希碰撞检测,在上文中提到过哈希碰撞的几率微乎其微,但是还是没办法保证理论上完全没有碰撞。IPFS 的做法是发现碰撞之后升级到新的哈希函数,但对于这个系统来说,应当能够保证写入的数据一定是完好的,而不会因为哈希碰撞而导致某个重要的文件丢失。因此在写入新文件时会同时计算主要哈希和次要哈希,这两个哈希由不同的方法计算而来,因此同时碰撞的概率微乎其微(可能是实践意义上的不可能事件)。如果主哈希已经存在,则通过数据库检查次要哈希,如果二者一致则认为写入的是相同的文件,如果不一致则说明主哈希发生了碰撞,这个时候就抛出异常,阻止新文件写入。

对于写入方式,可选有同步和异步两种方式,因此除写入细节部分,设计如下抽象类来封装公用的代码:

  • package info.skyblond.ariteg.storage.client.disk
  • import com.google.protobuf.ByteString
  • import info.skyblond.ariteg.AritegLink
  • import info.skyblond.ariteg.AritegObject
  • import info.skyblond.ariteg.multihash.MultihashProvider
  • import info.skyblond.ariteg.multihash.MultihashProviders
  • import info.skyblond.ariteg.storage.HashNotMatchException
  • import info.skyblond.ariteg.storage.ObjectNotFoundException
  • import info.skyblond.ariteg.storage.ObjectNotReadyException
  • import info.skyblond.ariteg.storage.client.PrepareOption
  • import info.skyblond.ariteg.storage.client.StorageClient
  • import info.skyblond.ariteg.storage.toMultihash
  • import io.ipfs.multihash.Multihash
  • import org.mapdb.DBMaker
  • import org.mapdb.Serializer
  • import java.io.File
  • import java.util.*
  • import java.util.concurrent.ConcurrentMap
  • abstract class AbstractNativeStorageClient(
  • private val baseDir: File,
  • private val primaryMultihashProvider: MultihashProvider = MultihashProviders.sha3Provider512(),
  • private val secondaryMultihashProvider: MultihashProvider = MultihashProviders.blake2b512Provider()
  • ) : StorageClient {
  • /**
  • * Simple KV database stored in file.
  • * */
  • private val dbFile = File(baseDir, "client.db")
  • // The transaction used in mapdb is not ACID transactions.
  • // It just flushes data into disk so the data won't get corrupted.
  • private val db = DBMaker.fileDB(dbFile)
  • .fileMmapEnableIfSupported()
  • .transactionEnable()
  • .make()
  • private val objectTypeMap: ConcurrentMap<ByteString, String> = db
  • .hashMap("object_type_map", SerializerByteString(), Serializer.STRING)
  • .createOrOpen()
  • private val objectMultihashMap: ConcurrentMap<ByteString, ByteString> = db
  • .hashMap("object_multihash_map", SerializerByteString(), SerializerByteString())
  • .createOrOpen()
  • private val objectWritingMap: ConcurrentMap<ByteString, ByteString> = db
  • .hashMap("object_status_map", SerializerByteString(), SerializerByteString())
  • .createOrOpen()
  • /**
  • * Get the file object based on ObjectType.
  • *
  • * Subclasses can have their own storage layout, like hash trie-ish folders.
  • * */
  • @Suppress("MemberVisibilityCanBePrivate")
  • protected fun getObjectFile(type: String, multihashBase58: String): File {
  • val parentDir = File(baseDir, type).also { it.mkdirs() }
  • return File(parentDir, multihashBase58)
  • }
  • /**
  • * Subclass need to implement this to actually write [rawBytes] into [file].
  • * Before writing the data, run [preWrite], then write, then run [postWrite].
  • *
  • * Subclass can write data in the call, or put it into a queue and do it later.
  • * */
  • protected abstract fun handleWrite(
  • file: File, rawBytes: ByteArray,
  • preWrite: () -> Unit, postWrite: () -> Unit
  • )
  • override fun storeProto(name: String, proto: AritegObject): AritegLink {
  • val rawBytes = proto.toByteArray()
  • val primaryMultihash = primaryMultihashProvider.digest(rawBytes)
  • val primaryMultihashByteString = ByteString.copyFrom(primaryMultihash.toBytes())
  • val secondaryMultihash = secondaryMultihashProvider.digest(rawBytes)
  • val secondaryMultihashByteString = ByteString.copyFrom(secondaryMultihash.toBytes())
  • // not presenting in type db
  • if (!objectTypeMap.containsKey(primaryMultihashByteString)) {
  • // set version if no one is writing, get old version
  • val oldSecondaryMultihash = objectWritingMap.putIfAbsent(
  • primaryMultihashByteString, secondaryMultihashByteString
  • )?.toMultihash()
  • if (oldSecondaryMultihash != null) {
  • // someone is writing, check the secondary hash
  • require(oldSecondaryMultihash == secondaryMultihash) {
  • val encodedData = Base64.getEncoder().encode(rawBytes)
  • "Hash check failed! Data '${encodedData}' has the same primary hash with ${primaryMultihash.toBase58()}, " +
  • "but secondary hash is ${secondaryMultihash.toBase58()}, " +
  • "expected ${oldSecondaryMultihash.toBase58()}"
  • }
  • // The check will failed the process if secondary hash not match
  • } else {
  • // no one is writing, this client can write
  • val type = proto.type.name.lowercase()
  • val file = getObjectFile(type, primaryMultihash.toBase58())
  • handleWrite(file, rawBytes, {}, {
  • synchronized(db) {
  • // add to type db
  • objectTypeMap[primaryMultihashByteString] = type
  • // add to secondary hash map
  • objectMultihashMap[primaryMultihashByteString] = secondaryMultihashByteString
  • // remove from writing queue
  • check(
  • objectWritingMap.remove(
  • primaryMultihashByteString,
  • secondaryMultihashByteString
  • )
  • ) { "Cannot remove entry from writing map. Data corrupt!" }
  • // flush changes to file
  • db.commit()
  • }
  • })
  • }
  • }
  • return AritegLink.newBuilder()
  • .setName(name)
  • .setMultihash(primaryMultihashByteString)
  • .build()
  • }
  • override fun deleteProto(proto: AritegObject): Boolean {
  • val rawBytes = proto.toByteArray()
  • val multihash = primaryMultihashProvider.digest(rawBytes)
  • return deleteProto(multihash)
  • }
  • override fun deleteProto(link: AritegLink): Boolean {
  • return deleteProto(link.multihash.toMultihash())
  • }
  • private fun deleteProto(multihash: Multihash): Boolean {
  • val multihashByteString = ByteString.copyFrom(multihash.toBytes())
  • val type = synchronized(db) {
  • // try to remove from type map
  • val type = objectTypeMap.remove(multihashByteString)
  • if (type != null) {
  • // object exists, delete secondary hash and file
  • objectMultihashMap.remove(multihashByteString)
  • db.commit()
  • }
  • type
  • }
  • if (type != null) {
  • // try to delete the file, ok to failed, it will be overwritten next time
  • getObjectFile(type, multihash.toBase58()).delete()
  • }
  • // Not found, or in writing queue
  • return false
  • }
  • override fun linkExists(link: AritegLink): Boolean {
  • // object exists on disk
  • if (objectTypeMap.contains(link.multihash))
  • return true
  • // object exists in memory
  • if (objectWritingMap.contains(link.multihash))
  • return true
  • // not found
  • return false
  • }
  • override fun prepareLink(link: AritegLink, option: PrepareOption) {
  • if (!linkExists(link))
  • throw ObjectNotFoundException(link)
  • }
  • override fun linkAvailable(link: AritegLink): Boolean {
  • // read objects on disk
  • if (objectTypeMap.contains(link.multihash))
  • return true
  • // read objects in writing queue
  • if (objectWritingMap.contains(link.multihash))
  • return false
  • // not found
  • throw ObjectNotFoundException(link)
  • }
  • @Throws
  • override fun loadProto(link: AritegLink): AritegObject {
  • val multihash = link.multihash.toMultihash()
  • val type = objectTypeMap[link.multihash]
  • if (type != null) {
  • val file = getObjectFile(type, link.multihash.toMultihash().toBase58())
  • val rawBytes = file.readBytes()
  • // check loaded hash
  • val loadedHash = when (multihash.type) {
  • primaryMultihashProvider.getType() -> primaryMultihashProvider.digest(rawBytes)
  • else -> throw UnsupportedOperationException(
  • "Unsupported multihash type: ${multihash.type}. " +
  • "Only ${primaryMultihashProvider.getType()} is supported"
  • )
  • }
  • if (multihash != loadedHash) throw HashNotMatchException(multihash, loadedHash)
  • return AritegObject.parseFrom(rawBytes)
  • }
  • if (objectWritingMap.contains(link.multihash))
  • throw ObjectNotReadyException(multihash)
  • throw ObjectNotFoundException(multihash)
  • }
  • override fun close() {
  • db.close()
  • }
  • }

这里实现了所有来自接口的方法,并提供了新的抽象方法 handleWrite 供子类实现写入的细节。通过注释可以看出,该方法传入两个函数,一个 File 和一个字节数组。两个函数分别在写入前和写入后执行,目前写入前为空,纯粹是为了可拓展性保留的。写入后要执行的代码用于提交数据库,将写入的数据提交到数据库中,数据库的使用由同步锁保护。此外还有一个可以由子类覆盖的方法是 getObjectFile。默认实现如同上文介绍的文件夹结构。如果未来需要大规模在磁盘上使用,一个目录下过多零碎文件会给文件系统带来压力(尤其是智障 Windows),所以可以进一步采取诸如前缀树等结构优化,子类通过覆盖这个函数可以改变类型和 Multihash 到磁盘文件的映射方式。

这里使用的数据库由 MapDB 提供,它实现了原生的并发 HashMap,因此可以使用许多原子操作而无需加锁。例如在判单待写入 proto 是否正在被其他人写入时,通过 objectWritingMap.putIfAbsent 这一原子操作即可同时实现两件事:获取原来的值,如果原来的值不存在,那么就把我的值放进去。有原子性保证每一个 Key 最多只有一个人可以写入这个值,当其他人再执行相同的操作时,将会返回我写入的值。这样也就避免了重复写的问题。

至于其他实现,应该没有什么过多需要讲解的地方,我在关键的逻辑处都写有注释,这里就不多赘述了。

StorageLayer

解决了存储系统带来的问题,下面就该说上层建筑了。常见的数据组织结构是文件系统,一般就是指采用文件和文件夹的组织形式,这是我们最熟悉的方式。除此之外还有对象存储,这时候就不再区分文件夹了,所有的文件都是平起平坐的(但是通过 \ 分割的方法,我们还是可以让它和文件夹划等号)。此外还有数据库,改为表的形式。StorageLayer 接口要解决的问题就是根据不同数据组织形式的特点,给出将特定操作转换为对 StorageClient 的调用。

目前这个接口具体应该有什么功能我还完全没有想好,因此整体设计都偏向于文件系统。

  • package info.skyblond.ariteg.storage.layer
  • import info.skyblond.ariteg.AritegObject
  • import info.skyblond.ariteg.storage.client.StorageClient
  • import java.io.File
  • import java.io.InputStream
  • interface StorageLayer {
  • companion object {
  • const val DEFAULT_BLOB_SIZE = 256 * 1024
  • const val DEFAULT_LIST_LENGTH = 176
  • }
  • fun writeInputStreamToProto(
  • inputStream: InputStream,
  • blobSize: Int = DEFAULT_BLOB_SIZE,
  • listLength: Int = DEFAULT_LIST_LENGTH
  • ): AritegObject
  • fun readInputStreamFromProto(
  • proto: AritegObject
  • ): InputStream
  • fun storeDir(dir: File): AritegObject
  • fun walkTree(treeProto: AritegObject, foo: (StorageClient, StorageLayer, String, AritegObject) -> Unit)
  • }

这里只定义了 4 个基本操作。前两个是面向整块数据的,可以将字节流写入到系统中,返回 list 或者 blob,也可以将 list 或 blob 的内容作为输入流返回,供其他程序使用。随后两个是面向文件夹的操作,可以将文件夹作为一个 tree 写入系统,也可以根据一个 tree 来遍历其中的内容。遍历需要要给 tree 和一个函数,这个函数将在每一个元素上调用,调用时传入当前的 StorageClient 和 StorageLayer,同时还有对象的路径,例如 /some_folder/some_file,以及对象本身。这里遍历的 API 可以采用 Visitor 模式进行设计,但是比较复杂,就采用了这样一种简陋的方法。

关于 commit 类型的应用,主要还是版本控制,对于概念验证来说用不到,目前我也没有想好它的应用,以及调用方式。所以姑且就算是未实现的 Feature 好了。

写完这段文字之后反思,也许每一种数据的组织方式都各不相同。文件可以按照上面说的那样实现。数据库可以将每一个记录作为一个 tree 实现,但和文件系统完全不一样。也许我应该考虑删掉 StorageLayer 这个接口,或者只让它成为一个类型接口(即没有成员方法的接口),然后放开限制让子类面向自己专有的组织形式去实现,这样得到的类或许会更好用一些。

效果测试

手头正好有一个 16GB 的 ISO 文件,于是拿来测试性能。测试程序如下:

  • package info.skyblond.ariteg
  • import info.skyblond.ariteg.storage.client.disk.AsyncNativeStorageClient
  • import info.skyblond.ariteg.storage.layer.FileNativeStorageLayer
  • import org.slf4j.LoggerFactory
  • import java.io.File
  • import java.security.MessageDigest
  • object Main {
  • private val logger = LoggerFactory.getLogger("Application")
  • @JvmStatic
  • fun main(args: Array<String>) {
  • AsyncNativeStorageClient(File("./data").also { it.mkdirs() }).use { storageClient ->
  • val storageLayer = FileNativeStorageLayer(storageClient)
  • logger.info("Storage file")
  • val obj = timing {
  • File("C:\\Adobe_2021_MasterCol_win_v11.9#1_20210901.iso").inputStream().use {
  • storageLayer.writeInputStreamToProto(it)
  • }
  • }
  • logger.info("Wait all writing finished")
  • timing {
  • val link = storageClient.storeProto(obj)
  • while (!storageClient.linkAvailable(link))
  • Thread.yield()
  • }
  • logger.info("Read out, calculate sha256")
  • val digest = MessageDigest.getInstance("SHA-256")
  • val buffer = ByteArray(4096 * 1024) // 4MB
  • val sha256 = timing {
  • storageLayer.readInputStreamFromProto(obj).use { inputStream ->
  • var counter: Int
  • while (inputStream.read(buffer, 0, buffer.size)
  • .also { counter = it } != -1
  • ) {
  • digest.update(buffer, 0, counter)
  • }
  • digest.digest()
  • }
  • }
  • logger.info(sha256.toHex())
  • }
  • }
  • private fun ByteArray.toHex(): String = joinToString(separator = "") { eachByte -> "%02x".format(eachByte) }
  • private fun <T> timing(foo: () -> T): T {
  • val start = System.currentTimeMillis()
  • val result = foo()
  • val end = System.currentTimeMillis()
  • logger.info("Time used: ${end - start} ms")
  • return result
  • }
  • }

这个测试程序把文件写入系统,统计写入的时间,由于使用的是异步的客户端,所以有可能还有未写入的数据,等待所有数据全部写入。随后读出数据计算 SHA256,与其他工具计算的结果进行对比。

首次写入:

  • Storage file
  • Time used: 463515 ms
  • Wait all writing finished
  • Time used: 861 ms
  • Read out, calculate sha256
  • Time used: 322484 ms
  • f84eef7eb41b96598a42d0fa157487db7f2f493157e2b592ff07f3431bb6b676

再次运行:

  • Storage file
  • Time used: 242401 ms
  • Wait all writing finished
  • Time used: 0 ms
  • Read out, calculate sha256
  • Time used: 339523 ms
  • f84eef7eb41b96598a42d0fa157487db7f2f493157e2b592ff07f3431bb6b676

看起来文件系统的性能确实不太行,但好在数据写入后再读出来没有问题。


知识共享许可协议
【代码札记】基于 Merkle DAG 的文件存储服务 P3 概念验证天空 Blond 采用 知识共享 署名 - 非商业性使用 - 相同方式共享 4.0 国际 许可协议进行许可。
本许可协议授权之外的使用权限可以从 https://skyblond.info/about.html 处获得。

Archives QR Code
QR Code for this page
Tipping QR Code