Initial commit
This commit is contained in:
229
src/main/kotlin/nl/astraeus/persistence/Datastore.kt
Normal file
229
src/main/kotlin/nl/astraeus/persistence/Datastore.kt
Normal file
@@ -0,0 +1,229 @@
|
||||
package nl.astraeus.nl.astraeus.persistence
|
||||
|
||||
import java.io.File
|
||||
import java.io.ObjectInputStream
|
||||
import java.io.ObjectOutputStream
|
||||
import java.io.Serializable
|
||||
import java.text.DecimalFormat
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
enum class ActionType {
|
||||
STORE,
|
||||
DELETE
|
||||
}
|
||||
|
||||
class TypeData(
|
||||
var nextId: Long = 1L,
|
||||
val data: MutableMap<Any, Persistable> = ConcurrentHashMap(),
|
||||
) : Serializable
|
||||
|
||||
class Action(
|
||||
val type: ActionType,
|
||||
val obj: Persistable
|
||||
) : Serializable
|
||||
|
||||
class Datastore(
|
||||
private val directory: File,
|
||||
indexes: Array<PersistableIndex> = arrayOf(),
|
||||
) {
|
||||
private val fileManager = FileManager(directory)
|
||||
private val transactionFormatter = DecimalFormat("#")
|
||||
private var nextTransactionNumber = 1L
|
||||
private val data: MutableMap<Class<*>, TypeData> = ConcurrentHashMap()
|
||||
private val indexes: MutableMap<Class<*>, MutableMap<String, PersistableIndex>> = ConcurrentHashMap()
|
||||
|
||||
init {
|
||||
if (!directory.exists()) {
|
||||
directory.mkdirs()
|
||||
}
|
||||
|
||||
for (index in indexes) {
|
||||
this.indexes.getOrPut(index.cls) {
|
||||
ConcurrentHashMap()
|
||||
}[index.name] = index
|
||||
}
|
||||
|
||||
loadTransactions()
|
||||
}
|
||||
|
||||
private fun loadTransactions() {
|
||||
synchronized(this) {
|
||||
val snapshots: Array<File>? = directory.listFiles { _, name -> name.startsWith("transaction-") && name.endsWith(".snp") }
|
||||
val files: Array<File>? = directory.listFiles { _, name -> name.startsWith("transaction-") && name.endsWith(".trn") }
|
||||
|
||||
var lastSnapshot: Long? = null
|
||||
var lastSnapshotFile: File? = null
|
||||
|
||||
snapshots?.let {
|
||||
it.forEach {
|
||||
val trnx = getTrnx(it)
|
||||
if (lastSnapshot == null || trnx > (lastSnapshot ?: 0L)) {
|
||||
lastSnapshot = trnx
|
||||
lastSnapshotFile = it
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val lastSnapshotFile2 = fileManager.findLastSnapshot()
|
||||
|
||||
if (lastSnapshotFile != null) {
|
||||
ObjectInputStream(lastSnapshotFile?.inputStream()).use { ois ->
|
||||
readSnapshot(ois)
|
||||
}
|
||||
}
|
||||
|
||||
val trns = fileManager.findTransactionsAfter(lastSnapshot ?: 0L)
|
||||
|
||||
files?.also { snaphotFiles ->
|
||||
Arrays.sort(snaphotFiles) { o1, o2 -> if (getTrnx(o1) > getTrnx(o2)) 1 else -1}
|
||||
|
||||
snaphotFiles.forEach { file ->
|
||||
if (getTrnx(file) > (lastSnapshot ?: 0L)) {
|
||||
ObjectInputStream(file.inputStream()).use { ois ->
|
||||
val transactionNumber = ois.readLong()
|
||||
nextTransactionNumber = transactionNumber + 1
|
||||
val actions = ois.readObject() as MutableList<Action>
|
||||
execute(actions)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun getTrnx(file: File): Long {
|
||||
return file.name.substringAfterLast('/').substringAfter("transaction-").substringBefore(".").toLong()
|
||||
}
|
||||
|
||||
fun execute(actions: MutableList<Action>) {
|
||||
synchronized(this) {
|
||||
for (action in actions) {
|
||||
val typeData = data.getOrPut(action.obj::class.java) {
|
||||
TypeData()
|
||||
}
|
||||
|
||||
when (action.type) {
|
||||
ActionType.STORE -> {
|
||||
if (action.obj.id == 0L) {
|
||||
action.obj.id = typeData.nextId++
|
||||
}
|
||||
typeData.data[action.obj.id] = action.obj
|
||||
|
||||
for (index in indexes[action.obj::class.java]?.values ?: listOf()) {
|
||||
index.add(action.obj as Persistable)
|
||||
}
|
||||
}
|
||||
|
||||
ActionType.DELETE -> {
|
||||
typeData.data.remove(action.obj.id)
|
||||
|
||||
for (index in indexes[action.obj::class.java]?.values ?: listOf()) {
|
||||
index.remove(action.obj)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun <T : Persistable> find(clazz: KClass<T>, id: Long): T? {
|
||||
val typeData = data.getOrPut(clazz.java) {
|
||||
TypeData()
|
||||
}
|
||||
|
||||
val p: Persistable? = typeData.data[id]
|
||||
|
||||
return (p?.copy() as T?)
|
||||
}
|
||||
|
||||
fun <T : Persistable> search(
|
||||
clazz: KClass<T>,
|
||||
search: (T) -> Boolean
|
||||
): List<T> {
|
||||
val typeData = data.getOrPut(clazz.java) {
|
||||
TypeData()
|
||||
}
|
||||
|
||||
return typeData.data.values
|
||||
.filter { search(it as T) }
|
||||
.map { o -> o.copy() as T }
|
||||
}
|
||||
|
||||
fun findIndex(
|
||||
kClass: KClass<*>,
|
||||
indexName: String
|
||||
): PersistableIndex? {
|
||||
return indexes[kClass.java]?.get(indexName)
|
||||
}
|
||||
|
||||
fun storeActions(actions: MutableList<Action>) {
|
||||
if (actions.isNotEmpty()) {
|
||||
synchronized(this) {
|
||||
val number = transactionFormatter.format(nextTransactionNumber)
|
||||
val file = File(directory, "transaction-$number.trn")
|
||||
ObjectOutputStream(file.outputStream()).use { oos ->
|
||||
oos.writeLong(nextTransactionNumber++)
|
||||
oos.writeObject(actions)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun snapshot() {
|
||||
synchronized(this) {
|
||||
val number = transactionFormatter.format(nextTransactionNumber)
|
||||
val file = File(directory, "transaction-$number.snp")
|
||||
ObjectOutputStream(file.outputStream()).use { oos ->
|
||||
oos.writeLong(nextTransactionNumber++)
|
||||
oos.writeObject(data)
|
||||
oos.writeInt(indexes.size)
|
||||
for ((cls, index) in indexes) {
|
||||
oos.writeObject(cls)
|
||||
oos.writeInt(index.size)
|
||||
for ((name, idx) in index) {
|
||||
oos.writeObject(name)
|
||||
oos.writeObject(idx.index)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun readSnapshot(ois: ObjectInputStream) {
|
||||
nextTransactionNumber = ois.readLong() + 1
|
||||
data.clear()
|
||||
data.putAll(ois.readObject() as MutableMap<Class<*>, TypeData>)
|
||||
|
||||
val foundIndexes = mutableMapOf<Class<*>, MutableList<String>>()
|
||||
val numberOfClassesWithIndex = ois.readInt()
|
||||
for (i in 0 until numberOfClassesWithIndex) {
|
||||
val cls = ois.readObject() as Class<*>
|
||||
val numberOfIndexesForClass = ois.readInt()
|
||||
for (indexForClass in 0 until numberOfIndexesForClass) {
|
||||
val name = ois.readObject() as String
|
||||
val idx = ois.readObject() as MutableMap<Serializable, MutableSet<Long>>
|
||||
foundIndexes.getOrPut(cls) { mutableListOf() }.add(name)
|
||||
|
||||
val index = indexes[cls]
|
||||
if (index != null) {
|
||||
index[name]?.index?.putAll(idx)
|
||||
} // else ignore, index is removed
|
||||
}
|
||||
}
|
||||
|
||||
// any (new) index not serialized needs to be build now
|
||||
for ((cls, indexes) in indexes) {
|
||||
for ((name, index) in indexes) {
|
||||
if (!foundIndexes.getOrDefault(cls, mutableListOf()).contains(name)) {
|
||||
index.index.clear()
|
||||
for (obj in data.getOrDefault(cls, TypeData()).data.values) {
|
||||
index.add(obj)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user