MENU

【代码札记】康威生命游戏

February 18, 2023 • 瞎折腾

本文记述我使用Kotlin在JVM上实现Conway's Game of Life。

游戏规则

康威生命游戏并不是什么新鲜玩意儿了,它可以视作一个二维的棋盘,每一个格子被称为细胞(Cell),每一个细胞有两种状态:死亡或存活。每一轮迭代时,所有细胞同时更新状态,更新的规则如下:

  • 若细胞当前存活:

    • 当周围的存活细胞数量小于2时,这个细胞在迭代时死亡(因人烟稀少)
    • 当周围的存活细胞数量大于3时,这个细胞在迭代时死亡(因人口过剩)
    • 当周围有2个或3个存活的细胞时,这个细胞在迭代时继续存活
  • 若细胞当前死亡:

    • 当周围有且只有3个存活细胞时,这个细胞在迭代时复活(模拟繁殖)
    • 其他形况下,细胞继续保持死亡状态

这个规则其实还挺简单的,并且它可以使用多线程进行运算。

实现思路

这个游戏的“世界”可以用一个二维数组表示,数组的元素就是一个布尔值,true表示存活,false表示死亡。每次更新细胞时先统计它周围的存活细胞,然后根据细胞的当前状态使用if else做判断,更新细胞状态。考虑到规则要求所有细胞同时更新,因此我们需要两个“世界”,一个表示当前的细胞状态,计算出下一状态后,将新的状态写入另一个世界,表示未来的状态。当所有细胞都产生了新状态后,交换这两个世界的指针,这样一来,未来的状态就成为了现在的状态,而原来的状态则成为记录未来状态的容器。

不论使用什么语言,这种设计还都挺合理的对吧?在JVM上也一样,对吧?

整体来说还挺对的,但有一些小问题。首先,JVM标准并没有规定一个布尔值应该占多少内存。可能是1比特,也可能是1字节,这完全取决于JVM的实现。如果JVM使用1比特,那么含有1024个元素的布尔数组的大小就是128字节;如果采用1字节表示一个布尔,那么同样大小的数组就要占用1KB。假如说我们要渲染一个1080P的康威游戏,1920乘以1080,一共是2073600(200万)个细胞,每个细胞占用1字节,这就是2MB的内存,而原本它可以只占用不到256KB的。

当然了,好在JVM中其他类型的大小是固定的。你可以选择使用byte作为容器,这样一个byte就有8个bit,但鉴于kotlin的类型系统比较生硬,在做位运算的时候需要将byte转换成int,再将int转换为byte存回去,这样子很麻烦,所以我是用Int作为容器,每个Int可以存储32个比特,即便有浪费,也最多是最后一个Int没有完全使用,浪费的部分不超过4字节。

熟悉BitSet的读者可能会说,诶呀,你说的这个不就是BitSet嘛!Java中提供了一个可变长度的BitSet,可以方便地做位运算。但这里还有一个问题:内存一致性。

关于这一部分,最开始的构想是使用分段锁来保护不同位置,这样既保证了内存一致性,又保证不同位置的独写互不冲突。但是后来发现一个问题:写入一个状态需要读取九个状态,换句话说,如果读写都加锁(读需要加锁保证线程可以看到之前写的数据),那么更新一个细胞就要竞争10次锁,一旦竞争失败,synchronized关键字就会把竞争失败的线程转变为操作系统的阻塞态。这是非常耗费时间的。

谈及多线程和内存一致性,包括我在内,第一反应可能都是用同步锁,无论是synchronized关键字还是同步代码块,总之,正确用锁就能保证安全。但是从性能角度来看,监视器锁可能是性能最差的(字节码中使用了monitorentermonitorexit),因为它依赖于操作系统将竞争失败的线程阻塞,锁释放时再依赖操作系统唤醒线程。在这里,我们更新一个细胞需要读取9次,而每次都要竞争锁,如果竞争失败则需要在阻塞、唤醒上浪费时间。采用自旋锁(ReentrantLock)则能够避开线程调度。

选好了锁,我们再想想,到底哪里需要锁?我们使用锁的目的就是利用它的顺序性,也就是所谓的happen before,即后进入锁的线程一定能够看到先前持有锁进程产生的修改——后来的线程在读取世界状态时,必须能够看到之前由多个线程计算出来的状态。可我们真的需要使用锁来保证这一点吗?

除了锁,只读内存也是同步安全的,只读就是所谓的immutable,不可变对象。通过游戏规则可知,所有细胞必须同时更新,因此这不可能是就地更新,也就是说,我们从当前世界读取状态,然后将修改写入其他地方。如果当前世界是不可变的呢?这样一来,读取的时候我们就不需要锁了。

读取的时候不需要锁并不意味着不需要锁了。上锁的时机从读取时提前到了创建对象时。计算下一步的状态仍然需要一个可修改的状态,我们仍然需要使用锁来写入,而读取的时候也需要锁来保证我们可以看到其他线程的修改。但如果我们在计算完成后抄写一份,我们只需要在这里上一次锁,然后将抄写的结果作为不可变对象使用,那么之后就不再需要上锁了。

实现细节

WorldMap

“世界”的实现如下:

package info.skyblond.pocog

import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

class WorldMap private constructor(
    val height: Int, val width: Int,
    private val data: Array<IntArray>
) {
    init {
        require(height == data.size) { "Height doesn't match the data" }
        val wordWidth = width / Int.SIZE_BITS + if (width % Int.SIZE_BITS == 0) 0 else 1
        for (i in 1 until data.size) {
            require(data[i].size == wordWidth) { "Rows have different width" }
        }
    }

    operator fun get(x: Int, y: Int): Boolean {
        require(x in 0 until width) { "x out of bound" }
        require(y in 0 until height) { "y out of bound" }
        val wordIndex = x / Int.SIZE_BITS
        val wordOffset = x % Int.SIZE_BITS
        return data[y][wordIndex] and (1 shl wordOffset) != 0
    }

    class Builder(
        private val width: Int, private val height: Int
    ) {
        // ......
    }
}

这里可以看到,WorldMap的底层还是一个data数组,但是整个实现中都没有写入数组的代码。没有修改,也就不需要同步,自然也不需要锁了。而为了方便构造实例,我们在内部实现了一个Builder

    class Builder(
        private val width: Int, private val height: Int
    ) {
        private val wordWidth = width / Int.SIZE_BITS + if (width % Int.SIZE_BITS == 0) 0 else 1
        private val data = Array(height) { IntArray(wordWidth) }
        private val locks = Array(height) { ReentrantLock() }

        operator fun set(x: Int, y: Int, newValue: Boolean) {
            require(y in 0 until height) { "y out of bound" }
            require(x in 0 until width) { "x out of bound" }
            val wordIndex = x / Int.SIZE_BITS
            val wordOffset = x % Int.SIZE_BITS
            locks[y].withLock {
                if (newValue) setTrue(wordIndex, wordOffset, y) else setFalse(wordIndex, wordOffset, y)
            }
        }

        private fun setTrue(wordIndex: Int, wordOffset: Int, y: Int) {
            val mask = 1 shl wordOffset
            data[y][wordIndex] = data[y][wordIndex] or mask
        }

        private fun setFalse(wordIndex: Int, wordOffset: Int, y: Int) {
            val mask = (1 shl wordOffset).inv()
            data[y][wordIndex] = data[y][wordIndex] and mask
        }
        
        fun build(): WorldMap {
            val result = Array(height) { IntArray(wordWidth) }
            for (i in data.indices) {
                locks[i].withLock {
                    System.arraycopy(data[i], 0, result[i], 0, wordWidth)
                }
            }
            return WorldMap(height, width, result)
        }
    }

这个Builder不是线程安全的:当最终调用build时,程序会依次获取每一行的锁,将当前的结果拷贝到一个数组里。如果此时其他行发生了修改,那么就无法保证一致性了。至于选用行锁,单纯是因为可重入锁不涉及阻塞和唤醒,竞争失败的代价很小,同时操作也仅仅是读取、位运算并赋值,没什么耗时操作,因此粗略的选用行锁不会带来太大的性能惩罚。

游戏

游戏本身也不难实现,这里定义了一些基础的功能:

package info.skyblond.pocog

import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

class ConwaysGame(
    val gameWidth: Int,
    val gameHeight: Int,
    val heightSubdivisionSize: Int,
    val parallelism: Int
) : AutoCloseable {
    private val lock = ReentrantLock()
    private val threadPool = ForkJoinPool(parallelism)
    val gameSize = gameWidth.toLong() * gameHeight.toLong()

    @Volatile
    private lateinit var cellStatus: WorldMap

    @Volatile
    private var nextCellStatus: WorldMap? = null

    fun reset(generator: (x: Int, y: Int) -> Boolean) {
        val builder = WorldMap.Builder(gameWidth, gameHeight)
        for (i in 0 until gameWidth) {
            for (j in 0 until gameHeight) {
                builder[i, j] = generator(i, j)
            }
        }
        cellStatus = builder.build()
    }

    fun countNeighbors(x: Int, y: Int): Int {
        var counter = 0
        for (i in x - 1..x + 1) {
            for (j in y - 1..y + 1) {
                if (i == x && j == y) continue
                // count if in range
                if (i in 0 until gameWidth && j in 0 until gameHeight) {
                    if (cellStatus[i, j]) counter++
                }
            }
        }
        return counter
    }

    private fun updateCell(builder: WorldMap.Builder, x: Int, y: Int) {
        val currentStatus = cellStatus[x, y]
        val neighbor = countNeighbors(x, y)
        if (currentStatus) {
            // Any live cell with two or three live neighbours lives on to the next generation.
            // Any live cell with fewer than two live neighbours dies, as if by underpopulation.
            // Any live cell with more than three live neighbours dies, as if by overpopulation.
            builder[x, y] = neighbor in 2..3
        } else {
            // Any dead cell with exactly three live neighbours becomes a live cell, as if by reproduction.
            builder[x, y] = neighbor == 3
        }
    }

    private fun updateLine(builder: WorldMap.Builder, y1: Int, y2: Int) {
        // p1: top left, p2: bottom right
        for (j in y1 until y2.coerceAtMost(gameHeight)) {
            for (i in 0 until gameWidth) {
                updateCell(builder, i, j)
            }
        }
    }

    fun calculateNextTick(): Unit = lock.withLock {
        val builder = WorldMap.Builder(gameWidth, gameHeight)
        val futureList = LinkedList<CompletableFuture<Void>>()
        for (y in 0 until gameHeight step heightSubdivisionSize) {
            CompletableFuture.runAsync({
                updateLine(builder, y, y + heightSubdivisionSize)
            }, threadPool).also { futureList.add(it) }
        }
        futureList.forEach { it.get() }
        nextCellStatus = builder.build()
    }

    fun useCellStatus(
        block: ((x: Int, y: Int, isNext: Boolean) -> Boolean) -> Unit
    ): Unit = lock.withLock {
        if (nextCellStatus == null) calculateNextTick()
        val func = fun(x: Int, y: Int, isNext: Boolean): Boolean =
            if (isNext) nextCellStatus!![x, y] else cellStatus[x, y]
        block.invoke(func)
    }

    /**
     * For debug only, make sure each cell is updated correctly.
     * */
    fun checkResult() {
        useCellStatus { query -> // check result
            for (y in 0 until gameHeight) {
                for (x in 0 until gameWidth) {
                    val current = query(x, y, false)
                    val next = query(x, y, true)
                    val nc = countNeighbors(x, y)
                    if (current) { // current alive
                        if (next) check(nc == 2 || nc == 3) { "($x, $y) should die, but alive, nc: $nc" } // still alive
                        else check(nc < 2 || nc > 3) { "($x, $y) should alive, but die, nc: $nc" }  // die next tick
                    } else { // current die
                        if (next) check(nc == 3) { "($x, $y) shouldn't alive, but did, nc: $nc" }  // alive next tick
                        else check(nc != 3) { "($x, $y) should alive, but didn't, nc: $nc" }  // still die
                    }
                }
            }
        }
    }

    fun swapToNextTick(): Unit = lock.withLock {
        // done update, swap status
        if (nextCellStatus == null) calculateNextTick()
        cellStatus = nextCellStatus!!
        nextCellStatus = null
    }

    override fun close(): Unit = lock.withLock {
        threadPool.shutdown()
    }
}

应该都比较直观。其中reset提供了一种设置游戏初始状态的方式,updateCell用于更新一个细胞,而updateArea则用于更新一个举行范围内的细胞。calculateNextTick函数根据创建实例的参数,分区域指派任务,这个任务由线程池执行,所有计算执行完毕后函数返回。接下来用户可以使用useCellStatus来访问细胞状态。你会发现这几个函数都是同步的,这是保证这些操作是互斥的/顺序的。换句话说,你得先等着状态计算完毕,才能使用状态,或者进行别的操作。同样地,你得等别人操作完了,才能通过swapToNextTick步进到未来。这里checkResult是一个调试用的函数,它通过检查当前和未来的状态来检查游戏是否正确更新了每一个细胞。

正确性验证

