diff --git a/akka-cluster-sharding/src/main/mima-filters/2.5.17.backwards.excludes b/akka-cluster-sharding/src/main/mima-filters/2.5.17.backwards.excludes new file mode 100644 index 0000000000..287ddb8b03 --- /dev/null +++ b/akka-cluster-sharding/src/main/mima-filters/2.5.17.backwards.excludes @@ -0,0 +1,2 @@ +# Rename internal method #25840 +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard.getEntity") diff --git a/akka-cluster-sharding/src/main/mima-filters/2.5.18.backwards.excludes b/akka-cluster-sharding/src/main/mima-filters/2.5.18.backwards.excludes index 460960f77e..6fb5bcbd3c 100644 --- a/akka-cluster-sharding/src/main/mima-filters/2.5.18.backwards.excludes +++ b/akka-cluster-sharding/src/main/mima-filters/2.5.18.backwards.excludes @@ -1,3 +1,7 @@ # #23751 warn if handOffStopMessage not handled ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion#HandOffStopper.this") -ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.handOffStopperProps") \ No newline at end of file +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.handOffStopperProps") + +# ##25809 Save EntityStarted when StartEntity requested via remembered entities +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard.getEntity") + diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 65730e4e0a..fa6a7325bd 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -13,10 +13,9 @@ import akka.actor.Deploy import akka.actor.Props import akka.actor.Terminated import akka.actor.Actor -import akka.util.MessageBufferMap +import akka.util.{ ConstantFun, MessageBufferMap } import scala.concurrent.Future -import scala.concurrent.duration._ import akka.cluster.Cluster import akka.cluster.ddata.ORSet import akka.cluster.ddata.ORSetKey @@ -184,17 +183,17 @@ private[akka] class Shard( } def receiveShardCommand(msg: ShardCommand): Unit = msg match { - case RestartEntity(id) ⇒ getEntity(id) + case RestartEntity(id) ⇒ getOrCreateEntity(id) case RestartEntities(ids) ⇒ restartEntities(ids) } def receiveStartEntity(start: ShardRegion.StartEntity): Unit = { - log.debug("Got a request from [{}] to start entity [{}] in shard [{}]", sender(), start.entityId, shardId) + val requester = sender() + log.debug("Got a request from [{}] to start entity [{}] in shard [{}]", requester, start.entityId, shardId) if (passivateIdleTask.isDefined) { lastMessageTimestamp = lastMessageTimestamp.updated(start.entityId, System.nanoTime()) } - getEntity(start.entityId) - sender() ! ShardRegion.StartEntityAck(start.entityId, shardId) + getOrCreateEntity(start.entityId, _ ⇒ processChange(EntityStarted(start.entityId))(_ ⇒ requester ! ShardRegion.StartEntityAck(start.entityId, shardId))) } def receiveStartEntityAck(ack: ShardRegion.StartEntityAck): Unit = { @@ -310,8 +309,7 @@ private[akka] class Shard( if (messages.nonEmpty) { log.debug("Sending message buffer for entity [{}] ([{}] messages)", event.entityId, messages.size) - getEntity(event.entityId) - + getOrCreateEntity(event.entityId) //Now there is no deliveryBuffer we can try to redeliver // and as the child exists, the message will be directly forwarded messages.foreach { @@ -350,26 +348,24 @@ private[akka] class Shard( if (passivateIdleTask.isDefined) { lastMessageTimestamp = lastMessageTimestamp.updated(id, System.nanoTime()) } - val name = URLEncoder.encode(id, "utf-8") - context.child(name) match { - case Some(actor) ⇒ actor.tell(payload, snd) - case None ⇒ getEntity(id).tell(payload, snd) - } + getOrCreateEntity(id).tell(payload, snd) } - def getEntity(id: EntityId): ActorRef = { + def getOrCreateEntity(id: EntityId, onCreate: ActorRef ⇒ Unit = ConstantFun.scalaAnyToUnit): ActorRef = { val name = URLEncoder.encode(id, "utf-8") - context.child(name).getOrElse { - log.debug("Starting entity [{}] in shard [{}]", id, shardId) - - val a = context.watch(context.actorOf(entityProps(id), name)) - idByRef = idByRef.updated(a, id) - refById = refById.updated(id, a) - if (passivateIdleTask.isDefined) { - lastMessageTimestamp += (id -> System.nanoTime()) - } - state = state.copy(state.entities + id) - a + context.child(name) match { + case Some(child) ⇒ child + case None ⇒ + log.debug("Starting entity [{}] in shard [{}]", id, shardId) + val a = context.watch(context.actorOf(entityProps(id), name)) + idByRef = idByRef.updated(a, id) + refById = refById.updated(id, a) + if (passivateIdleTask.isDefined) { + lastMessageTimestamp += (id -> System.nanoTime()) + } + state = state.copy(state.entities + id) + onCreate(a) + a } } @@ -415,10 +411,12 @@ private[akka] class RememberEntityStarter( } def sendStart(ids: Set[ShardRegion.EntityId]): Unit = { + // these go through the region rather the directly to the shard + // so that shard mapping changes are picked up ids.foreach(id ⇒ region ! ShardRegion.StartEntity(id)) } - override def receive = { + override def receive: Receive = { case ack: ShardRegion.StartEntityAck ⇒ waitingForAck -= ack.entityId // inform whoever requested the start that it happened @@ -438,7 +436,9 @@ private[akka] class RememberEntityStarter( /** * INTERNAL API: Common things for PersistentShard and DDataShard */ -private[akka] trait RememberingShard { selfType: Shard ⇒ +private[akka] trait RememberingShard { + selfType: Shard ⇒ + import ShardRegion.{ EntityId, Msg } import Shard._ import akka.pattern.pipe @@ -495,7 +495,7 @@ private[akka] trait RememberingShard { selfType: Shard ⇒ case None ⇒ if (state.entities.contains(id)) { require(!messageBuffers.contains(id), s"Message buffers contains id [$id].") - getEntity(id).tell(payload, snd) + getOrCreateEntity(id).tell(payload, snd) } else { //Note; we only do this if remembering, otherwise the buffer is an overhead messageBuffers.append(id, msg, snd) @@ -503,7 +503,6 @@ private[akka] trait RememberingShard { selfType: Shard ⇒ } } } - } /** @@ -765,6 +764,7 @@ object EntityRecoveryStrategy { } trait EntityRecoveryStrategy { + import ShardRegion.EntityId import scala.concurrent.Future @@ -772,12 +772,15 @@ trait EntityRecoveryStrategy { } final class AllAtOnceEntityRecoveryStrategy extends EntityRecoveryStrategy { + import ShardRegion.EntityId + override def recoverEntities(entities: Set[EntityId]): Set[Future[Set[EntityId]]] = if (entities.isEmpty) Set.empty else Set(Future.successful(entities)) } final class ConstantRateEntityRecoveryStrategy(actorSystem: ActorSystem, frequency: FiniteDuration, numberOfEntities: Int) extends EntityRecoveryStrategy { + import ShardRegion.EntityId import actorSystem.dispatcher import akka.pattern.after diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardSpec.scala new file mode 100644 index 0000000000..001707777d --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardSpec.scala @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.cluster.sharding + +import akka.actor.{ Actor, PoisonPill, Props } +import akka.cluster.sharding.PersistentShardSpec.EntityActor +import akka.cluster.sharding.Shard.{ GetShardStats, ShardStats } +import akka.cluster.sharding.ShardRegion.{ StartEntity, StartEntityAck } +import akka.testkit.{ AkkaSpec, ImplicitSender } +import com.typesafe.config.ConfigFactory +import org.scalatest.WordSpecLike + +object PersistentShardSpec { + class EntityActor(id: String) extends Actor { + override def receive: Receive = { + case _ ⇒ + } + } + + val config = ConfigFactory.parseString( + """ + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + """.stripMargin) +} + +class PersistentShardSpec extends AkkaSpec(PersistentShardSpec.config) with WordSpecLike with ImplicitSender { + + "Persistent Shard" must { + + "remember entities started with StartEntity" in { + val props = Props(new PersistentShard( + "cats", + "shard-1", + id ⇒ Props(new EntityActor(id)), + ClusterShardingSettings(system), + { + case _ ⇒ ("entity-1", "msg") + }, + _ ⇒ "shard-1", + PoisonPill + )) + val persistentShard = system.actorOf(props) + watch(persistentShard) + + persistentShard ! StartEntity("entity-1") + expectMsg(StartEntityAck("entity-1", "shard-1")) + + persistentShard ! PoisonPill + expectTerminated(persistentShard) + + system.log.info("Starting shard again") + val secondIncarnation = system.actorOf(props) + + secondIncarnation ! GetShardStats + awaitAssert(expectMsg(ShardStats("shard-1", 1))) + } + } + +}