move to testkit, and docs
This commit is contained in:
parent
414b28f518
commit
7e07ee17a6
7 changed files with 225 additions and 48 deletions
|
|
@ -188,4 +188,25 @@ For tests that involve more than one Cluster node you have to use another journa
|
|||
While it's possible to use the @ref:[Persistence Plugin Proxy](../persistence-plugins.md#persistence-plugin-proxy)
|
||||
it's often better and more realistic to use a real database.
|
||||
|
||||
See [akka-samples issue #128](https://github.com/akka/akka-samples/issues/128).
|
||||
See [akka-samples issue #128](https://github.com/akka/akka-samples/issues/128).
|
||||
|
||||
### Plugin initialization
|
||||
|
||||
Some Persistence plugins create tables automatically, but has the limitation that it can't be done concurrently
|
||||
from several ActorSystems. That can be a problem if the test creates a Cluster and all nodes tries to initialize
|
||||
the plugins at the same time. To coordinate initialization you can use the `PersistenceInit` utility.
|
||||
|
||||
`PersistenceInit` is part of `akka-persistence-testkit` and you need to add the dependency to your project:
|
||||
|
||||
@@dependency[sbt,Maven,Gradle] {
|
||||
group="com.typesafe.akka"
|
||||
artifact="akka-persistence-testkit_$scala.binary_version$"
|
||||
version="$akka.version$"
|
||||
}
|
||||
|
||||
Scala
|
||||
: @@snip [PersistenceInitSpec.scala](/akka-docs/src/test/scala/docs/persistence/testkit/PersistenceInitSpec.scala) { #imports #init }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceInitTest.java](/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java) { #imports #init }
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.persistence.testkit;
|
||||
|
||||
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
|
||||
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import jdocs.AbstractJavaTest;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
//#imports
|
||||
import akka.persistence.testkit.javadsl.PersistenceInit;
|
||||
import akka.Done;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
//#imports
|
||||
|
||||
public class PersistenceInitTest extends AbstractJavaTest {
|
||||
@ClassRule
|
||||
public static final TestKitJunitResource testKit =
|
||||
new TestKitJunitResource(
|
||||
ConfigFactory.parseString(
|
||||
"akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n"
|
||||
+ "akka.persistence.journal.inmem.test-serialization = on \n" +
|
||||
"akka.persistence.snapshot-store.plugin = \"akka.persistence.snapshot-store.local\" \n"
|
||||
+ "akka.persistence.snapshot-store.local.dir = \"target/snapshot-"
|
||||
+ UUID.randomUUID().toString()
|
||||
+ "\" \n"
|
||||
)
|
||||
.withFallback(ConfigFactory.defaultApplication()));
|
||||
|
||||
@Test
|
||||
public void testInit() throws Exception {
|
||||
//#init
|
||||
Duration timeout = Duration.ofSeconds(5);
|
||||
CompletionStage<Done> done = PersistenceInit.initializeDefaultPlugins(testKit.system(), timeout);
|
||||
done.toCompletableFuture().get(timeout.getSeconds(), TimeUnit.SECONDS);
|
||||
//#init
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.persistence.testkit
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import org.scalatest.wordspec.AnyWordSpecLike
|
||||
|
||||
//#imports
|
||||
import akka.persistence.testkit.scaladsl.PersistenceInit
|
||||
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
|
||||
//#imports
|
||||
|
||||
class PersistenceInitSpec
|
||||
extends ScalaTestWithActorTestKit(
|
||||
"""
|
||||
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
|
||||
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
|
||||
akka.persistence.snapshot-store.local.dir = "target/snapshot-${UUID.randomUUID().toString}"
|
||||
""")
|
||||
with AnyWordSpecLike {
|
||||
|
||||
"PersistenceInit" should {
|
||||
"initialize plugins" in {
|
||||
//#init
|
||||
val timeout = 5.seconds
|
||||
val done: Future[Done] = PersistenceInit.initializeDefaultPlugins(system, timeout)
|
||||
Await.result(done, timeout)
|
||||
//#init
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,21 +1,24 @@
|
|||
/*
|
||||
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
package akka.persistence
|
||||
|
||||
package akka.persistence.testkit.internal
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import akka.actor.ActorLogging
|
||||
import akka.actor.Props
|
||||
import akka.annotation.InternalApi
|
||||
import akka.persistence.PersistentActor
|
||||
import akka.persistence.RecoveryCompleted
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] object PersistenceInit {
|
||||
@InternalApi private[akka] object PersistenceInitImpl {
|
||||
|
||||
def props(journalPluginId: String, snapshotPluginId: String, persistenceId: String): Props = {
|
||||
Props(new PersistenceInit(journalPluginId, snapshotPluginId, persistenceId))
|
||||
Props(new PersistenceInitImpl(journalPluginId, snapshotPluginId, persistenceId))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -24,7 +27,7 @@ import akka.annotation.InternalApi
|
|||
* and send any message to it. It will reply to the `sender()` with the same message when
|
||||
* recovery has completed.
|
||||
*/
|
||||
@InternalApi private[akka] class PersistenceInit(
|
||||
@InternalApi private[akka] class PersistenceInitImpl(
|
||||
override val journalPluginId: String,
|
||||
override val snapshotPluginId: String,
|
||||
override val persistenceId: String)
|
||||
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.testkit.javadsl
|
||||
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.CompletionStage
|
||||
|
||||
import scala.compat.java8.FutureConverters._
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.ClassicActorSystemProvider
|
||||
import akka.persistence.testkit.scaladsl
|
||||
import akka.util.JavaDurationConverters._
|
||||
|
||||
/**
|
||||
* Test utility to initialize persistence plugins. Useful when initialization order or coordination
|
||||
* is needed. For example to avoid creating tables concurrently.
|
||||
*/
|
||||
object PersistenceInit {
|
||||
|
||||
/**
|
||||
* Initialize the default journal and snapshot plugins.
|
||||
*
|
||||
* @return a `CompletionStage` that is completed when the initialization has completed
|
||||
*/
|
||||
def initializeDefaultPlugins(system: ClassicActorSystemProvider, timeout: Duration): CompletionStage[Done] =
|
||||
initializePlugins(system, journalPluginId = "", snapshotPluginId = "", timeout)
|
||||
|
||||
/**
|
||||
* Initialize the given journal and snapshot plugins.
|
||||
*
|
||||
* The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used.
|
||||
*
|
||||
* @return a `CompletionStage` that is completed when the initialization has completed
|
||||
*/
|
||||
def initializePlugins(
|
||||
system: ClassicActorSystemProvider,
|
||||
journalPluginId: String,
|
||||
snapshotPluginId: String,
|
||||
timeout: Duration): CompletionStage[Done] =
|
||||
scaladsl.PersistenceInit.initializePlugins(system, journalPluginId, snapshotPluginId, timeout.asScala).toJava
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.testkit.scaladsl
|
||||
|
||||
import java.util.UUID
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.ClassicActorSystemProvider
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.persistence.testkit.internal.PersistenceInitImpl
|
||||
import akka.util.Timeout
|
||||
|
||||
/**
|
||||
* Test utility to initialize persistence plugins. Useful when initialization order or coordination
|
||||
* is needed. For example to avoid creating tables concurrently.
|
||||
*/
|
||||
object PersistenceInit {
|
||||
|
||||
/**
|
||||
* Initialize the default journal and snapshot plugins.
|
||||
*
|
||||
* @return a `Future` that is completed when the initialization has completed
|
||||
*/
|
||||
def initializeDefaultPlugins(system: ClassicActorSystemProvider, timeout: FiniteDuration): Future[Done] =
|
||||
initializePlugins(system, journalPluginId = "", snapshotPluginId = "", timeout)
|
||||
|
||||
/**
|
||||
* Initialize the given journal and snapshot plugins.
|
||||
*
|
||||
* The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used.
|
||||
*
|
||||
* @return a `Future` that is completed when the initialization has completed
|
||||
*/
|
||||
def initializePlugins(
|
||||
system: ClassicActorSystemProvider,
|
||||
journalPluginId: String,
|
||||
snapshotPluginId: String,
|
||||
timeout: FiniteDuration): Future[Done] = {
|
||||
val persistenceId: String = s"persistenceInit-${UUID.randomUUID()}"
|
||||
val extSystem = system.classicSystem.asInstanceOf[ExtendedActorSystem]
|
||||
val ref =
|
||||
extSystem.systemActorOf(
|
||||
PersistenceInitImpl.props(journalPluginId, snapshotPluginId, persistenceId),
|
||||
persistenceId)
|
||||
import akka.pattern.ask
|
||||
import extSystem.dispatcher
|
||||
implicit val askTimeout: Timeout = timeout
|
||||
(ref ? "start").map(_ => Done)
|
||||
}
|
||||
}
|
||||
|
|
@ -4,29 +4,23 @@
|
|||
|
||||
package akka.persistence
|
||||
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import java.util.function.Consumer
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import akka.Done
|
||||
import akka.actor._
|
||||
import akka.annotation.InternalApi
|
||||
import akka.annotation.InternalStableApi
|
||||
import akka.event.Logging
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.persistence.journal.EventAdapters
|
||||
import akka.persistence.journal.IdentityEventAdapters
|
||||
import akka.event.{ Logging, LoggingAdapter }
|
||||
import akka.persistence.journal.{ EventAdapters, IdentityEventAdapters }
|
||||
import akka.util.Collections.EmptyImmutableSeq
|
||||
import akka.util.Helpers.ConfigOps
|
||||
import com.typesafe.config.{ Config, ConfigFactory }
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.util.Reflect
|
||||
import akka.util.Timeout
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import akka.annotation.InternalApi
|
||||
import akka.annotation.InternalStableApi
|
||||
|
||||
/**
|
||||
* Persistence configuration.
|
||||
|
|
@ -287,33 +281,6 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
|||
private def verifySnapshotPluginConfigExists(pluginConfig: Config, configPath: String): Unit =
|
||||
verifyPluginConfigExists(pluginConfig.withFallback(system.settings.config), configPath, "Snapshot store")
|
||||
|
||||
/**
|
||||
* Scala API: Initialize the given journal and snapshot plugins.
|
||||
*
|
||||
* The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used.
|
||||
* @return a `Future` that is completed when the initialization has completed
|
||||
*/
|
||||
def initializePlugin(journalPluginId: String, snapshotPluginId: String, timeout: FiniteDuration): Future[Done] = {
|
||||
val persistenceId: String = s"persistenceInit-${UUID.randomUUID()}"
|
||||
val ref =
|
||||
system.systemActorOf(PersistenceInit.props(journalPluginId, snapshotPluginId, persistenceId), persistenceId)
|
||||
import akka.pattern.ask
|
||||
import system.dispatcher
|
||||
implicit val askTimeout: Timeout = timeout
|
||||
(ref ? "start").map(_ => Done)
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API: Initialize the given journal and snapshot plugins.
|
||||
*
|
||||
* The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used.
|
||||
* @return a `Future` that is completed when the initialization has completed
|
||||
*/
|
||||
def initializePlugin(journalPluginId: String, snapshotPluginId: String, timeout: java.time.Duration): Future[Done] = {
|
||||
import akka.util.JavaDurationConverters._
|
||||
initializePlugin(journalPluginId, snapshotPluginId, timeout.asScala)
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an [[akka.persistence.journal.EventAdapters]] object which serves as a per-journal collection of bound event adapters.
|
||||
* If no adapters are registered for a given journal the EventAdapters object will simply return the identity
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue