diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index b655a1ab21..ce5b345346 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -103,8 +103,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) @volatile private[cluster] var _latestStats = ClusterStats() - private[cluster] val eventBus: ClusterEventBus = new ClusterEventBus - // ======================================================== // ===================== WORK DAEMONS ===================== // ======================================================== @@ -167,7 +165,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } }).withDispatcher(UseDispatcher), name = "clusterEventBusListener") - eventBus.subscribe(listener, classOf[ClusterDomainEvent]) + subscribe(listener, classOf[ClusterDomainEvent]) listener } @@ -261,7 +259,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) * The `to` Class can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]] * or subclass. */ - def subscribe(subscriber: ActorRef, to: Class[_]): Unit = eventBus.subscribe(subscriber, to) + def subscribe(subscriber: ActorRef, to: Class[_]): Unit = system.eventStream.subscribe(subscriber, to) + + /** + * Subscribe to cluster domain events. + */ + def unsubscribe(subscriber: ActorRef): Unit = system.eventStream.unsubscribe(subscriber) /** * Try to join this cluster node with the node specified by 'address'. @@ -299,7 +302,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress) system.stop(clusterDaemons) - eventBus.unsubscribe(eventBusListener) + unsubscribe(eventBusListener) system.stop(eventBusListener) scheduler.close() diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 3414c19faf..9016349a84 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -8,6 +8,7 @@ import scala.concurrent.util.{ Deadline, Duration } import scala.concurrent.forkjoin.ThreadLocalRandom import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, RootActorPath, PoisonPill, Scheduler } import akka.actor.Status.Failure +import akka.event.EventStream import akka.routing.ScatterGatherFirstCompletedRouter import akka.util.Timeout import akka.pattern.{ AskTimeoutException, ask, pipe } @@ -125,7 +126,6 @@ private[cluster] trait ClusterEnvironment { private[cluster] def selfAddress: Address private[cluster] def scheduler: Scheduler private[cluster] def seedNodes: IndexedSeq[Address] - private[cluster] def eventBus: ClusterEventBus private[cluster] def shutdown(): Unit } @@ -808,17 +808,19 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) } def publishState(): Unit = { - environment.eventBus publish MembershipGossipChanged(latestGossip) - environment.eventBus publish InternalStatsChanged(stats) + eventStream publish MembershipGossipChanged(latestGossip) + eventStream publish InternalStatsChanged(stats) } def publishMembers(oldMembers: SortedSet[Member]): Unit = { val oldMembersStatus = oldMembers.map(m ⇒ (m.address, m.status)) val newMembersStatus = latestGossip.members.map(m ⇒ (m.address, m.status)) if (newMembersStatus != oldMembersStatus) - environment.eventBus publish MembersChanged(latestGossip.members) + eventStream publish MembersChanged(latestGossip.members) } + def eventStream: EventStream = context.system.eventStream + def ping(p: Ping): Unit = sender ! Pong(p) } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEventBus.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEventBus.scala deleted file mode 100644 index d5a7c7ee6f..0000000000 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEventBus.scala +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.cluster - -import akka.event.ActorEventBus -import akka.event.SubchannelClassification -import akka.actor.ActorRef -import akka.util.Subclassification - -/** - * Changes to the Cluster are published to this local event bus - * as [[akka.cluster.ClusterEvent.ClusterDomainEvent]] subclasses. - */ -class ClusterEventBus extends ActorEventBus with SubchannelClassification { - - type Event = AnyRef - type Classifier = Class[_] - - protected implicit val subclassification = new Subclassification[Class[_]] { - def isEqual(x: Class[_], y: Class[_]) = x == y - def isSubclass(x: Class[_], y: Class[_]) = y isAssignableFrom x - } - - protected def classify(event: AnyRef): Class[_] = event.getClass - - protected def publish(event: AnyRef, subscriber: ActorRef) = { - if (subscriber.isTerminated) unsubscribe(subscriber) - else subscriber ! event - } - -} \ No newline at end of file