当然,代码写完了还得确保它能正确运行:

    val game = ConwaysGame(1000, 1000, 100, 23, 8)
    game.reset { _, _, -> Random.nextBoolean() }
    for (i in 0 until 1000) {
        game.calculateNextTick()
        game.checkResult()
        game.swapToNextTick()
    }
    game.close()

这个代码会随机填充世界,并开始迭代。每次迭代都会检查程序的正确性。在这篇文章中,单元测试并不是主要内容,因此只要使用某种方法确认程序运转正确即可。如果实在不想写代码,将每次迭代的结果打印到终端,人肉检查也是可以的。

性能调节

单元测试不是重点,性能调节才是。你看,在创建Game实例的时候有很大的灵活性。除了世界的大小由用户指定之外,每个线程计算的区域大小,分配多少个线程,这些也要用户来指定。如果每个线程计算的区域太小,那么CPU资源就会浪费在给线程分配任务上;如果计算区域过大,那么线程得不到充分利用,就没法完全释放性能。此外,面积相同的情况下,行和列的尺寸对性能也会有影响(CPU缓存更偏好行)。此外,如果线程数太多,会加剧操作系统调度线程的压力,如果线程太少,那计算任务又分配不过来。而且最重要的是,调节这些参数不能凭感觉,那怎么办呢?

我曾经在调试Minecraft服务器的性能的时候做过简单的性能测量,但那样笼统的性能测量在这里是不适用的。要知道,JVM在运行时是一个复杂的系统,除了不确定的垃圾回收之外,还有各种优化器会随时加入进来,将字节码优化成二进制指令。在这方面而言,Java程序并不是幂等的。

那怎么办呢?实现JVM的那帮人也有相同的疑问,于是他们写了一个性能测量工具,叫做JMH。它主要用于“微基准测试”,也就是说,用于测量一些微小细节的性能的工具,例如测量i++++i到底能差出几纳秒。我们当然不需要测量的那么精准,我们只需要这个框架帮我们排除JVM优化以及垃圾回收这些元素的影响。至于JMH框架的具体使用就不说了,这里放一个代码:

package info.skyblond.pocog

import org.openjdk.jmh.annotations.*
import kotlin.random.Random


open class TestBench {

    @State(Scope.Benchmark)
    open class ExecutionPlan {

        @Param("1", "10", "50", "100", "300")
        open var heightSubdivisionSize = 0

        @Param("12")
        open var parallelism = 0

        private val random = Random(1234)

        @Volatile
        lateinit var game: ConwaysGame

        @Setup
        fun setUp() {
            game = ConwaysGame(
                600, 600, heightSubdivisionSize, parallelism
            )
            game.reset { _, _ -> random.nextBoolean() }
        }

        @TearDown
        fun tearDown() {
            game.close()
        }
    }

    @Benchmark
    @Fork(1)
    @Warmup(iterations = 5)
    @BenchmarkMode(Mode.Throughput)
    fun conway(plan: ExecutionPlan) {
        plan.game.calculateNextTick()
        plan.game.swapToNextTick()
    }
}

这里根据三个参数进行测试,分别是高度和线程数。测试的结果比较直观:高度为10和50时取得了较高的性能:每秒320步迭代。细化测试参数,将高度修改为:5、10、25、50、55,第二轮测试发现高度为25时性能最高。

为了避免用户手动设置,可以按照面积恒定的原则来计算:上述测试中宽600,高25,每个线程计算15000个格子。当宽度变化时,可以根据15000÷宽度得到对应的高度:

(15000 / gameWidth).coerceAtLeast(1)

可视化?

文章写到结尾,好像还没有谈及可视化。如果没有可视化,这玩意儿就只是在内存里的0和1。涉及到模拟的东西,它是一种动态的过程,所以我更希望能够以动图或视频的方式展现出来,而不是一堆数字编号的jpg图片。我本来是想用ffmpeg来一边模拟一边生成视频的,但是看起来这个工具好像只能接收图片输入。索性今天看到了一个叫做humble-video的Java库,能够将Java生成的BufferedImage编码成视频文件。在下一篇文章中,我将介绍如何使用这个工具来生成视频。

-全文完-


知识共享许可协议
【代码札记】康威生命游戏天空 Blond 采用 知识共享 署名 - 非商业性使用 - 相同方式共享 4.0 国际 许可协议进行许可。
本许可协议授权之外的使用权限可以从 https://skyblond.info/about.html 处获得。

Archives QR Code
QR Code for this page
Tipping QR Code