parent
8417e70460
commit
bd576adfc6
2 changed files with 97 additions and 2 deletions
|
|
@ -0,0 +1,91 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.typed.cluster
|
||||||
|
|
||||||
|
import akka.typed.ActorRef
|
||||||
|
import akka.typed.Behavior
|
||||||
|
import akka.typed.Props
|
||||||
|
import akka.typed.TypedSpec
|
||||||
|
import akka.typed.persistence.scaladsl.PersistentActor
|
||||||
|
import akka.typed.testkit.TestKitSettings
|
||||||
|
import akka.typed.testkit.scaladsl.TestProbe
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
import org.scalatest.concurrent.ScalaFutures
|
||||||
|
|
||||||
|
object ClusterSingletonPersistenceSpec {
|
||||||
|
val config = ConfigFactory.parseString(
|
||||||
|
"""
|
||||||
|
akka.actor.provider = cluster
|
||||||
|
|
||||||
|
akka.remote.artery.enabled = true
|
||||||
|
akka.remote.netty.tcp.port = 0
|
||||||
|
akka.remote.artery.canonical.port = 0
|
||||||
|
akka.remote.artery.canonical.hostname = 127.0.0.1
|
||||||
|
|
||||||
|
akka.coordinated-shutdown.terminate-actor-system = off
|
||||||
|
|
||||||
|
akka.actor {
|
||||||
|
serialize-messages = off
|
||||||
|
allow-java-serialization = off
|
||||||
|
}
|
||||||
|
|
||||||
|
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
|
||||||
|
""".stripMargin)
|
||||||
|
|
||||||
|
sealed trait Command
|
||||||
|
final case class Add(s: String) extends Command
|
||||||
|
final case class Get(replyTo: ActorRef[String]) extends Command
|
||||||
|
private final case object StopPlz extends Command
|
||||||
|
|
||||||
|
import PersistentActor._
|
||||||
|
|
||||||
|
val persistentActor: Behavior[Command] =
|
||||||
|
PersistentActor.immutable[Command, String, String](
|
||||||
|
persistenceId = "TheSingleton",
|
||||||
|
initialState = "",
|
||||||
|
commandHandler = CommandHandler((ctx, state, cmd) ⇒ cmd match {
|
||||||
|
case Add(s) ⇒ Effect.persist(s)
|
||||||
|
case Get(replyTo) ⇒
|
||||||
|
replyTo ! state
|
||||||
|
Effect.none
|
||||||
|
case StopPlz ⇒ Effect.stop
|
||||||
|
}),
|
||||||
|
eventHandler = (state, evt) ⇒ if (state.isEmpty) evt else state + "|" + evt)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
class ClusterSingletonPersistenceSpec extends TypedSpec(ClusterSingletonPersistenceSpec.config) with ScalaFutures {
|
||||||
|
import ClusterSingletonPersistenceSpec._
|
||||||
|
import akka.typed.scaladsl.adapter._
|
||||||
|
|
||||||
|
implicit val s = system
|
||||||
|
implicit val testkitSettings = TestKitSettings(system)
|
||||||
|
|
||||||
|
implicit val untypedSystem = system.toUntyped
|
||||||
|
private val untypedCluster = akka.cluster.Cluster(untypedSystem)
|
||||||
|
|
||||||
|
object `Typed cluster singleton with persistent actor` {
|
||||||
|
|
||||||
|
untypedCluster.join(untypedCluster.selfAddress)
|
||||||
|
|
||||||
|
def `01 start persistent actor`(): Unit = {
|
||||||
|
val ref = ClusterSingleton(system).spawn(
|
||||||
|
behavior = persistentActor,
|
||||||
|
singletonName = "singleton",
|
||||||
|
props = Props.empty,
|
||||||
|
settings = ClusterSingletonSettings(system),
|
||||||
|
terminationMessage = StopPlz)
|
||||||
|
|
||||||
|
val p = TestProbe[String]()
|
||||||
|
|
||||||
|
ref ! Add("a")
|
||||||
|
ref ! Add("b")
|
||||||
|
ref ! Add("c")
|
||||||
|
ref ! Get(p.ref)
|
||||||
|
p.expectMsg("a|b|c")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -9,6 +9,7 @@ import java.util.function.{ Function ⇒ JFunction }
|
||||||
import akka.actor.{ ExtendedActorSystem, InvalidActorNameException }
|
import akka.actor.{ ExtendedActorSystem, InvalidActorNameException }
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.cluster.singleton.{ ClusterSingletonProxy, ClusterSingletonManager ⇒ OldSingletonManager }
|
import akka.cluster.singleton.{ ClusterSingletonProxy, ClusterSingletonManager ⇒ OldSingletonManager }
|
||||||
|
import akka.typed.Behavior.UntypedBehavior
|
||||||
import akka.typed.cluster.{ Cluster, ClusterSingleton, ClusterSingletonImpl, ClusterSingletonSettings }
|
import akka.typed.cluster.{ Cluster, ClusterSingleton, ClusterSingletonImpl, ClusterSingletonSettings }
|
||||||
import akka.typed.internal.adapter.ActorSystemAdapter
|
import akka.typed.internal.adapter.ActorSystemAdapter
|
||||||
import akka.typed.scaladsl.adapter._
|
import akka.typed.scaladsl.adapter._
|
||||||
|
|
@ -38,10 +39,13 @@ private[akka] final class AdaptedClusterSingletonImpl(system: ActorSystem[_]) ex
|
||||||
if (settings.shouldRunManager(cluster)) {
|
if (settings.shouldRunManager(cluster)) {
|
||||||
val managerName = managerNameFor(singletonName)
|
val managerName = managerNameFor(singletonName)
|
||||||
// start singleton on this node
|
// start singleton on this node
|
||||||
val adaptedProps = PropsAdapter(behavior, props)
|
val untypedProps = behavior match {
|
||||||
|
case u: UntypedBehavior[_] ⇒ u.untypedProps // PersistentBehavior
|
||||||
|
case _ ⇒ PropsAdapter(behavior, props)
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
untypedSystem.systemActorOf(
|
untypedSystem.systemActorOf(
|
||||||
OldSingletonManager.props(adaptedProps, terminationMessage, settings.toManagerSettings(singletonName)),
|
OldSingletonManager.props(untypedProps, terminationMessage, settings.toManagerSettings(singletonName)),
|
||||||
managerName)
|
managerName)
|
||||||
} catch {
|
} catch {
|
||||||
case ex: InvalidActorNameException if ex.getMessage.endsWith("is not unique!") ⇒
|
case ex: InvalidActorNameException if ex.getMessage.endsWith("is not unique!") ⇒
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue