diff --git a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala index 85dd584d59..94d18f59e0 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala @@ -10,9 +10,11 @@ import org.scalatest.BeforeAndAfterEach import akka.testkit._ import scala.concurrent.duration._ import java.util.concurrent.atomic._ -import akka.actor.{ Props, Actor, ActorRef, ActorSystem } -import java.util.Comparator + +import akka.actor.{ Props, Actor, ActorRef, ActorSystem, PoisonPill, RootActorPath } import akka.japi.{ Procedure, Function } +import com.typesafe.config.{ Config, ConfigFactory } +import scala.concurrent.Await object EventBusSpec { class TestActorWrapperActor(testActor: ActorRef) extends Actor { @@ -23,7 +25,7 @@ object EventBusSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfterEach { +abstract class EventBusSpec(busName: String, conf: Config = ConfigFactory.empty()) extends AkkaSpec(conf) with BeforeAndAfterEach { import EventBusSpec._ type BusType <: EventBus @@ -37,13 +39,13 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit - busName must { + lazy val bus = createNewEventBus() + busName must { def createNewSubscriber() = createSubscriber(testActor).asInstanceOf[bus.Subscriber] def getClassifierFor(event: BusType#Event) = classifierFor(event).asInstanceOf[bus.Classifier] def createNewEvents(numberOfEvents: Int): Iterable[bus.Event] = createEvents(numberOfEvents).asInstanceOf[Iterable[bus.Event]] - val bus = createNewEventBus() val events = createNewEvents(100) val event = events.head val classifier = getClassifierFor(event) @@ -144,30 +146,137 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte } object ActorEventBusSpec { - class ComposedActorEventBus extends ActorEventBus with LookupClassification { - type Event = Int - type Classifier = String + class MyActorEventBus(protected val system: ActorSystem) extends ActorEventBus + with ActorClassification with ActorClassifier { - def classify(event: Event) = event.toString + type Event = Notification + + def classify(event: Event) = event.ref protected def mapSize = 32 def publish(event: Event, subscriber: Subscriber) = subscriber ! event } + + case class Notification(ref: ActorRef, payload: Int) } -class ActorEventBusSpec extends EventBusSpec("ActorEventBus") { +class ActorEventBusSpec(conf: Config) extends EventBusSpec("ActorEventBus", conf) { import akka.event.ActorEventBusSpec._ import EventBusSpec.TestActorWrapperActor - type BusType = ComposedActorEventBus - def createNewEventBus(): BusType = new ComposedActorEventBus + def this() { + this(ConfigFactory.parseString("akka.actor.debug.event-stream = on").withFallback(AkkaSpec.testConf)) + } - def createEvents(numberOfEvents: Int) = (0 until numberOfEvents) + type BusType = MyActorEventBus + def createNewEventBus(): BusType = new MyActorEventBus(system) + + // different actor in each event because we want each event to have a different classifier (see EventBusSpec tests) + def createEvents(numberOfEvents: Int) = (0 until numberOfEvents).map(Notification(TestProbe().ref, _)).toSeq def createSubscriber(pipeTo: ActorRef) = system.actorOf(Props(new TestActorWrapperActor(pipeTo))) - def classifierFor(event: BusType#Event) = event.toString + def classifierFor(event: BusType#Event) = event.ref def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = system.stop(subscriber) + + // ActorClassification specific tests + + "must unsubscribe subscriber when it terminates" in { + val a1 = createSubscriber(system.deadLetters) + val subs = createSubscriber(testActor) + def m(i: Int) = Notification(a1, i) + val p = TestProbe() + system.eventStream.subscribe(p.ref, classOf[Logging.Debug]) + + bus.subscribe(subs, a1) + bus.publish(m(1)) + expectMsg(m(1)) + + watch(subs) + subs ! PoisonPill // when a1 dies, subs has nothing subscribed + expectTerminated(subs) + expectUnsubscribedByUnsubscriber(p, subs) + + bus.publish(m(2)) + expectNoMsg(1 second) + + disposeSubscriber(system, subs) + disposeSubscriber(system, a1) + } + + "must keep subscriber even if its subscription-actors have died" in { + // Deaths of monitored actors should not influence the subscription. + // For example: one might still want to monitor messages classified to A + // even though it died, and handle these in some way. + val a1 = createSubscriber(system.deadLetters) + val subs = createSubscriber(testActor) + def m(i: Int) = Notification(a1, i) + + bus.subscribe(subs, a1) should equal(true) + + bus.publish(m(1)) + expectMsg(m(1)) + + watch(a1) + a1 ! PoisonPill + expectTerminated(a1) + + bus.publish(m(2)) // even though a1 has terminated, classification still applies + expectMsg(m(2)) + + disposeSubscriber(system, subs) + disposeSubscriber(system, a1) + } + + "must unregister subscriber only after it unsubscribes from all of it's subscriptions" in { + val a1, a2 = createSubscriber(system.deadLetters) + val subs = createSubscriber(testActor) + def m1(i: Int) = Notification(a1, i) + def m2(i: Int) = Notification(a2, i) + + val p = TestProbe() + system.eventStream.subscribe(p.ref, classOf[Logging.Debug]) + + bus.subscribe(subs, a1) should equal(true) + bus.subscribe(subs, a2) should equal(true) + + bus.publish(m1(1)) + bus.publish(m2(1)) + expectMsg(m1(1)) + expectMsg(m2(1)) + + bus.unsubscribe(subs, a1) + bus.publish(m1(2)) + expectNoMsg(1 second) + bus.publish(m2(2)) + expectMsg(m2(2)) + + bus.unsubscribe(subs, a2) + expectUnregisterFromUnsubscriber(p, subs) + bus.publish(m1(3)) + bus.publish(m2(3)) + expectNoMsg(1 second) + + disposeSubscriber(system, subs) + disposeSubscriber(system, a1) + disposeSubscriber(system, a2) + } + + private def expectUnsubscribedByUnsubscriber(p: TestProbe, a: ActorRef) { + val expectedMsg = s"actor $a has terminated, unsubscribing it from $bus" + p.fishForMessage(1 second, hint = expectedMsg) { + case Logging.Debug(_, _, msg) if msg equals expectedMsg ⇒ true + case other ⇒ false + } + } + + private def expectUnregisterFromUnsubscriber(p: TestProbe, a: ActorRef) { + val expectedMsg = s"unregistered watch of $a in $bus" + p.fishForMessage(1 second, hint = expectedMsg) { + case Logging.Debug(_, _, msg) if msg equals expectedMsg ⇒ true + case other ⇒ false + } + } } object ScanningEventBusSpec { diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index d8c1c45978..1e311a5f65 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -79,7 +79,11 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { "An EventStream" must { "manage subscriptions" in { - val bus = new EventStream(true) + //#event-bus-start-unsubscriber-scala + val bus = new EventStream(system, true) + bus.startUnsubscriber() + //#event-bus-start-unsubscriber-scala + bus.subscribe(testActor, classOf[M]) bus.publish(M(42)) within(1 second) { @@ -91,12 +95,12 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { } "not allow null as subscriber" in { - val bus = new EventStream(true) + val bus = new EventStream(system, true) intercept[IllegalArgumentException] { bus.subscribe(null, classOf[M]) }.getMessage should be("subscriber is null") } "not allow null as unsubscriber" in { - val bus = new EventStream(true) + val bus = new EventStream(system, true) intercept[IllegalArgumentException] { bus.unsubscribe(null, classOf[M]) }.getMessage should be("subscriber is null") intercept[IllegalArgumentException] { bus.unsubscribe(null) }.getMessage should be("subscriber is null") } @@ -115,7 +119,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { } "manage log levels" in { - val bus = new EventStream(false) + val bus = new EventStream(system, false) bus.startDefaultLoggers(impl) bus.publish(SetTarget(testActor)) expectMsg("OK") @@ -136,7 +140,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { val b1 = new B1 val b2 = new B2 val c = new C - val bus = new EventStream(false) + val bus = new EventStream(system, false) within(2 seconds) { bus.subscribe(testActor, classOf[B2]) should be(true) bus.publish(c) @@ -158,7 +162,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { } "manage sub-channels using classes and traits (update on subscribe)" in { - val es = new EventStream(false) + val es = new EventStream(system, false) val tm1 = new CC val tm2 = new CCATBT val a1, a2, a3, a4 = TestProbe() @@ -181,7 +185,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { } "manage sub-channels using classes and traits (update on unsubscribe)" in { - val es = new EventStream(false) + val es = new EventStream(system, false) val tm1 = new CC val tm2 = new CCATBT val a1, a2, a3, a4 = TestProbe() @@ -203,7 +207,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { } "manage sub-channels using classes and traits (update on unsubscribe all)" in { - val es = new EventStream(false) + val es = new EventStream(system, false) val tm1 = new CC val tm2 = new CCATBT val a1, a2, a3, a4 = TestProbe() @@ -225,7 +229,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { } "manage sub-channels using classes and traits (update on publish)" in { - val es = new EventStream(false) + val es = new EventStream(system, false) val tm1 = new CC val tm2 = new CCATBT val a1, a2 = TestProbe() @@ -241,7 +245,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { } "manage sub-channels using classes and traits (unsubscribe classes used with trait)" in { - val es = new EventStream(false) + val es = new EventStream(system, false) val tm1 = new CC val tm2 = new CCATBT val a1, a2, a3 = TestProbe() @@ -263,7 +267,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { } "manage sub-channels using classes and traits (subscribe after publish)" in { - val es = new EventStream(false) + val es = new EventStream(system, false) val tm1 = new CCATBT val a1, a2 = TestProbe() @@ -304,7 +308,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { a1.expectNoMsg(1 second) a2.expectMsg(tm) } finally { - sys.shutdown() + shutdown(sys) } } @@ -332,7 +336,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { es.subscribe(target, classOf[A]) should be(true) fishForDebugMessage(a2, s"unsubscribing $target from all channels") } finally { - sys.shutdown() + shutdown(sys) } } @@ -349,7 +353,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { refWillBeUsedAsUnsubscriber should equal(false) } finally { - sys.shutdown() + shutdown(sys) } } @@ -369,7 +373,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { fishForDebugMessage(a1, s"unwatching ${a2.ref}") } finally { - sys.shutdown() + shutdown(sys) } } @@ -398,7 +402,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { es.unsubscribe(a2.ref, classOf[T]) should equal(false) } finally { - sys.shutdown() + shutdown(sys) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 17eed69afb..b449ba4eb1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -567,7 +567,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, import settings._ // this provides basic logging (to stdout) until .start() is called below - val eventStream: EventStream = new EventStream(DebugEventStream) + val eventStream = new EventStream(this, DebugEventStream) eventStream.startStdoutLogger(settings) val log: LoggingAdapter = new BusLogging(eventStream, "ActorSystem(" + name + ")", this.getClass) @@ -618,7 +618,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, provider.init(this) if (settings.LogDeadLetters > 0) logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener")) - startEventStreamUnsubscriber(eventStream) + eventStream.startUnsubscriber() registerOnTermination(stopScheduler()) loadExtensions() if (LogConfigOnStart) logConfiguration() @@ -663,10 +663,6 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, shutdown() } - def startEventStreamUnsubscriber(eventStream: EventStream) { - systemActorOf(Props(classOf[EventStreamUnsubscriber], eventStream, DebugEventStream), "eventStreamUnsubscriber") - } - //#create-scheduler /** * Create the scheduler service. This one needs one special behavior: if diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index 357553934a..f1ddcafd97 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -4,13 +4,13 @@ package akka.event -import akka.actor.ActorRef +import akka.actor.{ ActorSystem, ActorRef } import akka.util.Index import java.util.concurrent.ConcurrentSkipListSet import java.util.Comparator import akka.util.{ Subclassification, SubclassifiedIndex } -import scala.collection.immutable.TreeSet import scala.collection.immutable +import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger } /** * Represents the base type for EventBuses @@ -251,80 +251,113 @@ trait ScanningClassification { self: EventBus ⇒ } /** - * Maps ActorRefs to ActorRefs to form an EventBus where ActorRefs can listen to other ActorRefs + * Maps ActorRefs to ActorRefs to form an EventBus where ActorRefs can listen to other ActorRefs. + * + * All subscribers will be watched by an [[akka.event.ActorClassificationUnsubscriber]] and unsubscribed when they terminate. + * The unsubscriber actor will not be stopped automatically, and if you want to stop using the bus you should stop it yourself. */ trait ActorClassification { this: ActorEventBus with ActorClassifier ⇒ - import java.util.concurrent.ConcurrentHashMap import scala.annotation.tailrec - private val empty = TreeSet.empty[ActorRef] - private val mappings = new ConcurrentHashMap[ActorRef, TreeSet[ActorRef]](mapSize) + + protected def system: ActorSystem + + private class ActorClassificationMappings(val seqNr: Int, val backing: Map[ActorRef, immutable.TreeSet[ActorRef]]) { + + def get(monitored: ActorRef): immutable.TreeSet[ActorRef] = backing.getOrElse(monitored, empty) + + def add(monitored: ActorRef, monitor: ActorRef) = { + val watchers = backing.get(monitored).getOrElse(empty) + monitor + new ActorClassificationMappings(seqNr + 1, backing.updated(monitored, watchers)) + } + + def remove(monitored: ActorRef, monitor: ActorRef) = { + val monitors = backing.get(monitored).getOrElse(empty) - monitor + new ActorClassificationMappings(seqNr + 1, backing.updated(monitored, monitors)) + } + + def remove(monitored: ActorRef) = { + val v = backing - monitored + new ActorClassificationMappings(seqNr + 1, v) + } + } + + private val mappings = new AtomicReference[ActorClassificationMappings]( + new ActorClassificationMappings(0, Map.empty[ActorRef, immutable.TreeSet[ActorRef]])) + + private val empty = immutable.TreeSet.empty[ActorRef] + + /** The unsubscriber takes care of unsubscribing actors, which have terminated. */ + protected lazy val unsubscriber = ActorClassificationUnsubscriber.start(system, this) @tailrec protected final def associate(monitored: ActorRef, monitor: ActorRef): Boolean = { - val current = mappings get monitored - current match { - case null ⇒ - if (monitored.isTerminated) false + val current = mappings.get + + current.backing.get(monitored) match { + case None ⇒ + val added = current.add(monitored, monitor) + + if (mappings.compareAndSet(current, added)) registerWithUnsubscriber(monitor, added.seqNr) + else associate(monitored, monitor) + + case Some(monitors) ⇒ + if (monitors.contains(monitored)) false else { - if (mappings.putIfAbsent(monitored, empty + monitor) ne null) associate(monitored, monitor) - else if (monitored.isTerminated) !dissociate(monitored, monitor) else true - } - case raw: TreeSet[_] ⇒ - val v = raw.asInstanceOf[TreeSet[ActorRef]] - if (monitored.isTerminated) false - if (v.contains(monitor)) true - else { - val added = v + monitor - if (!mappings.replace(monitored, v, added)) associate(monitored, monitor) - else if (monitored.isTerminated) !dissociate(monitored, monitor) else true + val added = current.add(monitored, monitor) + val noChange = current.backing == added.backing + + if (noChange) false + else if (mappings.compareAndSet(current, added)) registerWithUnsubscriber(monitor, added.seqNr) + else associate(monitored, monitor) } } } - protected final def dissociate(monitored: ActorRef): immutable.Iterable[ActorRef] = { + protected final def dissociate(actor: ActorRef): Unit = { @tailrec - def dissociateAsMonitored(monitored: ActorRef): immutable.Iterable[ActorRef] = { - val current = mappings get monitored - current match { - case null ⇒ empty - case raw: TreeSet[_] ⇒ - val v = raw.asInstanceOf[TreeSet[ActorRef]] - if (!mappings.remove(monitored, v)) dissociateAsMonitored(monitored) - else v + def dissociateAsMonitored(monitored: ActorRef): Unit = { + val current = mappings.get + if (current.backing.contains(monitored)) { + val removed = current.remove(monitored) + if (!mappings.compareAndSet(current, removed)) + dissociateAsMonitored(monitored) } } def dissociateAsMonitor(monitor: ActorRef): Unit = { - val i = mappings.entrySet.iterator - while (i.hasNext()) { - val entry = i.next() - val v = entry.getValue - v match { - case raw: TreeSet[_] ⇒ - val monitors = raw.asInstanceOf[TreeSet[ActorRef]] + val current = mappings.get + val i = current.backing.iterator + while (i.hasNext) { + val (key, value) = i.next() + value match { + case null ⇒ + // do nothing + + case monitors ⇒ if (monitors.contains(monitor)) - dissociate(entry.getKey, monitor) - case _ ⇒ //Dun care + dissociate(key, monitor) } } } - try { dissociateAsMonitored(monitored) } finally { dissociateAsMonitor(monitored) } + try { dissociateAsMonitored(actor) } finally { dissociateAsMonitor(actor) } } @tailrec protected final def dissociate(monitored: ActorRef, monitor: ActorRef): Boolean = { - val current = mappings get monitored - current match { - case null ⇒ false - case raw: TreeSet[_] ⇒ - val v = raw.asInstanceOf[TreeSet[ActorRef]] - val removed = v - monitor - if (removed eq raw) false - else if (removed.isEmpty) { - if (!mappings.remove(monitored, v)) dissociate(monitored, monitor) else true + val current = mappings.get + + current.backing.get(monitored) match { + case None ⇒ false + case Some(monitors) ⇒ + val removed = current.remove(monitored, monitor) + val removedMonitors = removed.get(monitored) + + if (monitors.isEmpty || monitors == removedMonitors) { + false } else { - if (!mappings.replace(monitored, v, removed)) dissociate(monitored, monitor) else true + if (mappings.compareAndSet(current, removed)) unregisterFromUnsubscriber(monitor, removed.seqNr) + else dissociate(monitored, monitor) } } } @@ -339,9 +372,11 @@ trait ActorClassification { this: ActorEventBus with ActorClassifier ⇒ */ protected def mapSize: Int - def publish(event: Event): Unit = mappings.get(classify(event)) match { - case null ⇒ () - case some ⇒ some foreach { _ ! event } + def publish(event: Event): Unit = { + mappings.get.backing.get(classify(event)) match { + case None ⇒ () + case Some(refs) ⇒ refs.foreach { _ ! event } + } } def subscribe(subscriber: Subscriber, to: Classifier): Boolean = @@ -357,4 +392,20 @@ trait ActorClassification { this: ActorEventBus with ActorClassifier ⇒ def unsubscribe(subscriber: Subscriber): Unit = if (subscriber eq null) throw new IllegalArgumentException("Subscriber is null") else dissociate(subscriber) + + /** + * INTERNAL API + */ + private[akka] def registerWithUnsubscriber(subscriber: ActorRef, seqNr: Int): Boolean = { + unsubscriber ! ActorClassificationUnsubscriber.Register(subscriber, seqNr) + true + } + + /** + * INTERNAL API + */ + private[akka] def unregisterFromUnsubscriber(subscriber: ActorRef, seqNr: Int): Boolean = { + unsubscriber ! ActorClassificationUnsubscriber.Unregister(subscriber, seqNr) + true + } } diff --git a/akka-actor/src/main/scala/akka/event/EventBusUnsubscribers.scala b/akka-actor/src/main/scala/akka/event/EventBusUnsubscribers.scala new file mode 100644 index 0000000000..c8eaa6649f --- /dev/null +++ b/akka-actor/src/main/scala/akka/event/EventBusUnsubscribers.scala @@ -0,0 +1,142 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.event + +import akka.actor._ +import akka.event.Logging.simpleName +import java.util.concurrent.atomic.AtomicInteger + +/** + * INTERNAL API + * + * Watches all actors which subscribe on the given eventStream, and unsubscribes them from it when they are Terminated. + * + * Assumptions note: + * We do not guarantee happens-before in the EventStream when 2 threads subscribe(a) / unsubscribe(a) on the same actor, + * thus the messages sent to this actor may appear to be reordered - this is fine, because the worst-case is starting to + * needlessly watch the actor which will not cause trouble for the stream. This is a trade-off between slowing down + * subscribe calls * because of the need of linearizing the history message sequence and the possibility of sometimes + * watching a few actors too much - we opt for the 2nd choice here. + */ +private[akka] class EventStreamUnsubscriber(eventStream: EventStream, debug: Boolean = false) extends Actor { + + import EventStreamUnsubscriber._ + + override def preStart() { + if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"registering unsubscriber with $eventStream")) + eventStream initUnsubscriber self + } + + def receive = { + case Register(actor) ⇒ + if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"watching $actor in order to unsubscribe from EventStream when it terminates")) + context watch actor + + case UnregisterIfNoMoreSubscribedChannels(actor) if eventStream.hasSubscriptions(actor) ⇒ + // do nothing + // hasSubscriptions can be slow, but it's better for this actor to take the hit than the EventStream + + case UnregisterIfNoMoreSubscribedChannels(actor) ⇒ + if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"unwatching $actor, since has no subscriptions")) + context unwatch actor + + case Terminated(actor) ⇒ + if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"unsubscribe $actor from $eventStream, because it was terminated")) + eventStream unsubscribe actor + } +} + +/** + * INTERNAL API + * + * Provides factory for [[akka.event.EventStreamUnsubscriber]] actors with **unique names**. + * This is needed if someone spins up more [[EventStream]]s using the same [[ActorSystem]], + * each stream gets it's own unsubscriber. + */ +private[akka] object EventStreamUnsubscriber { + + private val unsubscribersCount = new AtomicInteger(0) + + final case class Register(actor: ActorRef) + + final case class UnregisterIfNoMoreSubscribedChannels(actor: ActorRef) + + private def props(eventStream: EventStream, debug: Boolean) = + Props(classOf[EventStreamUnsubscriber], eventStream, debug) + + def start(system: ActorSystem, stream: EventStream) = { + val debug = system.settings.config.getBoolean("akka.actor.debug.event-stream") + system.asInstanceOf[ExtendedActorSystem] + .systemActorOf(props(stream, debug), "eventStreamUnsubscriber-" + unsubscribersCount.incrementAndGet()) + } + +} + +/** + * INTERNAL API + * + * Watches all actors which subscribe on the given event stream, and unsubscribes them from it when they are Terminated. + */ +private[akka] class ActorClassificationUnsubscriber(bus: ActorClassification, debug: Boolean) extends Actor with Stash { + + import ActorClassificationUnsubscriber._ + + private var atSeq = 0 + private def nextSeq = atSeq + 1 + + override def preStart() { + super.preStart() + if (debug) context.system.eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"will monitor $bus")) + } + + def receive = { + case Register(actor, seq) if seq == nextSeq ⇒ + if (debug) context.system.eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"registered watch for $actor in $bus")) + context watch actor + atSeq = nextSeq + unstashAll() + + case reg: Register ⇒ + stash() + + case Unregister(actor, seq) if seq == nextSeq ⇒ + if (debug) context.system.eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"unregistered watch of $actor in $bus")) + context unwatch actor + atSeq = nextSeq + unstashAll() + + case unreg: Unregister ⇒ + stash() + + case Terminated(actor) ⇒ + if (debug) context.system.eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"actor $actor has terminated, unsubscribing it from $bus")) + // the `unsubscribe` will trigger another `Unregister(actor, _)` message to this unsubscriber; + // but since that actor is terminated, there cannot be any harm in processing an Unregister for it. + bus unsubscribe actor + } + +} + +/** + * INTERNAL API + * + * Provides factory for [[akka.event.ActorClassificationUnsubscriber]] actors with **unique names**. + */ +private[akka] object ActorClassificationUnsubscriber { + + private val unsubscribersCount = new AtomicInteger(0) + + final case class Register(actor: ActorRef, seq: Int) + final case class Unregister(actor: ActorRef, seq: Int) + + def start(system: ActorSystem, bus: ActorClassification, debug: Boolean = false) = { + val debug = system.settings.config.getBoolean("akka.actor.debug.event-stream") + system.asInstanceOf[ExtendedActorSystem] + .systemActorOf(props(bus, debug), "actorClassificationUnsubscriber-" + unsubscribersCount.incrementAndGet()) + } + + private def props(eventBus: ActorClassification, debug: Boolean) = Props(classOf[ActorClassificationUnsubscriber], eventBus, debug) + +} + diff --git a/akka-actor/src/main/scala/akka/event/EventStream.scala b/akka-actor/src/main/scala/akka/event/EventStream.scala index 7eac2f4c78..9990c75720 100644 --- a/akka-actor/src/main/scala/akka/event/EventStream.scala +++ b/akka-actor/src/main/scala/akka/event/EventStream.scala @@ -5,17 +5,12 @@ package akka.event import language.implicitConversions -import akka.actor._ +import akka.actor.{ ActorRef, ActorSystem } import akka.event.Logging.simpleName import akka.util.Subclassification import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec -object EventStream { - //Why is this here and why isn't there a failing test if it is removed? - implicit def fromActorSystem(system: ActorSystem) = system.eventStream -} - /** * An Akka EventStream is a pub-sub stream of events both system and user generated, * where subscribers are ActorRefs and the channels are Classes and Events are any java.lang.Object. @@ -25,7 +20,7 @@ object EventStream { * The debug flag in the constructor toggles if operations on this EventStream should also be published * as Debug-Events */ -class EventStream(private val debug: Boolean = false) extends LoggingBus with SubchannelClassification { +class EventStream(sys: ActorSystem, private val debug: Boolean = false) extends LoggingBus with SubchannelClassification { type Event = AnyRef type Classifier = Class[_] @@ -66,10 +61,17 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su unregisterIfNoMoreSubscribedChannels(subscriber) } + /** + * ''Must'' be called after actor system is "ready". + * Starts system actor that takes care of unsubscribing subscribers that have terminated. + */ + def startUnsubscriber() = EventStreamUnsubscriber.start(sys, this) + /** * INTERNAL API */ - private[akka] def initUnsubscriber(unsubscriber: ActorRef): Boolean = { + @tailrec + final private[akka] def initUnsubscriber(unsubscriber: ActorRef): Boolean = { initiallySubscribedOrUnsubscriber.get match { case value @ Left(subscribers) ⇒ if (initiallySubscribedOrUnsubscriber.compareAndSet(value, Right(unsubscriber))) { @@ -91,7 +93,8 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su /** * INTERNAL API */ - private[akka] def registerWithUnsubscriber(subscriber: ActorRef): Unit = { + @tailrec + private def registerWithUnsubscriber(subscriber: ActorRef): Unit = { initiallySubscribedOrUnsubscriber.get match { case value @ Left(subscribers) ⇒ if (!initiallySubscribedOrUnsubscriber.compareAndSet(value, Left(subscribers + subscriber))) @@ -109,7 +112,8 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su * because it's an expensive operation, and we don want to block client-code for that long, the Actor will eventually * catch up and perform the apropriate operation. */ - private[akka] def unregisterIfNoMoreSubscribedChannels(subscriber: ActorRef): Unit = { + @tailrec + private def unregisterIfNoMoreSubscribedChannels(subscriber: ActorRef): Unit = { initiallySubscribedOrUnsubscriber.get match { case value @ Left(subscribers) ⇒ if (!initiallySubscribedOrUnsubscriber.compareAndSet(value, Left(subscribers - subscriber))) diff --git a/akka-actor/src/main/scala/akka/event/EventStreamUnsubscriber.scala b/akka-actor/src/main/scala/akka/event/EventStreamUnsubscriber.scala deleted file mode 100644 index 7b85ee8e52..0000000000 --- a/akka-actor/src/main/scala/akka/event/EventStreamUnsubscriber.scala +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.event - -import akka.actor._ -import akka.event.Logging.simpleName - -/** - * Watches all actors which subscribe on the given event stream, and unsubscribes them from it when they are Terminated. - */ -class EventStreamUnsubscriber(eventStream: EventStream, debug: Boolean = false) extends Actor with ActorLogging { - - import EventStreamUnsubscriber._ - - override def preStart() { - if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"registering unsubscriber with $eventStream")) - eventStream initUnsubscriber self - } - - def receive = { - case Register(actor) ⇒ - if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"watching $actor in order to unsubscribe from EventStream when it terminates")) - context watch actor - - case UnregisterIfNoMoreSubscribedChannels(actor) if eventStream.hasSubscriptions(actor) ⇒ - // do nothing - // hasSubscriptions can be slow, but it's better for this actor to take the hit than the EventStream - - case UnregisterIfNoMoreSubscribedChannels(actor) ⇒ - if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"unwatching $actor, since has no subscriptions")) - context unwatch actor - - case Terminated(actor) ⇒ - if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"unsubscribe $actor from $eventStream, because it was terminated")) - eventStream unsubscribe actor - } -} - -object EventStreamUnsubscriber { - final case class Register(actor: ActorRef) - final case class UnregisterIfNoMoreSubscribedChannels(actor: ActorRef) -} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/event/japi/EventBusJavaAPI.scala b/akka-actor/src/main/scala/akka/event/japi/EventBusJavaAPI.scala index 448293914b..80f1abdbed 100644 --- a/akka-actor/src/main/scala/akka/event/japi/EventBusJavaAPI.scala +++ b/akka-actor/src/main/scala/akka/event/japi/EventBusJavaAPI.scala @@ -4,7 +4,7 @@ package akka.event.japi import akka.util.Subclassification -import akka.actor.ActorRef +import akka.actor.{ ActorSystem, ActorRef } /** * Java API: See documentation for [[akka.event.EventBus]] @@ -89,12 +89,11 @@ abstract class LookupEventBus[E, S, C] extends EventBus[E, S, C] { } /** - * See documentation for [[akka.event.SubchannelClassification]] + * Java API: See documentation for [[akka.event.SubchannelClassification]] * E is the Event type * S is the Subscriber type * C is the Classifier type */ - abstract class SubchannelEventBus[E, S, C] extends EventBus[E, S, C] { private val bus = new akka.event.EventBus with akka.event.SubchannelClassification { type Event = E @@ -134,7 +133,7 @@ abstract class SubchannelEventBus[E, S, C] extends EventBus[E, S, C] { } /** - * See documentation for [[akka.event.ScanningClassification]] + * Java API: See documentation for [[akka.event.ScanningClassification]] * E is the Event type * S is the Subscriber type * C is the Classifier type @@ -185,15 +184,17 @@ abstract class ScanningEventBus[E, S, C] extends EventBus[E, S, C] { } /** - * See documentation for [[akka.event.ActorClassification]] + * Java API: See documentation for [[akka.event.ActorClassification]] * An EventBus where the Subscribers are ActorRefs and the Classifier is ActorRef * Means that ActorRefs "listen" to other ActorRefs * E is the Event type */ -abstract class ActorEventBus[E] extends EventBus[E, ActorRef, ActorRef] { +abstract class ActorEventBus[E](system: ActorSystem) extends EventBus[E, ActorRef, ActorRef] { private val bus = new akka.event.ActorEventBus with akka.event.ActorClassification with akka.event.ActorClassifier { type Event = E + override val system = ActorEventBus.this.system + override protected def mapSize: Int = ActorEventBus.this.mapSize override protected def classify(event: E): ActorRef = diff --git a/akka-docs/rst/java/code/docs/event/EventBusDocTest.java b/akka-docs/rst/java/code/docs/event/EventBusDocTest.java index 9292673d0c..15969814c0 100644 --- a/akka-docs/rst/java/code/docs/event/EventBusDocTest.java +++ b/akka-docs/rst/java/code/docs/event/EventBusDocTest.java @@ -3,20 +3,27 @@ */ package docs.event; +import akka.event.japi.EventBus; + import java.util.concurrent.TimeUnit; import org.junit.ClassRule; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; import akka.actor.ActorSystem; import akka.actor.ActorRef; +import akka.event.japi.*; import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.JavaTestKit; import akka.event.japi.EventBus; +import akka.util.Subclassification; +import org.junit.ClassRule; +import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; //#lookup-bus import akka.event.japi.LookupEventBus; +import java.util.concurrent.TimeUnit; //#lookup-bus @@ -226,6 +233,12 @@ public class EventBusDocTest { static //#actor-bus public class ActorBusImpl extends ActorEventBus { + + // the ActorSystem will be used for book-keeping operations, such as subscribers terminating + public ActorBusImpl(ActorSystem system) { + super(system); + } + // is used for extracting the classifier from the incoming events @Override public ActorRef classify(Notification event) { return event.ref; @@ -299,7 +312,7 @@ public class EventBusDocTest { JavaTestKit probe2 = new JavaTestKit(system); ActorRef subscriber1 = probe1.getRef(); ActorRef subscriber2 = probe2.getRef(); - ActorBusImpl actorBus = new ActorBusImpl(); + ActorBusImpl actorBus = new ActorBusImpl(system); actorBus.subscribe(subscriber1, observer1); actorBus.subscribe(subscriber2, observer1); actorBus.subscribe(subscriber2, observer2); diff --git a/akka-docs/rst/java/event-bus.rst b/akka-docs/rst/java/event-bus.rst index ae1bb7070d..93e02c7a7c 100644 --- a/akka-docs/rst/java/event-bus.rst +++ b/akka-docs/rst/java/event-bus.rst @@ -104,6 +104,8 @@ A test for this implementation may look like this: This classifier takes always a time which is proportional to the number of subscriptions, independent of how many actually match. +.. _actor-classification-java: + Actor Classification -------------------- @@ -111,6 +113,11 @@ This classification was originally developed specifically for implementing :ref:`DeathWatch `: subscribers as well as classifiers are of type :class:`ActorRef`. +This classification requires an :class:`ActorSystem` in order to perform book-keeping +operations related to the subscribers being Actors, which can terminate without first +unsubscribing from the EventBus. ActorClassification maitains a system Actor which +takes care of unsubscribing terminated actors automatically. + The necessary methods to be implemented are illustrated with the following example: .. includecode:: code/docs/event/EventBusDocTest.java#actor-bus @@ -141,6 +148,8 @@ it can be subscribed like this: .. includecode:: code/docs/event/LoggingDocTest.java#deadletters +Similarily to `Actor Classification`_, :class:`EventStream` will automatically remove subscibers when they terminate. + .. note:: The event stream is a *local facility*, meaning that it will *not* distribute events to other nodes in a clustered environment (unless you subscribe a Remote Actor to the stream explicitly). If you need to broadcast events in an Akka cluster, *without* knowing your recipients explicitly (i.e. obtaining their ActorRefs), you may want to look into: :ref:`distributed-pub-sub`. diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 7f22d9a8e0..7ea70ce474 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -19,6 +19,25 @@ In earlier versions of Akka `TestKit.remaining` returned the default timeout con AssertionError if called outside of within. The old behavior however can still be achieved by calling `TestKit.remainingOrDefault` instead. +EventStream and ActorClassification EventBus now require an ActorSystem +======================================================================= + +Both the ``EventStream`` (:ref:`Scala `, :ref:`Java `) and the +``ActorClassification`` Event Bus (:ref:`Scala `, :ref:`Java `) now +require an ``ActorSystem`` to properly operate. The reason for that is moving away from stateful internal lifecycle checks +to a fully reactive model for unsubscribing actors that have ``Terminated``. + +If you have implemented a custom event bus, you will need to pass in the actor system through the constructor now: + +.. includecode:: ../scala/code/docs/event/EventBusDocSpec.scala#actor-bus + +If you have been creating EventStreams manually, you now have to provide an actor system and *start the unsubscriber*: + +.. includecode:: ../../../akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala#event-bus-start-unsubscriber-scala + +Please note that this change affects you only if you have implemented your own busses, Akka's own ``context.eventStream`` +is still there and does not require any attention from you concerning this change. + Removed Deprecated Features =========================== diff --git a/akka-docs/rst/scala/code/docs/event/EventBusDocSpec.scala b/akka-docs/rst/scala/code/docs/event/EventBusDocSpec.scala index 19caf087dc..41abc3d42e 100644 --- a/akka-docs/rst/scala/code/docs/event/EventBusDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/event/EventBusDocSpec.scala @@ -5,7 +5,7 @@ package docs.event import scala.concurrent.duration._ import akka.testkit.AkkaSpec -import akka.actor.ActorRef +import akka.actor.{ ActorSystem, ActorRef } import akka.testkit.TestProbe object EventBusDocSpec { @@ -126,7 +126,7 @@ object EventBusDocSpec { final case class Notification(ref: ActorRef, id: Int) - class ActorBusImpl extends ActorEventBus with ActorClassifier with ActorClassification { + class ActorBusImpl(val system: ActorSystem) extends ActorEventBus with ActorClassifier with ActorClassification { type Event = Notification // is used for extracting the classifier from the incoming events @@ -187,7 +187,7 @@ class EventBusDocSpec extends AkkaSpec { val probe2 = TestProbe() val subscriber1 = probe1.ref val subscriber2 = probe2.ref - val actorBus = new ActorBusImpl + val actorBus = new ActorBusImpl(system) actorBus.subscribe(subscriber1, observer1) actorBus.subscribe(subscriber2, observer1) actorBus.subscribe(subscriber2, observer2) diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 6253446802..1292d2a517 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -84,16 +84,16 @@ trait PersistenceDocSpec { } //#deletion } - + class MyProcessor4 extends Processor { //#recovery-completed override def preStart(): Unit = { super.preStart() self ! "FIRST" } - + def receive = initializing.orElse(active) - + def initializing: Receive = { case "FIRST" => recoveryCompleted() @@ -102,7 +102,7 @@ trait PersistenceDocSpec { case other if recoveryFinished => stash() } - + def recoveryCompleted(): Unit = { // perform init after recovery, before any other messages // ... diff --git a/akka-docs/rst/scala/event-bus.rst b/akka-docs/rst/scala/event-bus.rst index b26af8491e..1690156620 100644 --- a/akka-docs/rst/scala/event-bus.rst +++ b/akka-docs/rst/scala/event-bus.rst @@ -104,6 +104,8 @@ A test for this implementation may look like this: This classifier takes always a time which is proportional to the number of subscriptions, independent of how many actually match. +.. _actor-classification-scala: + Actor Classification -------------------- @@ -111,6 +113,11 @@ This classification was originally developed specifically for implementing :ref:`DeathWatch `: subscribers as well as classifiers are of type :class:`ActorRef`. +This classification requires an :class:`ActorSystem` in order to perform book-keeping +operations related to the subscribers being Actors, which can terminate without first +unsubscribing from the EventBus. ActorClassification maitains a system Actor which +takes care of unsubscribing terminated actors automatically. + The necessary methods to be implemented are illustrated with the following example: .. includecode:: code/docs/event/EventBusDocSpec.scala#actor-bus @@ -136,6 +143,8 @@ how a simple subscription works: .. includecode:: code/docs/event/LoggingDocSpec.scala#deadletters +Similarily to `Actor Classification`_, :class:`EventStream` will automatically remove subscibers when they terminate. + .. note:: The event stream is a *local facility*, meaning that it will *not* distribute events to other nodes in a clustered environment (unless you subscribe a Remote Actor to the stream explicitly). If you need to broadcast events in an Akka cluster, *without* knowing your recipients explicitly (i.e. obtaining their ActorRefs), you may want to look into: :ref:`distributed-pub-sub`.