diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index 8302d5b80e..d8c1c45978 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -6,7 +6,7 @@ package akka.event import language.postfixOps import scala.concurrent.duration._ -import akka.actor.{ Actor, ActorRef, ActorSystemImpl, ActorSystem, Props, UnhandledMessage } +import akka.actor._ import com.typesafe.config.ConfigFactory import scala.collection.JavaConverters._ import akka.event.Logging.InitializeLogger @@ -28,11 +28,15 @@ object EventStreamSpec { akka { actor.serialize-messages = off stdout-loglevel = WARNING - loglevel = DEBUG + loglevel = WARNING actor.debug.unhandled = on } """) + val configUnhandledWithDebug = + ConfigFactory.parseString("akka.actor.debug.event-stream = on") + .withFallback(configUnhandled) + final case class M(i: Int) final case class SetTarget(ref: ActorRef) @@ -274,6 +278,130 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { es.unsubscribe(a1.ref, classOf[AT]) should be(true) es.unsubscribe(a2.ref, classOf[BTT]) should be(true) } + + "unsubscribe an actor on its termination" in { + val sys = ActorSystem("EventStreamSpecUnsubscribeOnTerminated", configUnhandledWithDebug) + + try { + val es = sys.eventStream + val a1, a2 = TestProbe() + val tm = new A + + val target = sys.actorOf(Props(new Actor { + def receive = { case in ⇒ a1.ref forward in } + }), "to-be-killed") + + es.subscribe(a2.ref, classOf[Any]) + es.subscribe(target, classOf[A]) should be(true) + es.subscribe(target, classOf[A]) should be(false) + + target ! PoisonPill + fishForDebugMessage(a2, s"unsubscribing $target from all channels") + fishForDebugMessage(a2, s"unwatching $target") + + es.publish(tm) + + a1.expectNoMsg(1 second) + a2.expectMsg(tm) + } finally { + sys.shutdown() + } + } + + "unsubscribe the actor, when it subscribes already in terminated state" in { + val sys = ActorSystem("EventStreamSpecUnsubscribeTerminated", configUnhandledWithDebug) + + try { + val es = sys.eventStream + val a1, a2 = TestProbe() + + val target = system.actorOf(Props(new Actor { + def receive = { case in ⇒ a1.ref forward in } + }), "to-be-killed") + + watch(target) + target ! PoisonPill + expectTerminated(target) + + es.subscribe(a2.ref, classOf[Any]) + + // target1 is Terminated; When subscribing, it will be unsubscribed by the Unsubscriber right away + es.subscribe(target, classOf[A]) should be(true) + fishForDebugMessage(a2, s"unsubscribing $target from all channels") + + es.subscribe(target, classOf[A]) should be(true) + fishForDebugMessage(a2, s"unsubscribing $target from all channels") + } finally { + sys.shutdown() + } + } + + "not allow initializing a TerminatedUnsubscriber twice" in { + val sys = ActorSystem("MustNotAllowDoubleInitOfTerminatedUnsubscriber", config) + // initializes an TerminatedUnsubscriber during start + + try { + val es = sys.eventStream + val p = TestProbe() + + val refWillBeUsedAsUnsubscriber = es.initUnsubscriber(p.ref) + + refWillBeUsedAsUnsubscriber should equal(false) + + } finally { + sys.shutdown() + } + } + + "unwatch an actor from unsubscriber when that actor unsubscribes from the stream" in { + val sys = ActorSystem("MustUnregisterDuringUnsubscribe", configUnhandledWithDebug) + + try { + val es = sys.eventStream + val a1, a2 = TestProbe() + + es.subscribe(a1.ref, classOf[Logging.Debug]) + + es.subscribe(a2.ref, classOf[A]) + fishForDebugMessage(a1, s"watching ${a2.ref}") + + es.unsubscribe(a2.ref) + fishForDebugMessage(a1, s"unwatching ${a2.ref}") + + } finally { + sys.shutdown() + } + } + + "unwatch an actor from unsubscriber when that actor unsubscribes from channels it subscribed" in { + val sys = ActorSystem("MustUnregisterWhenNoMoreChannelSubscriptions", configUnhandledWithDebug) + + try { + val es = sys.eventStream + val a1, a2 = TestProbe() + + es.subscribe(a1.ref, classOf[Logging.Debug]) + + es.subscribe(a2.ref, classOf[A]) + es.subscribe(a2.ref, classOf[T]) + fishForDebugMessage(a1, s"watching ${a2.ref}") + + es.unsubscribe(a2.ref, classOf[A]) should equal(true) + fishForDebugMessage(a1, s"unsubscribing ${a2.ref} from channel class akka.event.EventStreamSpec$$A") + a1.expectNoMsg(1 second) + + es.unsubscribe(a2.ref, classOf[T]) should equal(true) + fishForDebugMessage(a1, s"unsubscribing ${a2.ref} from channel interface akka.event.EventStreamSpec$$T") + fishForDebugMessage(a1, s"unwatching ${a2.ref}, since has no subscriptions") + a1.expectNoMsg(1 second) + + es.unsubscribe(a2.ref, classOf[T]) should equal(false) + + } finally { + sys.shutdown() + } + } + } private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) { @@ -284,4 +412,11 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { msg foreach (expectMsg(_)) } + private def fishForDebugMessage(a: TestProbe, messagePrefix: String) { + a.fishForMessage(3 seconds, hint = "expected debug message prefix: " + messagePrefix) { + case Logging.Debug(_, _, msg: String) if msg startsWith messagePrefix ⇒ true + case other ⇒ false + } + } + } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 0a7af66956..17eed69afb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -618,6 +618,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, provider.init(this) if (settings.LogDeadLetters > 0) logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener")) + startEventStreamUnsubscriber(eventStream) registerOnTermination(stopScheduler()) loadExtensions() if (LogConfigOnStart) logConfiguration() @@ -662,6 +663,10 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, shutdown() } + def startEventStreamUnsubscriber(eventStream: EventStream) { + systemActorOf(Props(classOf[EventStreamUnsubscriber], eventStream, DebugEventStream), "eventStreamUnsubscriber") + } + //#create-scheduler /** * Create the scheduler service. This one needs one special behavior: if diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index 9bbe4d86d0..357553934a 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -175,6 +175,13 @@ trait SubchannelClassification { this: EventBus ⇒ recv foreach (publish(event, _)) } + /** + * INTERNAL API + * Expensive call! Avoid calling directly from event bus subscribe / unsubscribe. + */ + private[akka] def hasSubscriptions(subscriber: Subscriber): Boolean = + cache.values exists { _ contains subscriber } + private def removeFromCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit = cache = (cache /: changes) { case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) -- cs) @@ -184,6 +191,7 @@ trait SubchannelClassification { this: EventBus ⇒ cache = (cache /: changes) { case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) ++ cs) } + } /** diff --git a/akka-actor/src/main/scala/akka/event/EventStream.scala b/akka-actor/src/main/scala/akka/event/EventStream.scala index d3dee900fa..7eac2f4c78 100644 --- a/akka-actor/src/main/scala/akka/event/EventStream.scala +++ b/akka-actor/src/main/scala/akka/event/EventStream.scala @@ -5,9 +5,11 @@ package akka.event import language.implicitConversions -import akka.actor.{ ActorRef, ActorSystem } +import akka.actor._ import akka.event.Logging.simpleName import akka.util.Subclassification +import java.util.concurrent.atomic.AtomicReference +import scala.annotation.tailrec object EventStream { //Why is this here and why isn't there a failing test if it is removed? @@ -28,6 +30,9 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su type Event = AnyRef type Classifier = Class[_] + /** Either the list of subscribed actors, or a ref to an [[akka.event.EventStreamUnsubscriber]] */ + private val initiallySubscribedOrUnsubscriber = new AtomicReference[Either[Set[ActorRef], ActorRef]](Left(Set.empty)) + protected implicit val subclassification = new Subclassification[Class[_]] { def isEqual(x: Class[_], y: Class[_]) = x == y def isSubclass(x: Class[_], y: Class[_]) = y isAssignableFrom x @@ -36,13 +41,13 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su protected def classify(event: AnyRef): Class[_] = event.getClass protected def publish(event: AnyRef, subscriber: ActorRef) = { - if (subscriber.isTerminated) unsubscribe(subscriber) - else subscriber ! event + subscriber ! event } override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { if (subscriber eq null) throw new IllegalArgumentException("subscriber is null") if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "subscribing " + subscriber + " to channel " + channel)) + registerWithUnsubscriber(subscriber) super.subscribe(subscriber, channel) } @@ -50,6 +55,7 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su if (subscriber eq null) throw new IllegalArgumentException("subscriber is null") val ret = super.unsubscribe(subscriber, channel) if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from channel " + channel)) + unregisterIfNoMoreSubscribedChannels(subscriber) ret } @@ -57,6 +63,61 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su if (subscriber eq null) throw new IllegalArgumentException("subscriber is null") super.unsubscribe(subscriber) if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from all channels")) + unregisterIfNoMoreSubscribedChannels(subscriber) + } + + /** + * INTERNAL API + */ + private[akka] def initUnsubscriber(unsubscriber: ActorRef): Boolean = { + initiallySubscribedOrUnsubscriber.get match { + case value @ Left(subscribers) ⇒ + if (initiallySubscribedOrUnsubscriber.compareAndSet(value, Right(unsubscriber))) { + if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "initialized unsubscriber to: " + unsubscriber + ", registering " + subscribers.size + " initial subscribers with it")) + subscribers foreach registerWithUnsubscriber + true + } else { + // recurse, because either new subscribers have been registered since `get` (retry Left case), + // or another thread has succeeded in setting it's unsubscriber (end on Right case) + initUnsubscriber(unsubscriber) + } + + case Right(presentUnsubscriber) ⇒ + if (debug) publish(Logging.Debug(simpleName(this), this.getClass, s"not using unsubscriber $unsubscriber, because already initialized with $presentUnsubscriber")) + false + } + } + + /** + * INTERNAL API + */ + private[akka] def registerWithUnsubscriber(subscriber: ActorRef): Unit = { + initiallySubscribedOrUnsubscriber.get match { + case value @ Left(subscribers) ⇒ + if (!initiallySubscribedOrUnsubscriber.compareAndSet(value, Left(subscribers + subscriber))) + registerWithUnsubscriber(subscriber) + + case Right(unsubscriber) ⇒ + unsubscriber ! EventStreamUnsubscriber.Register(subscriber) + } + } + + /** + * INTERNAL API + * + * The actual check if the subscriber still has subscriptions is performed by the `EventStreamUnsubscriber`, + * because it's an expensive operation, and we don want to block client-code for that long, the Actor will eventually + * catch up and perform the apropriate operation. + */ + private[akka] def unregisterIfNoMoreSubscribedChannels(subscriber: ActorRef): Unit = { + initiallySubscribedOrUnsubscriber.get match { + case value @ Left(subscribers) ⇒ + if (!initiallySubscribedOrUnsubscriber.compareAndSet(value, Left(subscribers - subscriber))) + unregisterIfNoMoreSubscribedChannels(subscriber) + + case Right(unsubscriber) ⇒ + unsubscriber ! EventStreamUnsubscriber.UnregisterIfNoMoreSubscribedChannels(subscriber) + } } } diff --git a/akka-actor/src/main/scala/akka/event/EventStreamUnsubscriber.scala b/akka-actor/src/main/scala/akka/event/EventStreamUnsubscriber.scala new file mode 100644 index 0000000000..7b85ee8e52 --- /dev/null +++ b/akka-actor/src/main/scala/akka/event/EventStreamUnsubscriber.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.event + +import akka.actor._ +import akka.event.Logging.simpleName + +/** + * Watches all actors which subscribe on the given event stream, and unsubscribes them from it when they are Terminated. + */ +class EventStreamUnsubscriber(eventStream: EventStream, debug: Boolean = false) extends Actor with ActorLogging { + + import EventStreamUnsubscriber._ + + override def preStart() { + if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"registering unsubscriber with $eventStream")) + eventStream initUnsubscriber self + } + + def receive = { + case Register(actor) ⇒ + if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"watching $actor in order to unsubscribe from EventStream when it terminates")) + context watch actor + + case UnregisterIfNoMoreSubscribedChannels(actor) if eventStream.hasSubscriptions(actor) ⇒ + // do nothing + // hasSubscriptions can be slow, but it's better for this actor to take the hit than the EventStream + + case UnregisterIfNoMoreSubscribedChannels(actor) ⇒ + if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"unwatching $actor, since has no subscriptions")) + context unwatch actor + + case Terminated(actor) ⇒ + if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"unsubscribe $actor from $eventStream, because it was terminated")) + eventStream unsubscribe actor + } +} + +object EventStreamUnsubscriber { + final case class Register(actor: ActorRef) + final case class UnregisterIfNoMoreSubscribedChannels(actor: ActorRef) +} \ No newline at end of file