Merge pull request #28809 from akka/wip-28808-persistence-init-patriknw
Persistence initialization utility #28808
This commit is contained in:
commit
4039a37e41
6 changed files with 264 additions and 1 deletions
|
|
@ -189,3 +189,24 @@ While it's possible to use the @ref:[Persistence Plugin Proxy](../persistence-pl
|
||||||
it's often better and more realistic to use a real database.
|
it's often better and more realistic to use a real database.
|
||||||
|
|
||||||
See [akka-samples issue #128](https://github.com/akka/akka-samples/issues/128).
|
See [akka-samples issue #128](https://github.com/akka/akka-samples/issues/128).
|
||||||
|
|
||||||
|
### Plugin initialization
|
||||||
|
|
||||||
|
Some Persistence plugins create tables automatically, but has the limitation that it can't be done concurrently
|
||||||
|
from several ActorSystems. That can be a problem if the test creates a Cluster and all nodes tries to initialize
|
||||||
|
the plugins at the same time. To coordinate initialization you can use the `PersistenceInit` utility.
|
||||||
|
|
||||||
|
`PersistenceInit` is part of `akka-persistence-testkit` and you need to add the dependency to your project:
|
||||||
|
|
||||||
|
@@dependency[sbt,Maven,Gradle] {
|
||||||
|
group="com.typesafe.akka"
|
||||||
|
artifact="akka-persistence-testkit_$scala.binary_version$"
|
||||||
|
version="$akka.version$"
|
||||||
|
}
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [PersistenceInitSpec.scala](/akka-docs/src/test/scala/docs/persistence/testkit/PersistenceInitSpec.scala) { #imports #init }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [PersistenceInitTest.java](/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java) { #imports #init }
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,48 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package jdocs.persistence.testkit;
|
||||||
|
|
||||||
|
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
|
||||||
|
|
||||||
|
import com.typesafe.config.ConfigFactory;
|
||||||
|
import jdocs.AbstractJavaTest;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
// #imports
|
||||||
|
import akka.persistence.testkit.javadsl.PersistenceInit;
|
||||||
|
import akka.Done;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.concurrent.CompletionStage;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
// #imports
|
||||||
|
|
||||||
|
public class PersistenceInitTest extends AbstractJavaTest {
|
||||||
|
@ClassRule
|
||||||
|
public static final TestKitJunitResource testKit =
|
||||||
|
new TestKitJunitResource(
|
||||||
|
ConfigFactory.parseString(
|
||||||
|
"akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n"
|
||||||
|
+ "akka.persistence.journal.inmem.test-serialization = on \n"
|
||||||
|
+ "akka.persistence.snapshot-store.plugin = \"akka.persistence.snapshot-store.local\" \n"
|
||||||
|
+ "akka.persistence.snapshot-store.local.dir = \"target/snapshot-"
|
||||||
|
+ UUID.randomUUID().toString()
|
||||||
|
+ "\" \n")
|
||||||
|
.withFallback(ConfigFactory.defaultApplication()));
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInit() throws Exception {
|
||||||
|
// #init
|
||||||
|
Duration timeout = Duration.ofSeconds(5);
|
||||||
|
CompletionStage<Done> done =
|
||||||
|
PersistenceInit.initializeDefaultPlugins(testKit.system(), timeout);
|
||||||
|
done.toCompletableFuture().get(timeout.getSeconds(), TimeUnit.SECONDS);
|
||||||
|
// #init
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,38 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.persistence.testkit
|
||||||
|
|
||||||
|
import akka.Done
|
||||||
|
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||||
|
import org.scalatest.wordspec.AnyWordSpecLike
|
||||||
|
|
||||||
|
//#imports
|
||||||
|
import akka.persistence.testkit.scaladsl.PersistenceInit
|
||||||
|
|
||||||
|
import scala.concurrent.Await
|
||||||
|
import scala.concurrent.Future
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
//#imports
|
||||||
|
|
||||||
|
class PersistenceInitSpec
|
||||||
|
extends ScalaTestWithActorTestKit(
|
||||||
|
"""
|
||||||
|
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
|
||||||
|
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
|
||||||
|
akka.persistence.snapshot-store.local.dir = "target/snapshot-${UUID.randomUUID().toString}"
|
||||||
|
""")
|
||||||
|
with AnyWordSpecLike {
|
||||||
|
|
||||||
|
"PersistenceInit" should {
|
||||||
|
"initialize plugins" in {
|
||||||
|
//#init
|
||||||
|
val timeout = 5.seconds
|
||||||
|
val done: Future[Done] = PersistenceInit.initializeDefaultPlugins(system, timeout)
|
||||||
|
Await.result(done, timeout)
|
||||||
|
//#init
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,56 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.persistence.testkit.internal
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
|
import akka.actor.ActorLogging
|
||||||
|
import akka.actor.Props
|
||||||
|
import akka.annotation.InternalApi
|
||||||
|
import akka.persistence.PersistentActor
|
||||||
|
import akka.persistence.RecoveryCompleted
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi private[akka] object PersistenceInitImpl {
|
||||||
|
|
||||||
|
def props(journalPluginId: String, snapshotPluginId: String, persistenceId: String): Props = {
|
||||||
|
Props(new PersistenceInitImpl(journalPluginId, snapshotPluginId, persistenceId))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API: Initialize a journal and snapshot plugin by starting this `PersistentActor`
|
||||||
|
* and send any message to it. It will reply to the `sender()` with the same message when
|
||||||
|
* recovery has completed.
|
||||||
|
*/
|
||||||
|
@InternalApi private[akka] class PersistenceInitImpl(
|
||||||
|
override val journalPluginId: String,
|
||||||
|
override val snapshotPluginId: String,
|
||||||
|
override val persistenceId: String)
|
||||||
|
extends PersistentActor
|
||||||
|
with ActorLogging {
|
||||||
|
|
||||||
|
private val startTime = System.nanoTime()
|
||||||
|
|
||||||
|
def receiveRecover: Receive = {
|
||||||
|
case RecoveryCompleted =>
|
||||||
|
log.debug(
|
||||||
|
"Initialization completed for journal [{}] and snapshot [{}] plugins, with persistenceId [{}], in [{} ms]",
|
||||||
|
journalPluginId,
|
||||||
|
snapshotPluginId,
|
||||||
|
persistenceId,
|
||||||
|
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime))
|
||||||
|
case _ =>
|
||||||
|
}
|
||||||
|
|
||||||
|
def receiveCommand: Receive = {
|
||||||
|
case msg =>
|
||||||
|
// recovery has completed
|
||||||
|
sender() ! msg
|
||||||
|
context.stop(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,45 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.persistence.testkit.javadsl
|
||||||
|
|
||||||
|
import java.time.Duration
|
||||||
|
import java.util.concurrent.CompletionStage
|
||||||
|
|
||||||
|
import scala.compat.java8.FutureConverters._
|
||||||
|
|
||||||
|
import akka.Done
|
||||||
|
import akka.actor.ClassicActorSystemProvider
|
||||||
|
import akka.persistence.testkit.scaladsl
|
||||||
|
import akka.util.JavaDurationConverters._
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test utility to initialize persistence plugins. Useful when initialization order or coordination
|
||||||
|
* is needed. For example to avoid creating tables concurrently.
|
||||||
|
*/
|
||||||
|
object PersistenceInit {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the default journal and snapshot plugins.
|
||||||
|
*
|
||||||
|
* @return a `CompletionStage` that is completed when the initialization has completed
|
||||||
|
*/
|
||||||
|
def initializeDefaultPlugins(system: ClassicActorSystemProvider, timeout: Duration): CompletionStage[Done] =
|
||||||
|
initializePlugins(system, journalPluginId = "", snapshotPluginId = "", timeout)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the given journal and snapshot plugins.
|
||||||
|
*
|
||||||
|
* The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used.
|
||||||
|
*
|
||||||
|
* @return a `CompletionStage` that is completed when the initialization has completed
|
||||||
|
*/
|
||||||
|
def initializePlugins(
|
||||||
|
system: ClassicActorSystemProvider,
|
||||||
|
journalPluginId: String,
|
||||||
|
snapshotPluginId: String,
|
||||||
|
timeout: Duration): CompletionStage[Done] =
|
||||||
|
scaladsl.PersistenceInit.initializePlugins(system, journalPluginId, snapshotPluginId, timeout.asScala).toJava
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,55 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.persistence.testkit.scaladsl
|
||||||
|
|
||||||
|
import java.util.UUID
|
||||||
|
|
||||||
|
import scala.concurrent.Future
|
||||||
|
import scala.concurrent.duration.FiniteDuration
|
||||||
|
|
||||||
|
import akka.Done
|
||||||
|
import akka.actor.ClassicActorSystemProvider
|
||||||
|
import akka.actor.ExtendedActorSystem
|
||||||
|
import akka.persistence.testkit.internal.PersistenceInitImpl
|
||||||
|
import akka.util.Timeout
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test utility to initialize persistence plugins. Useful when initialization order or coordination
|
||||||
|
* is needed. For example to avoid creating tables concurrently.
|
||||||
|
*/
|
||||||
|
object PersistenceInit {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the default journal and snapshot plugins.
|
||||||
|
*
|
||||||
|
* @return a `Future` that is completed when the initialization has completed
|
||||||
|
*/
|
||||||
|
def initializeDefaultPlugins(system: ClassicActorSystemProvider, timeout: FiniteDuration): Future[Done] =
|
||||||
|
initializePlugins(system, journalPluginId = "", snapshotPluginId = "", timeout)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the given journal and snapshot plugins.
|
||||||
|
*
|
||||||
|
* The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used.
|
||||||
|
*
|
||||||
|
* @return a `Future` that is completed when the initialization has completed
|
||||||
|
*/
|
||||||
|
def initializePlugins(
|
||||||
|
system: ClassicActorSystemProvider,
|
||||||
|
journalPluginId: String,
|
||||||
|
snapshotPluginId: String,
|
||||||
|
timeout: FiniteDuration): Future[Done] = {
|
||||||
|
val persistenceId: String = s"persistenceInit-${UUID.randomUUID()}"
|
||||||
|
val extSystem = system.classicSystem.asInstanceOf[ExtendedActorSystem]
|
||||||
|
val ref =
|
||||||
|
extSystem.systemActorOf(
|
||||||
|
PersistenceInitImpl.props(journalPluginId, snapshotPluginId, persistenceId),
|
||||||
|
persistenceId)
|
||||||
|
import akka.pattern.ask
|
||||||
|
import extSystem.dispatcher
|
||||||
|
implicit val askTimeout: Timeout = timeout
|
||||||
|
(ref ? "start").map(_ => Done)
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue