Add FileManager, File cleanup, Logger
This commit is contained in:
@@ -5,7 +5,6 @@ import java.io.ObjectInputStream
|
|||||||
import java.io.ObjectOutputStream
|
import java.io.ObjectOutputStream
|
||||||
import java.io.Serializable
|
import java.io.Serializable
|
||||||
import java.text.DecimalFormat
|
import java.text.DecimalFormat
|
||||||
import java.util.*
|
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
@@ -35,10 +34,6 @@ class Datastore(
|
|||||||
private val indexes: MutableMap<Class<*>, MutableMap<String, PersistableIndex>> = ConcurrentHashMap()
|
private val indexes: MutableMap<Class<*>, MutableMap<String, PersistableIndex>> = ConcurrentHashMap()
|
||||||
|
|
||||||
init {
|
init {
|
||||||
if (!directory.exists()) {
|
|
||||||
directory.mkdirs()
|
|
||||||
}
|
|
||||||
|
|
||||||
for (index in indexes) {
|
for (index in indexes) {
|
||||||
this.indexes.getOrPut(index.cls) {
|
this.indexes.getOrPut(index.cls) {
|
||||||
ConcurrentHashMap()
|
ConcurrentHashMap()
|
||||||
@@ -48,49 +43,43 @@ class Datastore(
|
|||||||
loadTransactions()
|
loadTransactions()
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun loadTransactions() {
|
override fun toString(): String {
|
||||||
synchronized(this) {
|
return "Datastore(directory=${fileManager.directory}, classes=${data.keys.size}, indexes=${indexes.keys.size})"
|
||||||
val snapshots: Array<File>? = directory.listFiles { _, name -> name.startsWith("transaction-") && name.endsWith(".snp") }
|
}
|
||||||
val files: Array<File>? = directory.listFiles { _, name -> name.startsWith("transaction-") && name.endsWith(".trn") }
|
|
||||||
|
|
||||||
var lastSnapshot: Long? = null
|
// print status, show number of entries for each class and index
|
||||||
var lastSnapshotFile: File? = null
|
fun printStatus() {
|
||||||
|
println(this)
|
||||||
snapshots?.let {
|
for ((cls, typeData) in data) {
|
||||||
it.forEach {
|
println(" ${cls.simpleName}: ${typeData.data.size}")
|
||||||
val trnx = getTrnx(it)
|
for ((name, index) in indexes.getOrDefault(cls, mutableMapOf())) {
|
||||||
if (lastSnapshot == null || trnx > (lastSnapshot ?: 0L)) {
|
println(" $name: ${index.index.size}")
|
||||||
lastSnapshot = trnx
|
|
||||||
lastSnapshotFile = it
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
val lastSnapshotFile2 = fileManager.findLastSnapshot()
|
private fun loadTransactions() {
|
||||||
|
val start = System.nanoTime()
|
||||||
|
|
||||||
|
synchronized(this) {
|
||||||
|
val (lastSnapshot, lastSnapshotFile) = fileManager.findLastSnapshot()
|
||||||
|
|
||||||
if (lastSnapshotFile != null) {
|
if (lastSnapshotFile != null) {
|
||||||
ObjectInputStream(lastSnapshotFile?.inputStream()).use { ois ->
|
ObjectInputStream(lastSnapshotFile.inputStream()).use { ois ->
|
||||||
readSnapshot(ois)
|
readSnapshot(ois)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val trns = fileManager.findTransactionsAfter(lastSnapshot ?: 0L)
|
val transactions = fileManager.findTransactionsAfter(lastSnapshot ?: 0L)
|
||||||
|
|
||||||
files?.also { snaphotFiles ->
|
transactions?.forEach { file ->
|
||||||
Arrays.sort(snaphotFiles) { o1, o2 -> if (getTrnx(o1) > getTrnx(o2)) 1 else -1}
|
ObjectInputStream(file.inputStream()).use { ois ->
|
||||||
|
readTransaction(ois)
|
||||||
snaphotFiles.forEach { file ->
|
|
||||||
if (getTrnx(file) > (lastSnapshot ?: 0L)) {
|
|
||||||
ObjectInputStream(file.inputStream()).use { ois ->
|
|
||||||
val transactionNumber = ois.readLong()
|
|
||||||
nextTransactionNumber = transactionNumber + 1
|
|
||||||
val actions = ois.readObject() as MutableList<Action>
|
|
||||||
execute(actions)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Logger.debug("Loaded transactions in ${(System.nanoTime() - start) / 1_000_000}ms")
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun getTrnx(file: File): Long {
|
private fun getTrnx(file: File): Long {
|
||||||
@@ -112,6 +101,7 @@ class Datastore(
|
|||||||
typeData.data[action.obj.id] = action.obj
|
typeData.data[action.obj.id] = action.obj
|
||||||
|
|
||||||
for (index in indexes[action.obj::class.java]?.values ?: listOf()) {
|
for (index in indexes[action.obj::class.java]?.values ?: listOf()) {
|
||||||
|
index.remove(action.obj.id)
|
||||||
index.add(action.obj as Persistable)
|
index.add(action.obj as Persistable)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -120,7 +110,7 @@ class Datastore(
|
|||||||
typeData.data.remove(action.obj.id)
|
typeData.data.remove(action.obj.id)
|
||||||
|
|
||||||
for (index in indexes[action.obj::class.java]?.values ?: listOf()) {
|
for (index in indexes[action.obj::class.java]?.values ?: listOf()) {
|
||||||
index.remove(action.obj)
|
index.remove(action.obj.id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -161,17 +151,29 @@ class Datastore(
|
|||||||
fun storeActions(actions: MutableList<Action>) {
|
fun storeActions(actions: MutableList<Action>) {
|
||||||
if (actions.isNotEmpty()) {
|
if (actions.isNotEmpty()) {
|
||||||
synchronized(this) {
|
synchronized(this) {
|
||||||
val number = transactionFormatter.format(nextTransactionNumber)
|
writeTransaction(actions)
|
||||||
val file = File(directory, "transaction-$number.trn")
|
|
||||||
ObjectOutputStream(file.outputStream()).use { oos ->
|
|
||||||
oos.writeLong(nextTransactionNumber++)
|
|
||||||
oos.writeObject(actions)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun readTransaction(ois: ObjectInputStream) {
|
||||||
|
val transactionNumber = ois.readLong()
|
||||||
|
nextTransactionNumber = transactionNumber + 1
|
||||||
|
val actions = ois.readObject() as MutableList<Action>
|
||||||
|
execute(actions)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun writeTransaction(actions: MutableList<Action>) {
|
||||||
|
val number = transactionFormatter.format(nextTransactionNumber)
|
||||||
|
val file = File(directory, "transaction-$number.trn")
|
||||||
|
ObjectOutputStream(file.outputStream()).use { oos ->
|
||||||
|
oos.writeLong(nextTransactionNumber++)
|
||||||
|
oos.writeObject(actions)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fun snapshot() {
|
fun snapshot() {
|
||||||
|
val start = System.nanoTime()
|
||||||
synchronized(this) {
|
synchronized(this) {
|
||||||
val number = transactionFormatter.format(nextTransactionNumber)
|
val number = transactionFormatter.format(nextTransactionNumber)
|
||||||
val file = File(directory, "transaction-$number.snp")
|
val file = File(directory, "transaction-$number.snp")
|
||||||
@@ -189,6 +191,7 @@ class Datastore(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Logger.debug("Snapshot in ${(System.nanoTime() - start) / 1_000_000}ms")
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun readSnapshot(ois: ObjectInputStream) {
|
private fun readSnapshot(ois: ObjectInputStream) {
|
||||||
@@ -226,4 +229,8 @@ class Datastore(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun removeOldFiles() {
|
||||||
|
fileManager.removeOldFiles()
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
62
src/main/kotlin/nl/astraeus/persistence/FileManager.kt
Normal file
62
src/main/kotlin/nl/astraeus/persistence/FileManager.kt
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
package nl.astraeus.nl.astraeus.persistence
|
||||||
|
|
||||||
|
import java.io.File
|
||||||
|
|
||||||
|
class FileManager(
|
||||||
|
val directory: File
|
||||||
|
) {
|
||||||
|
|
||||||
|
init {
|
||||||
|
if (!directory.exists()) {
|
||||||
|
directory.mkdirs()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fun findLastSnapshot(): Pair<Long?, File?> {
|
||||||
|
directory.listFiles { _, name ->
|
||||||
|
name.startsWith("transaction-") && name.endsWith(".snp")
|
||||||
|
}?.maxByOrNull {
|
||||||
|
getTrnx(it)
|
||||||
|
}?.also { file ->
|
||||||
|
return getTrnx(file) to file
|
||||||
|
}
|
||||||
|
|
||||||
|
return null to null
|
||||||
|
}
|
||||||
|
|
||||||
|
fun findTransactionsAfter(trnx: Long): List<File>? {
|
||||||
|
return directory.listFiles { _, name ->
|
||||||
|
name.startsWith("transaction-") && name.endsWith(".trn")
|
||||||
|
}?.filter {
|
||||||
|
getTrnx(it) > trnx
|
||||||
|
}?.sortedBy {
|
||||||
|
getTrnx(it)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun getTrnx(file: File): Long {
|
||||||
|
// todo: add checks, improve performance
|
||||||
|
return file.name
|
||||||
|
.substringAfterLast('/')
|
||||||
|
.substringAfter("transaction-")
|
||||||
|
.substringBefore(".")
|
||||||
|
.toLong()
|
||||||
|
}
|
||||||
|
|
||||||
|
fun removeOldFiles() {
|
||||||
|
val (lastSnapshot, _) = findLastSnapshot()
|
||||||
|
|
||||||
|
if (lastSnapshot != null) {
|
||||||
|
val files = directory.listFiles { _, name ->
|
||||||
|
name.startsWith("transaction-")
|
||||||
|
}?.filter {
|
||||||
|
getTrnx(it) < lastSnapshot
|
||||||
|
}
|
||||||
|
|
||||||
|
files?.forEach {
|
||||||
|
it.delete()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -27,11 +27,17 @@ class Index<T : Persistable>(
|
|||||||
index[key]?.remove(obj.id)
|
index[key]?.remove(obj.id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun remove(id: Long) {
|
||||||
|
for ((_, ids) in index) {
|
||||||
|
ids.remove(id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fun find(key: Any): List<T> {
|
fun find(key: Any): List<T> {
|
||||||
return index[key]?.mapNotNull { currentTransaction()?.find(cls.kotlin, it) } ?: emptyList()
|
return index[key]?.mapNotNull { currentTransaction()?.find(cls.kotlin, it) } ?: emptyList()
|
||||||
}
|
}
|
||||||
|
|
||||||
fun matches(obj: Persistable, value: Any): Boolean {
|
fun matches(obj: Persistable, value: Serializable): Boolean {
|
||||||
return value(obj) == value
|
return value(obj) == value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
8
src/main/kotlin/nl/astraeus/persistence/Logger.kt
Normal file
8
src/main/kotlin/nl/astraeus/persistence/Logger.kt
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
package nl.astraeus.nl.astraeus.persistence
|
||||||
|
|
||||||
|
object Logger {
|
||||||
|
var debug: (String) -> Unit = { println("DEBUG: $it") }
|
||||||
|
var info: (String) -> Unit = { println("INFO: $it") }
|
||||||
|
var warn: (String) -> Unit = { println("WARN: $it") }
|
||||||
|
var error: (String) -> Unit = { println("ERROR: $it") }
|
||||||
|
}
|
||||||
@@ -35,4 +35,8 @@ class Persistent(
|
|||||||
fun snapshot() {
|
fun snapshot() {
|
||||||
datastore.snapshot()
|
datastore.snapshot()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun removeOldFiles() {
|
||||||
|
datastore.removeOldFiles()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -70,10 +70,11 @@ class Transaction(
|
|||||||
fun <T : Persistable> findByIndex(
|
fun <T : Persistable> findByIndex(
|
||||||
kClass: KClass<T>,
|
kClass: KClass<T>,
|
||||||
indexName: String,
|
indexName: String,
|
||||||
search: Any
|
search: Serializable
|
||||||
): List<T> {
|
): List<T> {
|
||||||
val result = mutableListOf<T>()
|
val result = mutableListOf<T>()
|
||||||
val index = persistent.datastore.findIndex(kClass, indexName) ?: throw IllegalArgumentException("Index not found")
|
val index = persistent.datastore.findIndex(kClass, indexName)
|
||||||
|
?: throw IllegalArgumentException("Index with name $indexName not found for class ${kClass.simpleName}")
|
||||||
|
|
||||||
index.find(search).forEach { id ->
|
index.find(search).forEach { id ->
|
||||||
result.add(id as T)
|
result.add(id as T)
|
||||||
@@ -97,4 +98,36 @@ class Transaction(
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun <T : Persistable> searchIndex(
|
||||||
|
kClass: KClass<T>,
|
||||||
|
indexName: String,
|
||||||
|
search: (Serializable) -> Boolean,
|
||||||
|
): List<T> {
|
||||||
|
val result = mutableListOf<T>()
|
||||||
|
val index = persistent.datastore.findIndex(kClass, indexName) ?: throw IllegalArgumentException("Index not found")
|
||||||
|
|
||||||
|
index.index.keys.forEach { key ->
|
||||||
|
if (search(key)) {
|
||||||
|
index.find(key).forEach { id ->
|
||||||
|
result.add(id as T)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (action in actions) {
|
||||||
|
if (action.obj::class == kClass) {
|
||||||
|
val indexedValue = index.value(action.obj)
|
||||||
|
if (indexedValue != null && index.matches(action.obj, indexedValue)) {
|
||||||
|
if (action.type == ActionType.DELETE) {
|
||||||
|
result.remove(action.obj as T)
|
||||||
|
} else if (action.type == ActionType.STORE) {
|
||||||
|
result.remove(action.obj)
|
||||||
|
result.add(action.obj as T)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -136,17 +136,43 @@ class TestPersistence {
|
|||||||
store(
|
store(
|
||||||
Person(
|
Person(
|
||||||
id = 10L,
|
id = 10L,
|
||||||
name = "Pipo",
|
name = "John Pipo",
|
||||||
age = 23
|
age = 23
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
store(
|
store(
|
||||||
Person(
|
Person(
|
||||||
id = 11L,
|
id = 11L,
|
||||||
name = "Clown",
|
name = "John Clown",
|
||||||
age = 18
|
age = 18
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
searchIndex(Person::class, "name") {
|
||||||
|
name -> (name as? String)?.startsWith("John") == true
|
||||||
|
}.forEach { p ->
|
||||||
|
println("Found person where name starts with 'John': ${p.name} - ${p.age}")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pst.transaction {
|
||||||
|
store(
|
||||||
|
Person(
|
||||||
|
id = 15L,
|
||||||
|
name = "Mama",
|
||||||
|
age = 26
|
||||||
|
)
|
||||||
|
)
|
||||||
|
store(
|
||||||
|
Person(
|
||||||
|
id = 16L,
|
||||||
|
name = "Loe",
|
||||||
|
age = 16
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
pst.datastore.printStatus()
|
||||||
|
//pst.removeOldFiles()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user