Save EntityStarted when StartEntity requested via remembered entities (for validation) (#26061)

* Save EntityStarted when StartEntity requested via remembered entities
This commit is contained in:
Patrik Nordwall 2018-12-06 14:49:47 +01:00 committed by Christopher Batey
parent e5c1fc02a9
commit e1a0a1fa3f
4 changed files with 100 additions and 30 deletions

View file

@ -0,0 +1,2 @@
# Rename internal method #25840
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard.getEntity")

View file

@ -1,3 +1,7 @@
# #23751 warn if handOffStopMessage not handled
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion#HandOffStopper.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.handOffStopperProps")
# ##25809 Save EntityStarted when StartEntity requested via remembered entities
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard.getEntity")

View file

@ -13,10 +13,9 @@ import akka.actor.Deploy
import akka.actor.Props
import akka.actor.Terminated
import akka.actor.Actor
import akka.util.MessageBufferMap
import akka.util.{ ConstantFun, MessageBufferMap }
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.cluster.Cluster
import akka.cluster.ddata.ORSet
import akka.cluster.ddata.ORSetKey
@ -184,17 +183,17 @@ private[akka] class Shard(
}
def receiveShardCommand(msg: ShardCommand): Unit = msg match {
case RestartEntity(id) getEntity(id)
case RestartEntity(id) getOrCreateEntity(id)
case RestartEntities(ids) restartEntities(ids)
}
def receiveStartEntity(start: ShardRegion.StartEntity): Unit = {
log.debug("Got a request from [{}] to start entity [{}] in shard [{}]", sender(), start.entityId, shardId)
val requester = sender()
log.debug("Got a request from [{}] to start entity [{}] in shard [{}]", requester, start.entityId, shardId)
if (passivateIdleTask.isDefined) {
lastMessageTimestamp = lastMessageTimestamp.updated(start.entityId, System.nanoTime())
}
getEntity(start.entityId)
sender() ! ShardRegion.StartEntityAck(start.entityId, shardId)
getOrCreateEntity(start.entityId, _ processChange(EntityStarted(start.entityId))(_ requester ! ShardRegion.StartEntityAck(start.entityId, shardId)))
}
def receiveStartEntityAck(ack: ShardRegion.StartEntityAck): Unit = {
@ -310,8 +309,7 @@ private[akka] class Shard(
if (messages.nonEmpty) {
log.debug("Sending message buffer for entity [{}] ([{}] messages)", event.entityId, messages.size)
getEntity(event.entityId)
getOrCreateEntity(event.entityId)
//Now there is no deliveryBuffer we can try to redeliver
// and as the child exists, the message will be directly forwarded
messages.foreach {
@ -350,18 +348,15 @@ private[akka] class Shard(
if (passivateIdleTask.isDefined) {
lastMessageTimestamp = lastMessageTimestamp.updated(id, System.nanoTime())
}
getOrCreateEntity(id).tell(payload, snd)
}
def getOrCreateEntity(id: EntityId, onCreate: ActorRef Unit = ConstantFun.scalaAnyToUnit): ActorRef = {
val name = URLEncoder.encode(id, "utf-8")
context.child(name) match {
case Some(actor) actor.tell(payload, snd)
case None getEntity(id).tell(payload, snd)
}
}
def getEntity(id: EntityId): ActorRef = {
val name = URLEncoder.encode(id, "utf-8")
context.child(name).getOrElse {
case Some(child) child
case None
log.debug("Starting entity [{}] in shard [{}]", id, shardId)
val a = context.watch(context.actorOf(entityProps(id), name))
idByRef = idByRef.updated(a, id)
refById = refById.updated(id, a)
@ -369,6 +364,7 @@ private[akka] class Shard(
lastMessageTimestamp += (id -> System.nanoTime())
}
state = state.copy(state.entities + id)
onCreate(a)
a
}
}
@ -415,10 +411,12 @@ private[akka] class RememberEntityStarter(
}
def sendStart(ids: Set[ShardRegion.EntityId]): Unit = {
// these go through the region rather the directly to the shard
// so that shard mapping changes are picked up
ids.foreach(id region ! ShardRegion.StartEntity(id))
}
override def receive = {
override def receive: Receive = {
case ack: ShardRegion.StartEntityAck
waitingForAck -= ack.entityId
// inform whoever requested the start that it happened
@ -438,7 +436,9 @@ private[akka] class RememberEntityStarter(
/**
* INTERNAL API: Common things for PersistentShard and DDataShard
*/
private[akka] trait RememberingShard { selfType: Shard
private[akka] trait RememberingShard {
selfType: Shard
import ShardRegion.{ EntityId, Msg }
import Shard._
import akka.pattern.pipe
@ -495,7 +495,7 @@ private[akka] trait RememberingShard { selfType: Shard ⇒
case None
if (state.entities.contains(id)) {
require(!messageBuffers.contains(id), s"Message buffers contains id [$id].")
getEntity(id).tell(payload, snd)
getOrCreateEntity(id).tell(payload, snd)
} else {
//Note; we only do this if remembering, otherwise the buffer is an overhead
messageBuffers.append(id, msg, snd)
@ -503,7 +503,6 @@ private[akka] trait RememberingShard { selfType: Shard ⇒
}
}
}
}
/**
@ -765,6 +764,7 @@ object EntityRecoveryStrategy {
}
trait EntityRecoveryStrategy {
import ShardRegion.EntityId
import scala.concurrent.Future
@ -772,12 +772,15 @@ trait EntityRecoveryStrategy {
}
final class AllAtOnceEntityRecoveryStrategy extends EntityRecoveryStrategy {
import ShardRegion.EntityId
override def recoverEntities(entities: Set[EntityId]): Set[Future[Set[EntityId]]] =
if (entities.isEmpty) Set.empty else Set(Future.successful(entities))
}
final class ConstantRateEntityRecoveryStrategy(actorSystem: ActorSystem, frequency: FiniteDuration, numberOfEntities: Int) extends EntityRecoveryStrategy {
import ShardRegion.EntityId
import actorSystem.dispatcher
import akka.pattern.after

View file

@ -0,0 +1,61 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import akka.actor.{ Actor, PoisonPill, Props }
import akka.cluster.sharding.PersistentShardSpec.EntityActor
import akka.cluster.sharding.Shard.{ GetShardStats, ShardStats }
import akka.cluster.sharding.ShardRegion.{ StartEntity, StartEntityAck }
import akka.testkit.{ AkkaSpec, ImplicitSender }
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
object PersistentShardSpec {
class EntityActor(id: String) extends Actor {
override def receive: Receive = {
case _
}
}
val config = ConfigFactory.parseString(
"""
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
""".stripMargin)
}
class PersistentShardSpec extends AkkaSpec(PersistentShardSpec.config) with WordSpecLike with ImplicitSender {
"Persistent Shard" must {
"remember entities started with StartEntity" in {
val props = Props(new PersistentShard(
"cats",
"shard-1",
id Props(new EntityActor(id)),
ClusterShardingSettings(system),
{
case _ ("entity-1", "msg")
},
_ "shard-1",
PoisonPill
))
val persistentShard = system.actorOf(props)
watch(persistentShard)
persistentShard ! StartEntity("entity-1")
expectMsg(StartEntityAck("entity-1", "shard-1"))
persistentShard ! PoisonPill
expectTerminated(persistentShard)
system.log.info("Starting shard again")
val secondIncarnation = system.actorOf(props)
secondIncarnation ! GetShardStats
awaitAssert(expectMsg(ShardStats("shard-1", 1)))
}
}
}