package nl.astraeus.nl.astraeus.persistence import java.io.File import java.io.ObjectInputStream import java.io.ObjectOutputStream import java.io.Serializable import java.text.DecimalFormat import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong import kotlin.reflect.KClass enum class ActionType { STORE, DELETE } class TypeData( var nextId: AtomicLong = AtomicLong(1L), val data: MutableMap = ConcurrentHashMap(), ) : Serializable { companion object { private const val serialVersionUID: Long = 1L } } class Action( val type: ActionType, val obj: Persistable ) : Serializable { override fun toString(): String { return "Action(type=$type, obj=$obj)" } companion object { private const val serialVersionUID: Long = 1L } } class Datastore( private val directory: File, indexes: Array = arrayOf(), val enableOptimisticLocking: Boolean = false, ) { private val fileManager = FileManager(directory) private val transactionFormatter = DecimalFormat("#") private var nextTransactionNumber = 1L private val data: MutableMap, TypeData> = ConcurrentHashMap() private val indexes: MutableMap, MutableMap> = ConcurrentHashMap() init { for (index in indexes) { this.indexes.getOrPut(index.cls) { ConcurrentHashMap() }[index.name] = index } loadTransactions() } fun getNextId(javaClass: Class): Long { if (data[javaClass] == null) { synchronized(this) { if (data[javaClass] == null) { data[javaClass] = TypeData() } } } return data[javaClass]!!.nextId.getAndIncrement() } fun setMaxId(javaClass: Class, id: Long) { val nextId = data.getOrPut(javaClass) { TypeData() }.nextId val current = nextId.get() if (current <= id) nextId.addAndGet(id - current) } override fun toString(): String { return "Datastore(directory=${fileManager.directory}, classes=${data.keys.size}, indexes=${indexes.keys.size})" } // print status, show number of entries for each class and index fun printStatus() { println(this) for ((cls, typeData) in data) { println(" ${cls.simpleName}: ${typeData.data.size}") for ((name, index) in indexes.getOrDefault(cls, mutableMapOf())) { println(" $name: ${index.index.size}") } } } private fun loadTransactions() { val start = System.nanoTime() synchronized(this) { val (lastSnapshot, lastSnapshotFile) = fileManager.findLastSnapshot() if (lastSnapshotFile != null) { ObjectInputStream(lastSnapshotFile.inputStream()).use { ois -> readSnapshot(ois) } } val transactions = fileManager.findTransactionsAfter(lastSnapshot ?: 0L) transactions?.forEach { file -> ObjectInputStream(file.inputStream()).use { ois -> readTransaction(ois) } } } Logger.debug("Loaded transactions in %6.3fms", ((System.nanoTime() - start) / 1_000_000f)) } private fun execute(actions: Set) { synchronized(this) { if (enableOptimisticLocking) { for (action in actions) { val typeData = data.getOrPut(action.obj::class.java) { TypeData() } if (action.type == ActionType.STORE) { if ((typeData.data[action.obj.id]?.version ?: -1L) >= action.obj.version) { throw OptimisticLockingException(action.obj) } } } } for (action in actions) { val typeData = data.getOrPut(action.obj::class.java) { TypeData() } when (action.type) { ActionType.STORE -> { typeData.data[action.obj.id] = action.obj if (action.obj.id >= typeData.nextId.get()) { typeData.nextId.set(action.obj.id + 1) } for (index in indexes[action.obj::class.java]?.values ?: listOf()) { index.remove(action.obj.id) index.add(action.obj) } } ActionType.DELETE -> { typeData.data.remove(action.obj.id) for (index in indexes[action.obj::class.java]?.values ?: listOf()) { index.remove(action.obj.id) } } } } } } fun count(clazz: KClass): Int { val typeData = data.getOrPut(clazz.java) { TypeData() } return typeData.data.size } fun find(clazz: KClass, id: Long): T? { val typeData = data.getOrPut(clazz.java) { TypeData() } val p: Persistable? = typeData.data[id] return (p?.copy() as T?) } fun search( clazz: KClass, search: (T) -> Boolean ): List { val typeData = data.getOrPut(clazz.java) { TypeData() } return typeData.data.values .filter { search(it as T) } .map { o -> o.copy() as T } } fun findIndex( kClass: KClass<*>, indexName: String ): PersistableIndex? { return indexes[kClass.java]?.get(indexName) } fun storeAndExecute(actions: Set) { if (actions.isNotEmpty()) { synchronized(this) { execute(actions) writeTransaction(actions) } } } private fun readTransaction(ois: ObjectInputStream) { val versionNumber = ois.readInt() check(versionNumber == 1) { "Unsupported version number: $versionNumber" } val transactionNumber = ois.readLong() nextTransactionNumber = transactionNumber + 1 val actions = ois.readObject() as Set execute(actions) } private fun writeTransaction(actions: Set) { val number = transactionFormatter.format(nextTransactionNumber) val file = File(directory, "transaction-$number.trn") ObjectOutputStream(file.outputStream()).use { oos -> // version number oos.writeInt(1) oos.writeLong(nextTransactionNumber++) oos.writeObject(actions) } } fun snapshot() { val start = System.nanoTime() synchronized(this) { val number = transactionFormatter.format(nextTransactionNumber) val file = File(directory, "transaction-$number.snp") ObjectOutputStream(file.outputStream()).use { oos -> // version number oos.writeInt(1) oos.writeLong(nextTransactionNumber++) oos.writeObject(data) oos.writeInt(indexes.size) for ((cls, index) in indexes) { oos.writeObject(cls) oos.writeInt(index.size) for ((name, idx) in index) { oos.writeObject(name) oos.writeObject(idx.index) } } } } Logger.debug("Snapshot in %6.3fms", ((System.nanoTime() - start) / 1_000_000f)) } private fun readSnapshot(ois: ObjectInputStream) { val versionNumber = ois.readInt() check(versionNumber == 1) { "Unsupported version number: $versionNumber" } nextTransactionNumber = ois.readLong() + 1 data.clear() data.putAll(ois.readObject() as MutableMap, TypeData>) val foundIndexes = mutableMapOf, MutableList>() val numberOfClassesWithIndex = ois.readInt() for (i in 0 until numberOfClassesWithIndex) { val cls = ois.readObject() as Class<*> val numberOfIndexesForClass = ois.readInt() for (indexForClass in 0 until numberOfIndexesForClass) { val name = ois.readObject() as String val idx = ois.readObject() as MutableMap> foundIndexes.getOrPut(cls) { mutableListOf() }.add(name) val index = indexes[cls] if (index != null) { index[name]?.index?.putAll(idx) } // else ignore, index is removed } } // any (new) index not serialized needs to be build now for ((cls, indexes) in indexes) { for ((name, index) in indexes) { if (!foundIndexes.getOrDefault(cls, mutableListOf()).contains(name)) { index.index.clear() for (obj in data.getOrDefault(cls, TypeData()).data.values) { index.add(obj) } } } } } fun removeOldFiles() { fileManager.removeOldFiles() } }