diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingPersistenceSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingPersistenceSpec.scala new file mode 100644 index 0000000000..258c758c0b --- /dev/null +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingPersistenceSpec.scala @@ -0,0 +1,99 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ + +package akka.typed.cluster.sharding + +import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy +import akka.typed.{ ActorRef, ActorSystem, Props, TypedSpec } +import akka.typed.cluster.Cluster +import akka.typed.internal.adapter.ActorSystemAdapter +import akka.typed.scaladsl.Actor +import akka.typed.scaladsl.adapter._ +import akka.typed.testkit.TestKitSettings +import akka.typed.testkit.scaladsl.TestProbe +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.duration._ +import akka.typed.Behavior +import akka.typed.persistence.scaladsl.PersistentActor +import akka.typed.persistence.scaladsl.PersistentActor.PersistNothing + +object ClusterShardingPersistenceSpec { + val config = ConfigFactory.parseString( + """ + akka.actor.provider = cluster + + akka.remote.artery.enabled = true + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + akka.remote.artery.canonical.hostname = 127.0.0.1 + + akka.cluster.jmx.multi-mbeans-in-same-jvm = on + + akka.coordinated-shutdown.terminate-actor-system = off + + akka.actor { + serialize-messages = off + allow-java-serialization = off + } + + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + """.stripMargin) + + sealed trait Command + final case class Add(s: String) extends Command + final case class Get(replyTo: ActorRef[String]) extends Command + final case object StopPlz extends Command + + import PersistentActor._ + + val persistentActor: Behavior[Command] = + PersistentActor.persistentEntity[Command, String, String]( + persistenceIdFromActorName = name ⇒ "Test-" + name, + initialState = "", + actions = Actions((ctx, cmd, state) ⇒ cmd match { + case Add(s) ⇒ Persist(s) + case Get(replyTo) ⇒ + replyTo ! state + PersistNothing() + case StopPlz ⇒ Stop() + }), + applyEvent = (evt, state) ⇒ if (state.isEmpty) evt else state + "|" + evt) + + val typeKey = EntityTypeKey[Command]("test") + +} + +class ClusterShardingPersistenceSpec extends TypedSpec(ClusterShardingPersistenceSpec.config) with ScalaFutures { + import akka.typed.scaladsl.adapter._ + import ClusterShardingPersistenceSpec._ + + implicit val s = system + implicit val testkitSettings = TestKitSettings(system) + val sharding = ClusterSharding(system) + + implicit val untypedSystem = system.toUntyped + private val untypedCluster = akka.cluster.Cluster(untypedSystem) + + object `Typed cluster sharding with persistent actor` { + + untypedCluster.join(untypedCluster.selfAddress) + + def `01 start persistent actor`(): Unit = { + ClusterSharding(system).spawn(persistentActor, Props.empty, typeKey, + ClusterShardingSettings(system), maxNumberOfShards = 100, handOffStopMessage = StopPlz) + + val p = TestProbe[String]() + + val ref = ClusterSharding(system).entityRefFor(typeKey, "123") + ref ! Add("a") + ref ! Add("b") + ref ! Add("c") + ref ! Get(p.ref) + p.expectMsg("a|b|c") + } + } + +} diff --git a/akka-typed/src/main/scala/akka/typed/cluster/sharding/ClusterSharding.scala b/akka-typed/src/main/scala/akka/typed/cluster/sharding/ClusterSharding.scala index 18693b64c9..03f059618a 100644 --- a/akka-typed/src/main/scala/akka/typed/cluster/sharding/ClusterSharding.scala +++ b/akka-typed/src/main/scala/akka/typed/cluster/sharding/ClusterSharding.scala @@ -13,6 +13,7 @@ import akka.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Pro import scala.language.implicitConversions import scala.reflect.ClassTag +import akka.typed.Behavior.UntypedBehavior /** * Default envelope type that may be used with Cluster Sharding. @@ -227,9 +228,15 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh val ref = if (settings.shouldHostShard(cluster)) { system.log.info("Starting Shard Region [{}]...") + + val untypedProps = behavior match { + case u: UntypedBehavior[_] ⇒ u.untypedProps // PersistentBehavior + case _ ⇒ PropsAdapter(behavior, entityProps) + } + untypedSharding.start( typeKey.name, - PropsAdapter(behavior, entityProps), + untypedProps, untypedSettings, extractor, extractor, defaultShardAllocationStrategy(settings), @@ -262,14 +269,14 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh @InternalApi private implicit def convertExtractEntityId[E, A](extractor: ShardingMessageExtractor[E, A]): UntypedShardRegion.ExtractEntityId = { // TODO what if msg was null - case msg: E if extractor.entityId(msg.asInstanceOf[E]) ne null ⇒ + case msg: E @unchecked if extractor.entityId(msg) ne null ⇒ // we're evaluating entityId twice, I wonder if we could do it just once (same was in old sharding's Java DSL) - (extractor.entityId(msg.asInstanceOf[E]), extractor.entityMessage(msg.asInstanceOf[E])) + (extractor.entityId(msg), extractor.entityMessage(msg)) } @InternalApi private implicit def convertExtractShardId[E, A](extractor: ShardingMessageExtractor[E, A]): UntypedShardRegion.ExtractShardId = { - case msg: E ⇒ extractor.shardId(msg) + case msg: E @unchecked ⇒ extractor.shardId(msg) } }