diff --git a/akka-actor-tests/src/test/scala/akka/event/AddressTerminatedTopicBenchSpec.scala b/akka-actor-tests/src/test/scala/akka/event/AddressTerminatedTopicBenchSpec.scala new file mode 100644 index 0000000000..bea5a92dc9 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/event/AddressTerminatedTopicBenchSpec.scala @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.event + +import scala.concurrent.duration._ +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.Address +import akka.actor.Props +import akka.testkit._ + +object AddressTerminatedTopicBenchSpec { + + class Subscriber(testActor: ActorRef) extends Actor { + AddressTerminatedTopic(context.system).subscribe(self) + testActor ! "started" + + override def postStop(): Unit = { + AddressTerminatedTopic(context.system).unsubscribe(self) + } + + def receive = Actor.emptyBehavior + } +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class AddressTerminatedTopicBenchSpec extends AkkaSpec("akka.loglevel=INFO") { + import AddressTerminatedTopicBenchSpec._ + + "Subscribe and unsubscribe of AddressTerminated" must { + + "be quick" in { + val sys = ActorSystem(system.name + "2", system.settings.config) + try { + val num = 20000 + + val t1 = System.nanoTime() + val p = Props(classOf[Subscriber], testActor) + val subscribers = Vector.fill(num)(sys.actorOf(p)) + receiveN(num, 10.seconds) + log.info("Starting {} actors took {} ms", num, (System.nanoTime() - t1).nanos.toMillis) + + val t2 = System.nanoTime() + shutdown(sys, 10.seconds, verifySystemShutdown = true) + log.info("Stopping {} actors took {} ms", num, (System.nanoTime() - t2).nanos.toMillis) + } finally { + if (!sys.isTerminated) shutdown(sys) + } + } + + } +} diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 58f891d6d9..13f923d35a 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -101,9 +101,9 @@ final case class Terminated private[akka] (@BeanProperty actor: ActorRef)( * INTERNAL API * * Used for remote death watch. Failure detector publish this to the - * `eventStream` when a remote node is detected to be unreachable and/or decided to + * [[akka.event.AddressTerminatedTopic]] when a remote node is detected to be unreachable and/or decided to * be removed. - * The watcher ([[akka.actor.dungeon.DeathWatch]]) subscribes to the `eventStream` + * The watcher ([[akka.actor.dungeon.DeathWatch]]) subscribes to the `AddressTerminatedTopic` * and translates this event to [[akka.actor.Terminated]], which is sent itself. */ @SerialVersionUID(1L) diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index 3684c8dd2e..b78fb871a1 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -4,11 +4,12 @@ package akka.actor.dungeon -import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorRefScope, ActorCell, Actor, Address, AddressTerminated } +import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorRefScope, ActorCell, Actor, Address } import akka.dispatch.sysmsg.{ DeathWatchNotification, Watch, Unwatch } import akka.event.Logging.{ Warning, Error, Debug } import scala.util.control.NonFatal import akka.actor.MinimalActorRef +import akka.event.AddressTerminatedTopic private[akka] trait DeathWatch { this: ActorCell ⇒ @@ -192,9 +193,9 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ } } - private def unsubscribeAddressTerminated(): Unit = system.eventStream.unsubscribe(self, classOf[AddressTerminated]) + private def unsubscribeAddressTerminated(): Unit = AddressTerminatedTopic(system).unsubscribe(self) - private def subscribeAddressTerminated(): Unit = system.eventStream.subscribe(self, classOf[AddressTerminated]) + private def subscribeAddressTerminated(): Unit = AddressTerminatedTopic(system).subscribe(self) } diff --git a/akka-actor/src/main/scala/akka/event/AddressTerminatedTopic.scala b/akka-actor/src/main/scala/akka/event/AddressTerminatedTopic.scala new file mode 100644 index 0000000000..df43982b71 --- /dev/null +++ b/akka-actor/src/main/scala/akka/event/AddressTerminatedTopic.scala @@ -0,0 +1,56 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.event + +import java.util.concurrent.atomic.AtomicReference +import scala.annotation.tailrec +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.AddressTerminated +import akka.actor.ExtendedActorSystem +import akka.actor.Extension +import akka.actor.ExtensionId +import akka.actor.ExtensionIdProvider + +/** + * INTERNAL API + * + * Watchers of remote actor references register themselves as subscribers + * of [[akka.actor.AddressTerminated]] notifications. Remote and cluster + * death watch publish `AddressTerminated` when a remote system is deemed + * dead. + */ +private[akka] object AddressTerminatedTopic extends ExtensionId[AddressTerminatedTopic] with ExtensionIdProvider { + override def get(system: ActorSystem): AddressTerminatedTopic = super.get(system) + + override def lookup = AddressTerminatedTopic + + override def createExtension(system: ExtendedActorSystem): AddressTerminatedTopic = + new AddressTerminatedTopic +} + +/** + * INTERNAL API + */ +private[akka] final class AddressTerminatedTopic extends Extension { + + private val subscribers = new AtomicReference[Set[ActorRef]](Set.empty[ActorRef]) + + @tailrec def subscribe(subscriber: ActorRef): Unit = { + val current = subscribers.get + if (!subscribers.compareAndSet(current, current + subscriber)) + subscribe(subscriber) // retry + } + + @tailrec def unsubscribe(subscriber: ActorRef): Unit = { + val current = subscribers.get + if (!subscribers.compareAndSet(current, current - subscriber)) + unsubscribe(subscriber) // retry + } + + def publish(msg: AddressTerminated): Unit = { + subscribers.get foreach { _.tell(msg, ActorRef.noSender) } + } + +} \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index a7e17e7687..b9af025d91 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -20,6 +20,7 @@ import akka.actor.SelectChildPattern import akka.actor.Identify import akka.actor.ActorIdentity import akka.actor.EmptyLocalActorRef +import akka.event.AddressTerminatedTopic /** * INTERNAL API @@ -52,7 +53,7 @@ private[akka] class RemoteSystemDaemon( private val terminating = new Switch(false) - system.eventStream.subscribe(this, classOf[AddressTerminated]) + AddressTerminatedTopic(system).subscribe(this) /** * Find the longest matching path which we know about and return that ref diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala index 3d523c7963..086f098602 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala @@ -19,6 +19,7 @@ import akka.actor.InternalActorRef import akka.dispatch.sysmsg.DeathWatchNotification import akka.dispatch.sysmsg.Watch import akka.actor.Deploy +import akka.event.AddressTerminatedTopic /** * INTERNAL API @@ -79,7 +80,7 @@ private[akka] object RemoteWatcher { * to the peer actor on the other node, which replies with [[RemoteWatcher.HeartbeatRsp]] * message back. The failure detector on the watching side monitors these heartbeat messages. * If arrival of hearbeat messages stops it will be detected and this actor will publish - * [[akka.actor.AddressTerminated]] to the `eventStream`. + * [[akka.actor.AddressTerminated]] to the [[akka.event.AddressTerminatedTopic]]. * * When all actors on a node have been unwatched it will stop sending heartbeat messages. * @@ -171,7 +172,7 @@ private[akka] class RemoteWatcher( } def publishAddressTerminated(address: Address): Unit = - context.system.eventStream.publish(AddressTerminated(address)) + AddressTerminatedTopic(context.system).publish(AddressTerminated(address)) def quarantine(address: Address, uid: Option[Int]): Unit = remoteProvider.quarantine(address, uid) diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index d34b486692..b5a3f65b2d 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -22,6 +22,7 @@ import scala.util.{ Failure, Success } import akka.remote.transport.AkkaPduCodec.Message import java.util.concurrent.ConcurrentHashMap import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } +import akka.event.AddressTerminatedTopic /** * INTERNAL API @@ -398,7 +399,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends "Address is now gated for {} ms, all messages to this address will be delivered to dead letters. Reason: {}", remoteAddress, settings.RetryGateClosedFor.toMillis, reason.getMessage) endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) - context.system.eventStream.publish(AddressTerminated(remoteAddress)) + AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress)) Stop case ShutDownAssociation(localAddress, remoteAddress, _) ⇒ @@ -406,7 +407,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends "Address is now gated for {} ms, all messages to this address will be delivered to dead letters.", remoteAddress, settings.RetryGateClosedFor.toMillis) endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) - context.system.eventStream.publish(AddressTerminated(remoteAddress)) + AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress)) Stop case HopelessAssociation(localAddress, remoteAddress, Some(uid), _) ⇒ @@ -416,7 +417,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid)) case _ ⇒ // disabled } - context.system.eventStream.publish(AddressTerminated(remoteAddress)) + AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress)) Stop case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒ @@ -424,7 +425,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends "Address cannot be quarantined without knowing the UID, gating instead for {} ms.", remoteAddress, settings.RetryGateClosedFor.toMillis) endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) - context.system.eventStream.publish(AddressTerminated(remoteAddress)) + AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress)) Stop case NonFatal(e) ⇒