From bd576adfc65823a71c99bd34c20f911f0213781f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 6 Dec 2017 10:03:50 +0100 Subject: [PATCH] fix Typed Cluster Singleton with PersistentActor, #24112 (#24114) --- .../ClusterSingletonPersistenceSpec.scala | 91 +++++++++++++++++++ .../AdaptedClusterSingletonImpl.scala | 8 +- 2 files changed, 97 insertions(+), 2 deletions(-) create mode 100644 akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterSingletonPersistenceSpec.scala diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterSingletonPersistenceSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterSingletonPersistenceSpec.scala new file mode 100644 index 0000000000..7609a9e2d9 --- /dev/null +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterSingletonPersistenceSpec.scala @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ + +package akka.typed.cluster + +import akka.typed.ActorRef +import akka.typed.Behavior +import akka.typed.Props +import akka.typed.TypedSpec +import akka.typed.persistence.scaladsl.PersistentActor +import akka.typed.testkit.TestKitSettings +import akka.typed.testkit.scaladsl.TestProbe +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.ScalaFutures + +object ClusterSingletonPersistenceSpec { + val config = ConfigFactory.parseString( + """ + akka.actor.provider = cluster + + akka.remote.artery.enabled = true + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + akka.remote.artery.canonical.hostname = 127.0.0.1 + + akka.coordinated-shutdown.terminate-actor-system = off + + akka.actor { + serialize-messages = off + allow-java-serialization = off + } + + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + """.stripMargin) + + sealed trait Command + final case class Add(s: String) extends Command + final case class Get(replyTo: ActorRef[String]) extends Command + private final case object StopPlz extends Command + + import PersistentActor._ + + val persistentActor: Behavior[Command] = + PersistentActor.immutable[Command, String, String]( + persistenceId = "TheSingleton", + initialState = "", + commandHandler = CommandHandler((ctx, state, cmd) ⇒ cmd match { + case Add(s) ⇒ Effect.persist(s) + case Get(replyTo) ⇒ + replyTo ! state + Effect.none + case StopPlz ⇒ Effect.stop + }), + eventHandler = (state, evt) ⇒ if (state.isEmpty) evt else state + "|" + evt) + +} + +class ClusterSingletonPersistenceSpec extends TypedSpec(ClusterSingletonPersistenceSpec.config) with ScalaFutures { + import ClusterSingletonPersistenceSpec._ + import akka.typed.scaladsl.adapter._ + + implicit val s = system + implicit val testkitSettings = TestKitSettings(system) + + implicit val untypedSystem = system.toUntyped + private val untypedCluster = akka.cluster.Cluster(untypedSystem) + + object `Typed cluster singleton with persistent actor` { + + untypedCluster.join(untypedCluster.selfAddress) + + def `01 start persistent actor`(): Unit = { + val ref = ClusterSingleton(system).spawn( + behavior = persistentActor, + singletonName = "singleton", + props = Props.empty, + settings = ClusterSingletonSettings(system), + terminationMessage = StopPlz) + + val p = TestProbe[String]() + + ref ! Add("a") + ref ! Add("b") + ref ! Add("c") + ref ! Get(p.ref) + p.expectMsg("a|b|c") + } + } + +} diff --git a/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterSingletonImpl.scala b/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterSingletonImpl.scala index ef60ddb4d4..fcd99afc18 100644 --- a/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterSingletonImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterSingletonImpl.scala @@ -9,6 +9,7 @@ import java.util.function.{ Function ⇒ JFunction } import akka.actor.{ ExtendedActorSystem, InvalidActorNameException } import akka.annotation.InternalApi import akka.cluster.singleton.{ ClusterSingletonProxy, ClusterSingletonManager ⇒ OldSingletonManager } +import akka.typed.Behavior.UntypedBehavior import akka.typed.cluster.{ Cluster, ClusterSingleton, ClusterSingletonImpl, ClusterSingletonSettings } import akka.typed.internal.adapter.ActorSystemAdapter import akka.typed.scaladsl.adapter._ @@ -38,10 +39,13 @@ private[akka] final class AdaptedClusterSingletonImpl(system: ActorSystem[_]) ex if (settings.shouldRunManager(cluster)) { val managerName = managerNameFor(singletonName) // start singleton on this node - val adaptedProps = PropsAdapter(behavior, props) + val untypedProps = behavior match { + case u: UntypedBehavior[_] ⇒ u.untypedProps // PersistentBehavior + case _ ⇒ PropsAdapter(behavior, props) + } try { untypedSystem.systemActorOf( - OldSingletonManager.props(adaptedProps, terminationMessage, settings.toManagerSettings(singletonName)), + OldSingletonManager.props(untypedProps, terminationMessage, settings.toManagerSettings(singletonName)), managerName) } catch { case ex: InvalidActorNameException if ex.getMessage.endsWith("is not unique!") ⇒