MENU

【代码札记】基于 Merkle DAG 的文件存储服务 P3 概念验证

September 30, 2021 • 瞎折腾

上文中讨论了关于本系列的数据结构设计,本文将实现一个基于磁盘存储的原型用于概念验证。

结构设计

想起来前几日网易笔试问设计模式,我想来想去,设计模式我是一个也叫不上名字,但我又一想,设计模式一开始也不是凭空造出来的理论,还是先贤们在实践中不断尝试,发现了一些可以使用的最佳实践,总结而来的。所以后文中我也叫不出来哪些设计对应了哪些设计模式,有知道的读者可以在评论区说一下,我要是哪里设计的不好,也欢迎在评论其或者GitHub仓库提Issue,本系列的所有代码将存放于此。

目前想的是将余下的元素分成三块:

  • MultihashProvider:提供统一的接口用于计算哈希并以Multihash格式返回
  • StorageClient:面向存储系统实现的通用存储接口,该接口只关心如何存储和读取Protobuf对象
  • StorageLayer:面向用户的存储接口,该接口响应用户请求,并使用StorageClient来存储或读取Protobuf对象以完成用户请求

MultihashProvider

由于JVM提供的哈希函数有限,如果将来要使用其他哈希函数的实现,不同的提供商给出的接口都各不相同,所以提出这个接口来保证无论使用何种哈希函数,都可以转换为Multihash格式并被程序使用:

package info.skyblond.ariteg.multihash

import io.ipfs.multihash.Multihash
import java.io.InputStream

interface MultihashProvider {
    fun getType(): Multihash.Type

    fun digest(byteArray: ByteArray): Multihash

    fun digest(inputStream: InputStream, bufferSize: Int = 4096): Multihash
}

这个接口十分简单,一个MultihashProvider要告知外界自己是计算哪一种哈希的,这个我直接使用了Multihash.Type。随后便是两种计算哈希的接口,一种是直接给一个字节数组进行计算,另一个是给一个输入流进行计算,默认的读缓冲区是4KB,结果都以Multihash返回。为了方便使用,我还实现了两个具体的Provider:MultihashJavaProviderMultihashBouncyCastleProvider。前者用于调用JVM提供的哈希函数,主要是面向java.security.MessageDigest,后者面向org.bouncycastle.crypto.Digest的子类。两者的实现都很简单,同时为了避免多线程引发问题,每次计算哈希时使用的digest都是随用随生成,因此这两个类在实例化时不需要一个创建好的Digest对象,而是一个() -> Digest类型的生产者,计算哈希时首先调用这个生产者产生一个Digest对象,该对象可以复用,但需要由传入的生产者保证复用的线程安全,否则推荐每次创建一个新的。代码可以详见GitHub。

目前默认提供SHA3-512SHA3-256blake2b-512blake2b-256,本来想用blake3,但遗憾的是Multihash对于blake3的支持还处于draft阶段,所以只好暂时使用blake2b了。

StorageClient

有了统一的哈希计算接口,我们就不需要关心我们的底层到底需要面向何种哈希计算而苦恼如何将其转换成Multihash形式了。但是我们确实需要苦恼一下底层到底要以何种形式存储Protobuf对象,一般来说不建议使用磁盘存储,因为Protobuf对象都比较小,而过多的零碎文件显然不适合传统的磁盘,SSD勉强,更关键的是文件系统。反过来说,AWS S3等云对象存储倒更适合存放大量的零碎文件。但文件太多并且访问频次比较低的时候,可能放到磁带或者其他冷存储介质中会获得更好的成本,但之后读取的话又要实现对文件进行一系列处理才能读取。这些面向存储介质的问题正式StorageClient接口要考虑的问题。

接口

这个接口的目的在于兼容所有可能的存储设备,并且尽可能少的使用特定设备才能提供的特性。但遗憾的是本人才疏学浅,见识也不多,所以很可能没法在设计时涵盖所有可能的存储介质,同时这个接口可能也会因为日后兼容其他介质而作出修改,因此以GitHub仓库的代码为准,目前的设计如下:

package info.skyblond.ariteg.storage.client

import com.google.protobuf.ByteString
import info.skyblond.ariteg.AritegLink
import info.skyblond.ariteg.AritegObject
import info.skyblond.ariteg.storage.ObjectNotFoundException
import info.skyblond.ariteg.storage.ObjectNotReadyException
import io.ipfs.multihash.Multihash

interface StorageClient : AutoCloseable {
    fun parse(multihashString: String): AritegLink? {
        val bytestring = ByteString.copyFrom(Multihash.fromBase58(multihashString).toBytes())
        return AritegLink.newBuilder()
            .setMultihash(bytestring)
            .build()
    }

    fun storeProto(name: String, proto: AritegObject): AritegLink

    fun storeProto(proto: AritegObject): AritegLink {
        return storeProto("", proto)
    }

    fun deleteProto(proto: AritegObject): Boolean

    fun deleteProto(link: AritegLink): Boolean

    fun linkExists(link: AritegLink): Boolean

    fun prepareLink(link: AritegLink, option: PrepareOption)

    @Throws(ObjectNotFoundException::class, ObjectNotReadyException::class)
    fun linkAvailable(link: AritegLink): Boolean

    @Throws(ObjectNotFoundException::class, ObjectNotReadyException::class)
    fun loadProto(link: AritegLink): AritegObject
}

interface PrepareOption

这个接口的设计相对有些复杂,首先是parse,它将一个Base58编码的字符串转换成AritegLink,默认实现就是简单的把字符串解析成Multihash然后构造一个Link对象,并没有考虑这个base58字符串使用的哈希函数是否与具体实现所使用的哈希函数相符,也没有考虑这个哈希对应的对象是否存在。总之,如果子类想要在解析字符串时做出保证,比如保证返回的AritegLink一定有效(形式正确),或者解析时该对象一定在存储系统上存在等事宜,可以覆盖这个方法,并加入额外的检查。一般情况下默认实现也不是不能用。

随后是两个存储对象的方法,接受一个Protobuf对象并返回一个指向该对象的Link,Link的name域可以为指定的字符串,默认为空。子类唯一需要保证的就是重复写同一个proto不应该有坏的副作用,即重复写同一个proto可以浪费时间,因为同一个文件被写了多次,但并不应该因此而抛出异常,或导致数据损毁等坏的影响。这个接口也推荐作为将proto转换为Link使用。

随后是删除对象。实际上在设计时并没有考虑删除对象,因为Merkle DAG就决定了删除对象是没有意义的,IPFS的设计中也没有删除对象。但是我们这个是一个中心化的存储系统,假如文件只多不减,大概存储成本会越来越高,所以设计了一个删除对象的方法,因为考虑到子类可能会根据对象的类型不同而分别存储,或者有索引优化什么的,这个方法只保证如果返回成功,那么该对象一定被妥当的删除了:即物理上的文件被删除了,同时子类中记录的元数据也被妥当删除,不会出现数据不一致的问题。但是这个接口不保证多线程环境下子类会如何反应——一个客户端在读/写,而另一个客户端请求删除,这个行为的结果是未定义的。因此不推荐在使用时删除,最理想的情况是停止其他客户端的访问,只允许执行删除操作的客户端,同时尽可能保证单线程操作。相当于停机维护的那种,目前对这个接口还没有实际使用,之后可能会根据元数据中记录的文件找出所有被使用的proto对象,然后删掉没有被使用的proto对象。不过比起删除对象,我更倾向于按文件将所有系统中已知的文件读出来,写入到新的存储系统中,这样旧系统中的无用文件将不会被拷贝到新系统中,同时这个操作还可以并发操作(因为是只读)。

随后是四个面向Link的操作。分别是:检查Link是否存在,准备Link,检查Link是否可读,以及将Link读成proto对象。检查Link是否存在不必多说。准备Link在本文中不会被使用到,这个接口是考虑到其他存储系统,比如AWS S3提供归档存储,好处是节省成本,坏处时读取之前必须申请恢复文件,恢复所需的时间从几分钟倒数十个小时不等。如果有高端人士使用磁带库,同样也需要先索引磁带,读到缓存中将数据准备好,然后再度(当然也可以读的时候再捯磁带,但是这样效率会很低)。检查Link是否可读则是检测上述操作是否完成,一旦Link可读,那么应该尽快读取。诸如AWS S3等存储系统对恢复的文件有生命周期设置,一旦逾期,恢复出来的文件将被删除。这些接口并不保证当已知可读时文件将在多长时间内保持可读,所以应当尽快读取。

最后还有一个用于定义类型的接口PrepareOption,这个是配合准备Link而设计的。不同的存储系统可能需要不同的参数,因此子类可以实现这个类型,将自己需要的参数放进这个类型里面。

磁盘存储实现

基于磁盘存储是出于概念验证的目的而实现的,但再实现的过程中不断修改,意图在日后实现基于AWS的存储时能够有所帮助。

存储设计如下:

baseDir
│  client.db
│  
├─blob
│      8tU4a5988LBnJG5mUPBGv4rAjTjcm3HRwzeCotv...
│      8tU4ajGuHEdLo9yZG7tmJiFCabzDGxE1my5omMx7yDK...
│      8tU4apuSwmZuz541DkadBvCTwb8UG1DonMMedJ5JR8...
│
├─list
│      8tU4iRLfDDLJwjUvJdeeiJZgKmcuwvUdycz5cfimjj5Rko9q...
│      8tU5DXRZZPocLaU2XBDJC68QPPdVFrqCxhrXBRSciJV...
│      tU5QHkFuddDbexRSMZNtE8fqCGrg1i1so6UHou3Cceft2C...
│
├─tree
│      8tU4a5988LBnJG5mUPBGv4rAjTjcm3HRwzeCotvUBzT8PF...
│      8tZBJFq3APeUTvoKYNrytPtFtPBgrhQknGZGiQuhZ4pHQktjK...
│      8tZBJGT4arJzn2Xn5aUTzwepuUibJzemitoVNDNe2527u5eTs...
│
└─commit
        8tU4iRLfDDLJwjUvJdeeiJZgKmcuwvUdycz5cfimjj5Rko9q123...
        8tZ25FfFuusRVZyQsos8WatprQocU35J2QgVY9dxTg2Er8bHa...
        8tZ2TN86GEpeRCmpDCbBhVgxMMS8zQWzfceV2S52XBcN26...
        

根目录为baseDir,其下有四个文件夹,分别用于存储四种类型的proto对象。其中的client.db是一个简单的基于文件的键值对数据库,用于存储每一个proto对象的元数据。采用类似的结构,可以在AWS S3上使用储存桶存放数据,使用DynamoDB存储对象的元数据。对象分结构设计的目的在于AWS S3上的归档存储。其中blob对象可以被归档存储,因为其存储的数据最多。而其他数据则最好不使用归档存储,例如当我想要遍历一个文件夹下的内容,而不读取其中数据时,不归档的话可以直接拿到tree对象,但如果归档了,我还要申请恢复tree对象所在的文件,假如其中还有其他涉及到的对象,由于我在恢复归档前无法读取内容,因此也就无法一次性确认我到底需要恢复哪些文件。通过采用文件夹的形式(对应S3的key前缀),可以方便的决定哪些对象可以归档,哪些对象不必归档。需要读取数据时,可以一次性找出所有需要恢复的blob,批量提交,这样既能享受到归档存储低廉的价格,还能较为方便的管理数据。

数据库中存储的元数据主要有三个:

  • 每一个已提交的Multihash对应的proto类型,用于按类别找到文件夹中的数据
  • 每一个已提交的Multihash对应的次要Multihash,用于检测哈希碰撞
  • 每一个待写入的Multihash对应的次要Multihash,用于保存写入队列

这在AWS DynamoDB中可以实现为一张表,主键为主要Multihash,随后是对应的类型、次要Multihash和提交状态(null表示已写入,时间戳表示待写入及入队时间)。

