Typed ClusterSharding must handle PersistentBehavior, #23773

* becuase we use the `PropsAdapter` in sharding and the `PersistentBehavior` can't be wrapped.
This commit is contained in:
Patrik Nordwall 2017-10-03 14:54:43 +02:00
parent 96ffced4dc
commit 2049d9630c
2 changed files with 110 additions and 4 deletions

View file

@ -0,0 +1,99 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed.cluster.sharding
import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.typed.{ ActorRef, ActorSystem, Props, TypedSpec }
import akka.typed.cluster.Cluster
import akka.typed.internal.adapter.ActorSystemAdapter
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.adapter._
import akka.typed.testkit.TestKitSettings
import akka.typed.testkit.scaladsl.TestProbe
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.duration._
import akka.typed.Behavior
import akka.typed.persistence.scaladsl.PersistentActor
import akka.typed.persistence.scaladsl.PersistentActor.PersistNothing
object ClusterShardingPersistenceSpec {
val config = ConfigFactory.parseString(
"""
akka.actor.provider = cluster
akka.remote.artery.enabled = true
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1
akka.cluster.jmx.multi-mbeans-in-same-jvm = on
akka.coordinated-shutdown.terminate-actor-system = off
akka.actor {
serialize-messages = off
allow-java-serialization = off
}
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
""".stripMargin)
sealed trait Command
final case class Add(s: String) extends Command
final case class Get(replyTo: ActorRef[String]) extends Command
final case object StopPlz extends Command
import PersistentActor._
val persistentActor: Behavior[Command] =
PersistentActor.persistentEntity[Command, String, String](
persistenceIdFromActorName = name "Test-" + name,
initialState = "",
actions = Actions((ctx, cmd, state) cmd match {
case Add(s) Persist(s)
case Get(replyTo)
replyTo ! state
PersistNothing()
case StopPlz Stop()
}),
applyEvent = (evt, state) if (state.isEmpty) evt else state + "|" + evt)
val typeKey = EntityTypeKey[Command]("test")
}
class ClusterShardingPersistenceSpec extends TypedSpec(ClusterShardingPersistenceSpec.config) with ScalaFutures {
import akka.typed.scaladsl.adapter._
import ClusterShardingPersistenceSpec._
implicit val s = system
implicit val testkitSettings = TestKitSettings(system)
val sharding = ClusterSharding(system)
implicit val untypedSystem = system.toUntyped
private val untypedCluster = akka.cluster.Cluster(untypedSystem)
object `Typed cluster sharding with persistent actor` {
untypedCluster.join(untypedCluster.selfAddress)
def `01 start persistent actor`(): Unit = {
ClusterSharding(system).spawn(persistentActor, Props.empty, typeKey,
ClusterShardingSettings(system), maxNumberOfShards = 100, handOffStopMessage = StopPlz)
val p = TestProbe[String]()
val ref = ClusterSharding(system).entityRefFor(typeKey, "123")
ref ! Add("a")
ref ! Add("b")
ref ! Add("c")
ref ! Get(p.ref)
p.expectMsg("a|b|c")
}
}
}

View file

@ -13,6 +13,7 @@ import akka.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Pro
import scala.language.implicitConversions
import scala.reflect.ClassTag
import akka.typed.Behavior.UntypedBehavior
/**
* Default envelope type that may be used with Cluster Sharding.
@ -227,9 +228,15 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh
val ref =
if (settings.shouldHostShard(cluster)) {
system.log.info("Starting Shard Region [{}]...")
val untypedProps = behavior match {
case u: UntypedBehavior[_] u.untypedProps // PersistentBehavior
case _ PropsAdapter(behavior, entityProps)
}
untypedSharding.start(
typeKey.name,
PropsAdapter(behavior, entityProps),
untypedProps,
untypedSettings,
extractor, extractor,
defaultShardAllocationStrategy(settings),
@ -262,14 +269,14 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh
@InternalApi
private implicit def convertExtractEntityId[E, A](extractor: ShardingMessageExtractor[E, A]): UntypedShardRegion.ExtractEntityId = {
// TODO what if msg was null
case msg: E if extractor.entityId(msg.asInstanceOf[E]) ne null
case msg: E @unchecked if extractor.entityId(msg) ne null
// we're evaluating entityId twice, I wonder if we could do it just once (same was in old sharding's Java DSL)
(extractor.entityId(msg.asInstanceOf[E]), extractor.entityMessage(msg.asInstanceOf[E]))
(extractor.entityId(msg), extractor.entityMessage(msg))
}
@InternalApi
private implicit def convertExtractShardId[E, A](extractor: ShardingMessageExtractor[E, A]): UntypedShardRegion.ExtractShardId = {
case msg: E extractor.shardId(msg)
case msg: E @unchecked extractor.shardId(msg)
}
}