Fix multi-threading, add Query interface, java test

This commit is contained in:
2024-05-04 17:13:34 +02:00
parent 34b620dfa5
commit cc3ac67be6
11 changed files with 488 additions and 80 deletions

View File

@@ -6,6 +6,7 @@ import java.io.ObjectOutputStream
import java.io.Serializable
import java.text.DecimalFormat
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import kotlin.reflect.KClass
enum class ActionType {
@@ -14,8 +15,8 @@ enum class ActionType {
}
class TypeData(
var nextId: Long = 1L,
val data: MutableMap<Any, Persistable> = ConcurrentHashMap(),
var nextId: AtomicLong = AtomicLong(1L),
val data: MutableMap<Serializable, Persistable> = ConcurrentHashMap(),
) : Serializable
class Action(
@@ -43,6 +44,23 @@ class Datastore(
loadTransactions()
}
fun getNextId(javaClass: Class<Persistable>): Long {
if (data[javaClass] == null) {
synchronized(this) {
if (data[javaClass] == null) {
data[javaClass] = TypeData()
}
}
}
return data[javaClass]!!.nextId.getAndIncrement()
}
fun setMaxId(javaClass: Class<Persistable>, id: Long) {
val nextId = data.getOrPut(javaClass) { TypeData() }.nextId
if (nextId.get() <= id) nextId.set(id + 1)
}
override fun toString(): String {
return "Datastore(directory=${fileManager.directory}, classes=${data.keys.size}, indexes=${indexes.keys.size})"
}
@@ -95,14 +113,15 @@ class Datastore(
when (action.type) {
ActionType.STORE -> {
if (action.obj.id == 0L) {
action.obj.id = typeData.nextId++
}
typeData.data[action.obj.id] = action.obj
if (action.obj.id >= typeData.nextId.get()) {
typeData.nextId.set(action.obj.id + 1)
}
for (index in indexes[action.obj::class.java]?.values ?: listOf()) {
index.remove(action.obj.id)
index.add(action.obj as Persistable)
index.add(action.obj)
}
}
@@ -118,6 +137,15 @@ class Datastore(
}
}
fun <T : Persistable> count(clazz: KClass<T>): Int {
val typeData = data.getOrPut(clazz.java) {
TypeData()
}
return typeData.data.size
}
fun <T : Persistable> find(clazz: KClass<T>, id: Long): T? {
val typeData = data.getOrPut(clazz.java) {
TypeData()
@@ -148,15 +176,18 @@ class Datastore(
return indexes[kClass.java]?.get(indexName)
}
fun storeActions(actions: MutableList<Action>) {
fun storeAndExecute(actions: MutableList<Action>) {
if (actions.isNotEmpty()) {
synchronized(this) {
writeTransaction(actions)
execute(actions)
}
}
}
private fun readTransaction(ois: ObjectInputStream) {
val versionNumber = ois.readInt()
check (versionNumber == 1) { "Unsupported version number: $versionNumber" }
val transactionNumber = ois.readLong()
nextTransactionNumber = transactionNumber + 1
val actions = ois.readObject() as MutableList<Action>
@@ -167,6 +198,8 @@ class Datastore(
val number = transactionFormatter.format(nextTransactionNumber)
val file = File(directory, "transaction-$number.trn")
ObjectOutputStream(file.outputStream()).use { oos ->
// version number
oos.writeInt(1)
oos.writeLong(nextTransactionNumber++)
oos.writeObject(actions)
}
@@ -178,6 +211,8 @@ class Datastore(
val number = transactionFormatter.format(nextTransactionNumber)
val file = File(directory, "transaction-$number.snp")
ObjectOutputStream(file.outputStream()).use { oos ->
// version number
oos.writeInt(1)
oos.writeLong(nextTransactionNumber++)
oos.writeObject(data)
oos.writeInt(indexes.size)
@@ -195,6 +230,8 @@ class Datastore(
}
private fun readSnapshot(ois: ObjectInputStream) {
val versionNumber = ois.readInt()
check (versionNumber == 1) { "Unsupported version number: $versionNumber" }
nextTransactionNumber = ois.readLong() + 1
data.clear()
data.putAll(ois.readObject() as MutableMap<Class<*>, TypeData>)