关于哈希碰撞检测,在上文中提到过哈希碰撞的几率微乎其微,但是还是没办法保证理论上完全没有碰撞。IPFS的做法是发现碰撞之后升级到新的哈希函数,但对于这个系统来说,应当能够保证写入的数据一定是完好的,而不会因为哈希碰撞而导致某个重要的文件丢失。因此在写入新文件时会同时计算主要哈希和次要哈希,这两个哈希由不同的方法计算而来,因此同时碰撞的概率微乎其微(可能是实践意义上的不可能事件)。如果主哈希已经存在,则通过数据库检查次要哈希,如果二者一致则认为写入的是相同的文件,如果不一致则说明主哈希发生了碰撞,这个时候就抛出异常,阻止新文件写入。

对于写入方式,可选有同步和异步两种方式,因此除写入细节部分,设计如下抽象类来封装公用的代码:

package info.skyblond.ariteg.storage.client.disk

import com.google.protobuf.ByteString
import info.skyblond.ariteg.AritegLink
import info.skyblond.ariteg.AritegObject
import info.skyblond.ariteg.multihash.MultihashProvider
import info.skyblond.ariteg.multihash.MultihashProviders
import info.skyblond.ariteg.storage.HashNotMatchException
import info.skyblond.ariteg.storage.ObjectNotFoundException
import info.skyblond.ariteg.storage.ObjectNotReadyException
import info.skyblond.ariteg.storage.client.PrepareOption
import info.skyblond.ariteg.storage.client.StorageClient
import info.skyblond.ariteg.storage.toMultihash
import io.ipfs.multihash.Multihash
import org.mapdb.DBMaker
import org.mapdb.Serializer
import java.io.File
import java.util.*
import java.util.concurrent.ConcurrentMap

abstract class AbstractNativeStorageClient(
    private val baseDir: File,
    private val primaryMultihashProvider: MultihashProvider = MultihashProviders.sha3Provider512(),
    private val secondaryMultihashProvider: MultihashProvider = MultihashProviders.blake2b512Provider()
) : StorageClient {
    /**
     * Simple KV database stored in file.
     * */
    private val dbFile = File(baseDir, "client.db")

    // The transaction used in mapdb is not ACID transactions.
    // It just flushes data into disk so the data won't get corrupted.
    private val db = DBMaker.fileDB(dbFile)
        .fileMmapEnableIfSupported()
        .transactionEnable()
        .make()

    private val objectTypeMap: ConcurrentMap<ByteString, String> = db
        .hashMap("object_type_map", SerializerByteString(), Serializer.STRING)
        .createOrOpen()

    private val objectMultihashMap: ConcurrentMap<ByteString, ByteString> = db
        .hashMap("object_multihash_map", SerializerByteString(), SerializerByteString())
        .createOrOpen()

    private val objectWritingMap: ConcurrentMap<ByteString, ByteString> = db
        .hashMap("object_status_map", SerializerByteString(), SerializerByteString())
        .createOrOpen()

    /**
     * Get the file object based on ObjectType.
     *
     * Subclasses can have their own storage layout, like hash trie-ish folders.
     * */
    @Suppress("MemberVisibilityCanBePrivate")
    protected fun getObjectFile(type: String, multihashBase58: String): File {
        val parentDir = File(baseDir, type).also { it.mkdirs() }
        return File(parentDir, multihashBase58)
    }

    /**
     * Subclass need to implement this to actually write [rawBytes] into [file].
     * Before writing the data, run [preWrite], then write, then run [postWrite].
     *
     * Subclass can write data in the call, or put it into a queue and do it later.
     * */
    protected abstract fun handleWrite(
        file: File, rawBytes: ByteArray,
        preWrite: () -> Unit, postWrite: () -> Unit
    )

    override fun storeProto(name: String, proto: AritegObject): AritegLink {
        val rawBytes = proto.toByteArray()
        val primaryMultihash = primaryMultihashProvider.digest(rawBytes)
        val primaryMultihashByteString = ByteString.copyFrom(primaryMultihash.toBytes())
        val secondaryMultihash = secondaryMultihashProvider.digest(rawBytes)
        val secondaryMultihashByteString = ByteString.copyFrom(secondaryMultihash.toBytes())

        // not presenting in type db
        if (!objectTypeMap.containsKey(primaryMultihashByteString)) {
            // set version if no one is writing, get old version
            val oldSecondaryMultihash = objectWritingMap.putIfAbsent(
                primaryMultihashByteString, secondaryMultihashByteString
            )?.toMultihash()
            if (oldSecondaryMultihash != null) {
                // someone is writing, check the secondary hash
                require(oldSecondaryMultihash == secondaryMultihash) {
                    val encodedData = Base64.getEncoder().encode(rawBytes)
                    "Hash check failed! Data '${encodedData}' has the same primary hash with ${primaryMultihash.toBase58()}, " +
                            "but secondary hash is ${secondaryMultihash.toBase58()}, " +
                            "expected ${oldSecondaryMultihash.toBase58()}"
                }
                // The check will failed the process if secondary hash not match
            } else {
                // no one is writing, this client can write
                val type = proto.type.name.lowercase()
                val file = getObjectFile(type, primaryMultihash.toBase58())
                handleWrite(file, rawBytes, {}, {
                    synchronized(db) {
                        // add to type db
                        objectTypeMap[primaryMultihashByteString] = type
                        // add to secondary hash map
                        objectMultihashMap[primaryMultihashByteString] = secondaryMultihashByteString
                        // remove from writing queue
                        check(
                            objectWritingMap.remove(
                                primaryMultihashByteString,
                                secondaryMultihashByteString
                            )
                        ) { "Cannot remove entry from writing map. Data corrupt!" }
                        // flush changes to file
                        db.commit()
                    }
                })
            }
        }

        return AritegLink.newBuilder()
            .setName(name)
            .setMultihash(primaryMultihashByteString)
            .build()
    }

    override fun deleteProto(proto: AritegObject): Boolean {
        val rawBytes = proto.toByteArray()
        val multihash = primaryMultihashProvider.digest(rawBytes)
        return deleteProto(multihash)
    }

    override fun deleteProto(link: AritegLink): Boolean {
        return deleteProto(link.multihash.toMultihash())
    }

    private fun deleteProto(multihash: Multihash): Boolean {
        val multihashByteString = ByteString.copyFrom(multihash.toBytes())
        val type = synchronized(db) {
            // try to remove from type map
            val type = objectTypeMap.remove(multihashByteString)
            if (type != null) {
                // object exists, delete secondary hash and file
                objectMultihashMap.remove(multihashByteString)
                db.commit()
            }
            type
        }
        if (type != null) {
            // try to delete the file, ok to failed, it will be overwritten next time
            getObjectFile(type, multihash.toBase58()).delete()
        }
        // Not found, or in writing queue
        return false
    }

    override fun linkExists(link: AritegLink): Boolean {
        // object exists on disk
        if (objectTypeMap.contains(link.multihash))
            return true
        // object exists in memory
        if (objectWritingMap.contains(link.multihash))
            return true
        // not found
        return false
    }

    override fun prepareLink(link: AritegLink, option: PrepareOption) {
        if (!linkExists(link))
            throw ObjectNotFoundException(link)
    }

    override fun linkAvailable(link: AritegLink): Boolean {
        // read objects on disk
        if (objectTypeMap.contains(link.multihash))
            return true
        // read objects in writing queue
        if (objectWritingMap.contains(link.multihash))
            return false
        // not found
        throw ObjectNotFoundException(link)
    }

    @Throws
    override fun loadProto(link: AritegLink): AritegObject {
        val multihash = link.multihash.toMultihash()
        val type = objectTypeMap[link.multihash]
        if (type != null) {
            val file = getObjectFile(type, link.multihash.toMultihash().toBase58())
            val rawBytes = file.readBytes()
            // check loaded hash
            val loadedHash = when (multihash.type) {
                primaryMultihashProvider.getType() -> primaryMultihashProvider.digest(rawBytes)
                else -> throw UnsupportedOperationException(
                    "Unsupported multihash type: ${multihash.type}. " +
                            "Only ${primaryMultihashProvider.getType()} is supported"
                )
            }
            if (multihash != loadedHash) throw HashNotMatchException(multihash, loadedHash)
            return AritegObject.parseFrom(rawBytes)
        }
        if (objectWritingMap.contains(link.multihash))
            throw ObjectNotReadyException(multihash)
        throw ObjectNotFoundException(multihash)
    }

    override fun close() {
        db.close()
    }
}

这里实现了所有来自接口的方法,并提供了新的抽象方法handleWrite供子类实现写入的细节。通过注释可以看出,该方法传入两个函数,一个File和一个字节数组。两个函数分别在写入前和写入后执行,目前写入前为空,纯粹是为了可拓展性保留的。写入后要执行的代码用于提交数据库,将写入的数据提交到数据库中,数据库的使用由同步锁保护。此外还有一个可以由子类覆盖的方法是getObjectFile。默认实现如同上文介绍的文件夹结构。如果未来需要大规模在磁盘上使用,一个目录下过多零碎文件会给文件系统带来压力(尤其是智障Windows),所以可以进一步采取诸如前缀树等结构优化,子类通过覆盖这个函数可以改变类型和Multihash到磁盘文件的映射方式。

这里使用的数据库由MapDB提供,它实现了原生的并发HashMap,因此可以使用许多原子操作而无需加锁。例如在判单待写入proto是否正在被其他人写入时,通过objectWritingMap.putIfAbsent这一原子操作即可同时实现两件事:获取原来的值,如果原来的值不存在,那么就把我的值放进去。有原子性保证每一个Key最多只有一个人可以写入这个值,当其他人再执行相同的操作时,将会返回我写入的值。这样也就避免了重复写的问题。

至于其他实现,应该没有什么过多需要讲解的地方,我在关键的逻辑处都写有注释,这里就不多赘述了。

StorageLayer

解决了存储系统带来的问题,下面就该说上层建筑了。常见的数据组织结构是文件系统,一般就是指采用文件和文件夹的组织形式,这是我们最熟悉的方式。除此之外还有对象存储,这时候就不再区分文件夹了,所有的文件都是平起平坐的(但是通过\分割的方法,我们还是可以让它和文件夹划等号)。此外还有数据库,改为表的形式。StorageLayer接口要解决的问题就是根据不同数据组织形式的特点,给出将特定操作转换为对StorageClient的调用。

目前这个接口具体应该有什么功能我还完全没有想好,因此整体设计都偏向于文件系统。

package info.skyblond.ariteg.storage.layer

import info.skyblond.ariteg.AritegObject
import info.skyblond.ariteg.storage.client.StorageClient
import java.io.File
import java.io.InputStream

interface StorageLayer {
    companion object {
        const val DEFAULT_BLOB_SIZE = 256 * 1024
        const val DEFAULT_LIST_LENGTH = 176
    }

    fun writeInputStreamToProto(
        inputStream: InputStream,
        blobSize: Int = DEFAULT_BLOB_SIZE,
        listLength: Int = DEFAULT_LIST_LENGTH
    ): AritegObject

    fun readInputStreamFromProto(
        proto: AritegObject
    ): InputStream

    fun storeDir(dir: File): AritegObject

    fun walkTree(treeProto: AritegObject, foo: (StorageClient, StorageLayer, String, AritegObject) -> Unit)
}

这里只定义了4个基本操作。前两个是面向整块数据的,可以将字节流写入到系统中,返回list或者blob,也可以将list或blob的内容作为输入流返回,供其他程序使用。随后两个是面向文件夹的操作,可以将文件夹作为一个tree写入系统,也可以根据一个tree来遍历其中的内容。遍历需要要给tree和一个函数,这个函数将在每一个元素上调用,调用时传入当前的StorageClient和StorageLayer,同时还有对象的路径,例如/some_folder/some_file,以及对象本身。这里遍历的API可以采用Visitor模式进行设计,但是比较复杂,就采用了这样一种简陋的方法。

