!act,doc #3893 Removed isTerminated checks from ActorClassification

Instead of isTerminated we now use death watch on subscribers.

! Breaking change - ActorClassification based event buses now require
  and actor system. Previously no actors were involved, but now someone
  has to `watch` the subscribers. The unsubscriber is an system actor,
  and won't be stopped automagically if a bus stops to be used (hard to
  determine what "stops being used" is)
* Replaced isTerminated checks with watching actors
* backing structure for ActorClassification swaped from
  ConcurrentHashMap to immutable.Map with CAS operations on it. This is
  required to avoid races and guarantee register/unregister ordering
  (messages sent with proper sequence numbers) to the unsubscriber.
  Performance tested it and still above 1.3million subscribe+unsubscribe
  ops per second (mac i7, retina), where as the CHM version was
  4 million - but that one could only work in the presence of
  itTerminated - so we pay the price here for removing it.
* `ActorClassification` starts the unsubscriber instance by itself,
  the unsubscriber is an system actor, and can be stopped via
  `ActorClassification#shutdown`
* Will unregister from unsubscriber, when no more subscriptions for
  given subscriber are left in this bus.
* Added missing "Java API: " for some types
* Updated docs to point out the automatic subscriber purging (on terminated)
This commit is contained in:
Konrad Malawski 2014-02-22 23:25:54 +00:00 committed by Konrad 'ktoso' Malawski
parent f57470926e
commit c046cdff0a
14 changed files with 471 additions and 157 deletions

View file

@ -5,7 +5,7 @@ package docs.event
import scala.concurrent.duration._
import akka.testkit.AkkaSpec
import akka.actor.ActorRef
import akka.actor.{ ActorSystem, ActorRef }
import akka.testkit.TestProbe
object EventBusDocSpec {
@ -126,7 +126,7 @@ object EventBusDocSpec {
final case class Notification(ref: ActorRef, id: Int)
class ActorBusImpl extends ActorEventBus with ActorClassifier with ActorClassification {
class ActorBusImpl(val system: ActorSystem) extends ActorEventBus with ActorClassifier with ActorClassification {
type Event = Notification
// is used for extracting the classifier from the incoming events
@ -187,7 +187,7 @@ class EventBusDocSpec extends AkkaSpec {
val probe2 = TestProbe()
val subscriber1 = probe1.ref
val subscriber2 = probe2.ref
val actorBus = new ActorBusImpl
val actorBus = new ActorBusImpl(system)
actorBus.subscribe(subscriber1, observer1)
actorBus.subscribe(subscriber2, observer1)
actorBus.subscribe(subscriber2, observer2)

View file

@ -84,16 +84,16 @@ trait PersistenceDocSpec {
}
//#deletion
}
class MyProcessor4 extends Processor {
//#recovery-completed
override def preStart(): Unit = {
super.preStart()
self ! "FIRST"
}
def receive = initializing.orElse(active)
def initializing: Receive = {
case "FIRST" =>
recoveryCompleted()
@ -102,7 +102,7 @@ trait PersistenceDocSpec {
case other if recoveryFinished =>
stash()
}
def recoveryCompleted(): Unit = {
// perform init after recovery, before any other messages
// ...