Automatic passivation for typed sharding, #25512

This commit is contained in:
Johan Andrén 2018-11-06 19:15:23 +01:00 committed by Patrik Nordwall
parent 3ccc67b3d6
commit 133c41375f
8 changed files with 236 additions and 18 deletions

View file

@ -12,7 +12,6 @@ import akka.actor.ActorSystem
import akka.actor.Deploy
import akka.actor.Props
import akka.actor.Terminated
import akka.cluster.sharding.Shard.ShardCommand
import akka.actor.Actor
import akka.util.MessageBufferMap
@ -26,6 +25,8 @@ import akka.actor.Stash
import akka.persistence._
import akka.actor.NoSerializationVerificationNeeded
import scala.concurrent.duration._
/**
* INTERNAL API
* @see [[ClusterSharding$ ClusterSharding extension]]
@ -115,6 +116,9 @@ private[akka] object Shard {
Props(new Shard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage))
.withDeploy(Deploy.local)
}
private case object PassivateIdleTick extends NoSerializationVerificationNeeded
}
/**
@ -144,11 +148,20 @@ private[akka] class Shard(
var state = State.Empty
var idByRef = Map.empty[ActorRef, EntityId]
var refById = Map.empty[EntityId, ActorRef]
var lastMessageTimestamp = Map.empty[EntityId, Long]
var passivating = Set.empty[ActorRef]
val messageBuffers = new MessageBufferMap[EntityId]
var handOffStopper: Option[ActorRef] = None
import context.dispatcher
val passivateIdleTask = if (settings.passivateIdleEntityAfter > Duration.Zero) {
val idleInterval = settings.passivateIdleEntityAfter / 2
Some(context.system.scheduler.schedule(idleInterval, idleInterval, self, PassivateIdleTick))
} else {
None
}
initialized()
def initialized(): Unit = context.parent ! ShardInitialized(shardId)
@ -166,6 +179,7 @@ private[akka] class Shard(
case msg: ShardRegion.StartEntityAck receiveStartEntityAck(msg)
case msg: ShardRegionCommand receiveShardRegionCommand(msg)
case msg: ShardQuery receiveShardQuery(msg)
case PassivateIdleTick passivateIdleEntities()
case msg if extractEntityId.isDefinedAt(msg) deliverMessage(msg, sender())
}
@ -176,6 +190,9 @@ private[akka] class Shard(
def receiveStartEntity(start: ShardRegion.StartEntity): Unit = {
log.debug("Got a request from [{}] to start entity [{}] in shard [{}]", sender(), start.entityId, shardId)
if (passivateIdleTask.isDefined) {
lastMessageTimestamp = lastMessageTimestamp.updated(start.entityId, System.nanoTime())
}
getEntity(start.entityId)
sender() ! ShardRegion.StartEntityAck(start.entityId, shardId)
}
@ -240,6 +257,9 @@ private[akka] class Shard(
val id = idByRef(ref)
idByRef -= ref
refById -= id
if (passivateIdleTask.isDefined) {
lastMessageTimestamp -= id
}
if (messageBuffers.getOrEmpty(id).nonEmpty) {
log.debug("Starting entity [{}] again, there are buffered messages for it", id)
sendMsgBuffer(EntityStarted(id))
@ -253,8 +273,6 @@ private[akka] class Shard(
def passivate(entity: ActorRef, stopMessage: Any): Unit = {
idByRef.get(entity) match {
case Some(id) if (!messageBuffers.contains(id)) {
log.debug("Passivating started on entity {}", id)
passivating = passivating + entity
messageBuffers.add(id)
entity ! stopMessage
@ -265,6 +283,17 @@ private[akka] class Shard(
}
}
def passivateIdleEntities(): Unit = {
val deadline = System.nanoTime() - settings.passivateIdleEntityAfter.toNanos
val refsToPassivate = lastMessageTimestamp.collect {
case (entityId, lastMessageTimestamp) if lastMessageTimestamp < deadline refById(entityId)
}
if (refsToPassivate.nonEmpty) {
log.debug("Passivating [{}] idle entities", refsToPassivate.size)
refsToPassivate.foreach(passivate(_, handOffStopMessage))
}
}
// EntityStopped handler
def passivateCompleted(event: EntityStopped): Unit = {
log.debug("Entity stopped after passivation [{}]", event.entityId)
@ -295,25 +324,31 @@ private[akka] class Shard(
if (id == null || id == "") {
log.warning("Id must not be empty, dropping message [{}]", msg.getClass.getName)
context.system.deadLetters ! msg
} else if (payload.isInstanceOf[ShardRegion.StartEntity]) {
// in case it was wrapped, used in Typed
receiveStartEntity(payload.asInstanceOf[ShardRegion.StartEntity])
} else {
messageBuffers.contains(id) match {
case false deliverTo(id, msg, payload, snd)
payload match {
case start: ShardRegion.StartEntity
// in case it was wrapped, used in Typed
receiveStartEntity(start)
case _
messageBuffers.contains(id) match {
case false deliverTo(id, msg, payload, snd)
case true if messageBuffers.totalSize >= bufferSize
log.debug("Buffer is full, dropping message for entity [{}]", id)
context.system.deadLetters ! msg
case true if messageBuffers.totalSize >= bufferSize
log.debug("Buffer is full, dropping message for entity [{}]", id)
context.system.deadLetters ! msg
case true
log.debug("Message for entity [{}] buffered", id)
messageBuffers.append(id, msg, snd)
case true
log.debug("Message for entity [{}] buffered", id)
messageBuffers.append(id, msg, snd)
}
}
}
}
def deliverTo(id: EntityId, msg: Any, payload: Msg, snd: ActorRef): Unit = {
if (passivateIdleTask.isDefined) {
lastMessageTimestamp = lastMessageTimestamp.updated(id, System.nanoTime())
}
val name = URLEncoder.encode(id, "utf-8")
context.child(name) match {
case Some(actor) actor.tell(payload, snd)
@ -329,10 +364,17 @@ private[akka] class Shard(
val a = context.watch(context.actorOf(entityProps(id), name))
idByRef = idByRef.updated(a, id)
refById = refById.updated(id, a)
if (passivateIdleTask.isDefined) {
lastMessageTimestamp += (id -> System.nanoTime())
}
state = state.copy(state.entities + id)
a
}
}
override def postStop(): Unit = {
passivateIdleTask.foreach(_.cancel())
}
}
private[akka] object RememberEntityStarter {
@ -425,6 +467,9 @@ private[akka] trait RememberingShard { selfType: Shard ⇒
val id = idByRef(ref)
idByRef -= ref
refById -= id
if (passivateIdleTask.isDefined) {
lastMessageTimestamp -= id
}
if (messageBuffers.getOrEmpty(id).nonEmpty) {
//Note; because we're not persisting the EntityStopped, we don't need
// to persist the EntityStarted either.