关于commit类型的应用,主要还是版本控制,对于概念验证来说用不到,目前我也没有想好它的应用,以及调用方式。所以姑且就算是未实现的Feature好了。

写完这段文字之后反思,也许每一种数据的组织方式都各不相同。文件可以按照上面说的那样实现。数据库可以将每一个记录作为一个tree实现,但和文件系统完全不一样。也许我应该考虑删掉StorageLayer这个接口,或者只让它成为一个类型接口(即没有成员方法的接口),然后放开限制让子类面向自己专有的组织形式去实现,这样得到的类或许会更好用一些。

效果测试

手头正好有一个16GB的ISO文件,于是拿来测试性能。测试程序如下:

package info.skyblond.ariteg

import info.skyblond.ariteg.storage.client.disk.AsyncNativeStorageClient
import info.skyblond.ariteg.storage.layer.FileNativeStorageLayer
import org.slf4j.LoggerFactory
import java.io.File
import java.security.MessageDigest


object Main {
    private val logger = LoggerFactory.getLogger("Application")

    @JvmStatic
    fun main(args: Array<String>) {
        AsyncNativeStorageClient(File("./data").also { it.mkdirs() }).use { storageClient ->
            val storageLayer = FileNativeStorageLayer(storageClient)
            logger.info("Storage file")
            val obj = timing {
                File("C:\\Adobe_2021_MasterCol_win_v11.9#1_20210901.iso").inputStream().use {
                    storageLayer.writeInputStreamToProto(it)
                }
            }
            logger.info("Wait all writing finished")
            timing {
                val link = storageClient.storeProto(obj)
                while (!storageClient.linkAvailable(link))
                    Thread.yield()
            }
            logger.info("Read out, calculate sha256")
            val digest = MessageDigest.getInstance("SHA-256")

            val buffer = ByteArray(4096 * 1024) // 4MB
            val sha256 = timing {
                storageLayer.readInputStreamFromProto(obj).use { inputStream ->
                    var counter: Int
                    while (inputStream.read(buffer, 0, buffer.size)
                            .also { counter = it } != -1
                    ) {
                        digest.update(buffer, 0, counter)
                    }
                    digest.digest()
                }
            }
            logger.info(sha256.toHex())
        }
    }

    private fun ByteArray.toHex(): String = joinToString(separator = "") { eachByte -> "%02x".format(eachByte) }

    private fun <T> timing(foo: () -> T): T {
        val start = System.currentTimeMillis()
        val result = foo()
        val end = System.currentTimeMillis()
        logger.info("Time used: ${end - start} ms")
        return result
    }
}

这个测试程序把文件写入系统,统计写入的时间,由于使用的是异步的客户端,所以有可能还有未写入的数据,等待所有数据全部写入。随后读出数据计算SHA256,与其他工具计算的结果进行对比。

首次写入:

Storage file
Time used: 463515 ms
Wait all writing finished
Time used: 861 ms
Read out, calculate sha256
Time used: 322484 ms
f84eef7eb41b96598a42d0fa157487db7f2f493157e2b592ff07f3431bb6b676

再次运行:

Storage file
Time used: 242401 ms
Wait all writing finished
Time used: 0 ms
Read out, calculate sha256
Time used: 339523 ms
f84eef7eb41b96598a42d0fa157487db7f2f493157e2b592ff07f3431bb6b676

看起来文件系统的性能确实不太行,但好在数据写入后再读出来没有问题。


知识共享许可协议
【代码札记】基于 Merkle DAG 的文件存储服务 P3 概念验证天空 Blond 采用 知识共享 署名 - 非商业性使用 - 相同方式共享 4.0 国际 许可协议进行许可。
本许可协议授权之外的使用权限可以从 https://skyblond.info/about.html 处获得。

Archives QR Code
QR Code for this page
Tipping QR Code