=act, rem #2556 Optimize subscriptions of AddressTerminated
* unsubscribe in eventStream is too slow when using many actors that are watching remote actors, and becomes a problem for example when shutting down such a system * Previous eventStream solution: Stopping 20000 actors took 50355 ms * This solution: Stopping 20000 actors took 764 ms
This commit is contained in:
parent
ac5f4fc72e
commit
67925cb94e
7 changed files with 126 additions and 12 deletions
|
|
@ -0,0 +1,54 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.event
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.Address
|
||||
import akka.actor.Props
|
||||
import akka.testkit._
|
||||
|
||||
object AddressTerminatedTopicBenchSpec {
|
||||
|
||||
class Subscriber(testActor: ActorRef) extends Actor {
|
||||
AddressTerminatedTopic(context.system).subscribe(self)
|
||||
testActor ! "started"
|
||||
|
||||
override def postStop(): Unit = {
|
||||
AddressTerminatedTopic(context.system).unsubscribe(self)
|
||||
}
|
||||
|
||||
def receive = Actor.emptyBehavior
|
||||
}
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class AddressTerminatedTopicBenchSpec extends AkkaSpec("akka.loglevel=INFO") {
|
||||
import AddressTerminatedTopicBenchSpec._
|
||||
|
||||
"Subscribe and unsubscribe of AddressTerminated" must {
|
||||
|
||||
"be quick" in {
|
||||
val sys = ActorSystem(system.name + "2", system.settings.config)
|
||||
try {
|
||||
val num = 20000
|
||||
|
||||
val t1 = System.nanoTime()
|
||||
val p = Props(classOf[Subscriber], testActor)
|
||||
val subscribers = Vector.fill(num)(sys.actorOf(p))
|
||||
receiveN(num, 10.seconds)
|
||||
log.info("Starting {} actors took {} ms", num, (System.nanoTime() - t1).nanos.toMillis)
|
||||
|
||||
val t2 = System.nanoTime()
|
||||
shutdown(sys, 10.seconds, verifySystemShutdown = true)
|
||||
log.info("Stopping {} actors took {} ms", num, (System.nanoTime() - t2).nanos.toMillis)
|
||||
} finally {
|
||||
if (!sys.isTerminated) shutdown(sys)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -101,9 +101,9 @@ final case class Terminated private[akka] (@BeanProperty actor: ActorRef)(
|
|||
* INTERNAL API
|
||||
*
|
||||
* Used for remote death watch. Failure detector publish this to the
|
||||
* `eventStream` when a remote node is detected to be unreachable and/or decided to
|
||||
* [[akka.event.AddressTerminatedTopic]] when a remote node is detected to be unreachable and/or decided to
|
||||
* be removed.
|
||||
* The watcher ([[akka.actor.dungeon.DeathWatch]]) subscribes to the `eventStream`
|
||||
* The watcher ([[akka.actor.dungeon.DeathWatch]]) subscribes to the `AddressTerminatedTopic`
|
||||
* and translates this event to [[akka.actor.Terminated]], which is sent itself.
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
|
|
|
|||
|
|
@ -4,11 +4,12 @@
|
|||
|
||||
package akka.actor.dungeon
|
||||
|
||||
import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorRefScope, ActorCell, Actor, Address, AddressTerminated }
|
||||
import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorRefScope, ActorCell, Actor, Address }
|
||||
import akka.dispatch.sysmsg.{ DeathWatchNotification, Watch, Unwatch }
|
||||
import akka.event.Logging.{ Warning, Error, Debug }
|
||||
import scala.util.control.NonFatal
|
||||
import akka.actor.MinimalActorRef
|
||||
import akka.event.AddressTerminatedTopic
|
||||
|
||||
private[akka] trait DeathWatch { this: ActorCell ⇒
|
||||
|
||||
|
|
@ -192,9 +193,9 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
|
|||
}
|
||||
}
|
||||
|
||||
private def unsubscribeAddressTerminated(): Unit = system.eventStream.unsubscribe(self, classOf[AddressTerminated])
|
||||
private def unsubscribeAddressTerminated(): Unit = AddressTerminatedTopic(system).unsubscribe(self)
|
||||
|
||||
private def subscribeAddressTerminated(): Unit = system.eventStream.subscribe(self, classOf[AddressTerminated])
|
||||
private def subscribeAddressTerminated(): Unit = AddressTerminatedTopic(system).subscribe(self)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,56 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.event
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import scala.annotation.tailrec
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.AddressTerminated
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.Extension
|
||||
import akka.actor.ExtensionId
|
||||
import akka.actor.ExtensionIdProvider
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Watchers of remote actor references register themselves as subscribers
|
||||
* of [[akka.actor.AddressTerminated]] notifications. Remote and cluster
|
||||
* death watch publish `AddressTerminated` when a remote system is deemed
|
||||
* dead.
|
||||
*/
|
||||
private[akka] object AddressTerminatedTopic extends ExtensionId[AddressTerminatedTopic] with ExtensionIdProvider {
|
||||
override def get(system: ActorSystem): AddressTerminatedTopic = super.get(system)
|
||||
|
||||
override def lookup = AddressTerminatedTopic
|
||||
|
||||
override def createExtension(system: ExtendedActorSystem): AddressTerminatedTopic =
|
||||
new AddressTerminatedTopic
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] final class AddressTerminatedTopic extends Extension {
|
||||
|
||||
private val subscribers = new AtomicReference[Set[ActorRef]](Set.empty[ActorRef])
|
||||
|
||||
@tailrec def subscribe(subscriber: ActorRef): Unit = {
|
||||
val current = subscribers.get
|
||||
if (!subscribers.compareAndSet(current, current + subscriber))
|
||||
subscribe(subscriber) // retry
|
||||
}
|
||||
|
||||
@tailrec def unsubscribe(subscriber: ActorRef): Unit = {
|
||||
val current = subscribers.get
|
||||
if (!subscribers.compareAndSet(current, current - subscriber))
|
||||
unsubscribe(subscriber) // retry
|
||||
}
|
||||
|
||||
def publish(msg: AddressTerminated): Unit = {
|
||||
subscribers.get foreach { _.tell(msg, ActorRef.noSender) }
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -20,6 +20,7 @@ import akka.actor.SelectChildPattern
|
|||
import akka.actor.Identify
|
||||
import akka.actor.ActorIdentity
|
||||
import akka.actor.EmptyLocalActorRef
|
||||
import akka.event.AddressTerminatedTopic
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -52,7 +53,7 @@ private[akka] class RemoteSystemDaemon(
|
|||
|
||||
private val terminating = new Switch(false)
|
||||
|
||||
system.eventStream.subscribe(this, classOf[AddressTerminated])
|
||||
AddressTerminatedTopic(system).subscribe(this)
|
||||
|
||||
/**
|
||||
* Find the longest matching path which we know about and return that ref
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import akka.actor.InternalActorRef
|
|||
import akka.dispatch.sysmsg.DeathWatchNotification
|
||||
import akka.dispatch.sysmsg.Watch
|
||||
import akka.actor.Deploy
|
||||
import akka.event.AddressTerminatedTopic
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -79,7 +80,7 @@ private[akka] object RemoteWatcher {
|
|||
* to the peer actor on the other node, which replies with [[RemoteWatcher.HeartbeatRsp]]
|
||||
* message back. The failure detector on the watching side monitors these heartbeat messages.
|
||||
* If arrival of hearbeat messages stops it will be detected and this actor will publish
|
||||
* [[akka.actor.AddressTerminated]] to the `eventStream`.
|
||||
* [[akka.actor.AddressTerminated]] to the [[akka.event.AddressTerminatedTopic]].
|
||||
*
|
||||
* When all actors on a node have been unwatched it will stop sending heartbeat messages.
|
||||
*
|
||||
|
|
@ -171,7 +172,7 @@ private[akka] class RemoteWatcher(
|
|||
}
|
||||
|
||||
def publishAddressTerminated(address: Address): Unit =
|
||||
context.system.eventStream.publish(AddressTerminated(address))
|
||||
AddressTerminatedTopic(context.system).publish(AddressTerminated(address))
|
||||
|
||||
def quarantine(address: Address, uid: Option[Int]): Unit =
|
||||
remoteProvider.quarantine(address, uid)
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import scala.util.{ Failure, Success }
|
|||
import akka.remote.transport.AkkaPduCodec.Message
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
|
||||
import akka.event.AddressTerminatedTopic
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -398,7 +399,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
"Address is now gated for {} ms, all messages to this address will be delivered to dead letters. Reason: {}",
|
||||
remoteAddress, settings.RetryGateClosedFor.toMillis, reason.getMessage)
|
||||
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
|
||||
context.system.eventStream.publish(AddressTerminated(remoteAddress))
|
||||
AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
|
||||
Stop
|
||||
|
||||
case ShutDownAssociation(localAddress, remoteAddress, _) ⇒
|
||||
|
|
@ -406,7 +407,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
"Address is now gated for {} ms, all messages to this address will be delivered to dead letters.",
|
||||
remoteAddress, settings.RetryGateClosedFor.toMillis)
|
||||
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
|
||||
context.system.eventStream.publish(AddressTerminated(remoteAddress))
|
||||
AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
|
||||
Stop
|
||||
|
||||
case HopelessAssociation(localAddress, remoteAddress, Some(uid), _) ⇒
|
||||
|
|
@ -416,7 +417,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid))
|
||||
case _ ⇒ // disabled
|
||||
}
|
||||
context.system.eventStream.publish(AddressTerminated(remoteAddress))
|
||||
AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
|
||||
Stop
|
||||
|
||||
case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒
|
||||
|
|
@ -424,7 +425,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
"Address cannot be quarantined without knowing the UID, gating instead for {} ms.",
|
||||
remoteAddress, settings.RetryGateClosedFor.toMillis)
|
||||
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
|
||||
context.system.eventStream.publish(AddressTerminated(remoteAddress))
|
||||
AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
|
||||
Stop
|
||||
|
||||
case NonFatal(e) ⇒
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue