Merge branch 'master' of github.com:jboner/akka
This commit is contained in:
commit
faa5b08dde
11 changed files with 353 additions and 161 deletions
3
.gitignore
vendored
3
.gitignore
vendored
|
|
@ -41,6 +41,7 @@ run-codefellow
|
|||
.classpath
|
||||
.idea
|
||||
.scala_dependencies
|
||||
.cache
|
||||
multiverse.log
|
||||
.eprj
|
||||
.*.swp
|
||||
|
|
@ -53,4 +54,4 @@ _akka_cluster/
|
|||
Makefile
|
||||
akka.sublime-project
|
||||
akka.sublime-workspace
|
||||
.target
|
||||
.target
|
||||
|
|
|
|||
|
|
@ -12,9 +12,15 @@ import akka.testkit._
|
|||
import akka.util.duration._
|
||||
import java.util.concurrent.atomic._
|
||||
import akka.actor.{ Props, Actor, ActorRef }
|
||||
import java.util.Comparator
|
||||
import akka.japi.{ Procedure, Function }
|
||||
|
||||
object EventBusSpec {
|
||||
|
||||
class TestActorWrapperActor(testActor: ActorRef) extends Actor {
|
||||
def receive = {
|
||||
case x ⇒ testActor forward x
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers with TestKit with BeforeAndAfterEach {
|
||||
|
|
@ -87,6 +93,7 @@ abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers
|
|||
bus.subscribe(subscriber, classifier)
|
||||
bus.publish(event)
|
||||
expectMsg(event)
|
||||
expectNoMsg(1 second)
|
||||
bus.unsubscribe(subscriber, classifier)
|
||||
}
|
||||
|
||||
|
|
@ -98,15 +105,17 @@ abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers
|
|||
expectMsg(event)
|
||||
expectMsg(event)
|
||||
expectMsg(event)
|
||||
expectNoMsg(1 second)
|
||||
bus.unsubscribe(subscriber, classifier)
|
||||
}
|
||||
|
||||
"publish the given event to all intended subscribers" in {
|
||||
val subscribers = Vector.fill(10)(createNewSubscriber())
|
||||
val range = 0 until 10
|
||||
val subscribers = range map (_ ⇒ createNewSubscriber())
|
||||
subscribers foreach { s ⇒ bus.subscribe(s, classifier) must be === true }
|
||||
bus.publish(event)
|
||||
(1 to 10) foreach { _ ⇒ expectMsg(event) }
|
||||
subscribers foreach disposeSubscriber
|
||||
range foreach { _ ⇒ expectMsg(event) }
|
||||
subscribers foreach { s ⇒ bus.unsubscribe(s, classifier) must be === true; disposeSubscriber(s) }
|
||||
}
|
||||
|
||||
"not publish the given event to any other subscribers than the intended ones" in {
|
||||
|
|
@ -135,29 +144,83 @@ abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers
|
|||
}
|
||||
|
||||
object ActorEventBusSpec {
|
||||
class ComposedActorEventBus extends ActorEventBus with LookupClassification with EventType[String] with ClassifierType[String] {
|
||||
def classify(event: String) = event
|
||||
def publish(event: String, subscriber: ActorRef) = subscriber ! event
|
||||
}
|
||||
class ComposedActorEventBus extends ActorEventBus with LookupClassification {
|
||||
type Event = Int
|
||||
type Classifier = String
|
||||
|
||||
class TestActorWrapperActor(testActor: ActorRef) extends Actor {
|
||||
def receive = {
|
||||
case x ⇒ testActor forward x
|
||||
}
|
||||
def classify(event: Event) = event.toString
|
||||
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = a compareTo b
|
||||
protected def mapSize = 32
|
||||
def publish(event: Event, subscriber: Subscriber) = subscriber ! event
|
||||
}
|
||||
}
|
||||
|
||||
class ActorEventBusSpec extends EventBusSpec("ActorEventBus") {
|
||||
import akka.event.ActorEventBusSpec._
|
||||
import EventBusSpec.TestActorWrapperActor
|
||||
|
||||
type BusType = ComposedActorEventBus
|
||||
def createNewEventBus(): BusType = new ComposedActorEventBus
|
||||
|
||||
def createEvents(numberOfEvents: Int) = (0 until numberOfEvents) map { _.toString }
|
||||
def createEvents(numberOfEvents: Int) = (0 until numberOfEvents)
|
||||
|
||||
def createSubscriber(pipeTo: ActorRef) = actorOf(Props(new TestActorWrapperActor(pipeTo)))
|
||||
|
||||
def classifierFor(event: BusType#Event) = event
|
||||
def classifierFor(event: BusType#Event) = event.toString
|
||||
|
||||
def disposeSubscriber(subscriber: BusType#Subscriber): Unit = subscriber.stop()
|
||||
}
|
||||
|
||||
object ScanningEventBusSpec {
|
||||
import akka.event.japi.ScanningEventBus
|
||||
|
||||
class MyScanningEventBus extends ScanningEventBus[Int, akka.japi.Procedure[Int], String] {
|
||||
protected def compareClassifiers(a: Classifier, b: Classifier): Int = a compareTo b
|
||||
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = System.identityHashCode(a) - System.identityHashCode(b)
|
||||
|
||||
protected def matches(classifier: Classifier, event: Event): Boolean = event.toString == classifier
|
||||
|
||||
protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber(event)
|
||||
}
|
||||
}
|
||||
|
||||
class ScanningEventBusSpec extends EventBusSpec("ScanningEventBus") {
|
||||
import ScanningEventBusSpec._
|
||||
|
||||
type BusType = MyScanningEventBus
|
||||
|
||||
def createNewEventBus(): BusType = new MyScanningEventBus
|
||||
|
||||
def createEvents(numberOfEvents: Int) = (0 until numberOfEvents)
|
||||
|
||||
def createSubscriber(pipeTo: ActorRef) = new Procedure[Int] { def apply(i: Int) = pipeTo ! i }
|
||||
|
||||
def classifierFor(event: BusType#Event) = event.toString
|
||||
|
||||
def disposeSubscriber(subscriber: BusType#Subscriber): Unit = ()
|
||||
}
|
||||
|
||||
object LookupEventBusSpec {
|
||||
class MyLookupEventBus extends akka.event.japi.LookupEventBus[Int, akka.japi.Procedure[Int], String] {
|
||||
protected def classify(event: Event): Classifier = event.toString
|
||||
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = System.identityHashCode(a) - System.identityHashCode(b)
|
||||
protected def mapSize = 32
|
||||
protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber(event)
|
||||
}
|
||||
}
|
||||
|
||||
class LookupEventBusSpec extends EventBusSpec("LookupEventBus") {
|
||||
import LookupEventBusSpec._
|
||||
|
||||
type BusType = MyLookupEventBus
|
||||
|
||||
def createNewEventBus(): BusType = new MyLookupEventBus
|
||||
|
||||
def createEvents(numberOfEvents: Int) = (0 until numberOfEvents)
|
||||
|
||||
def createSubscriber(pipeTo: ActorRef) = new Procedure[Int] { def apply(i: Int) = pipeTo ! i }
|
||||
|
||||
def classifierFor(event: BusType#Event) = event.toString
|
||||
|
||||
def disposeSubscriber(subscriber: BusType#Subscriber): Unit = ()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ abstract class AkkaPerformanceTest extends BenchmarkScenarios {
|
|||
val start = System.nanoTime
|
||||
val clients = (for (i ← 0 until numberOfClients) yield {
|
||||
val receiver = receivers(i % receivers.size)
|
||||
Props(new Client(receiver, orders, latch, repeatsPerClient + (if (i < oddRepeats) 1 else 0), delayMs)).withDispatcher(clientDispatcher)
|
||||
Props(new Client(receiver, orders, latch, repeatsPerClient + (if (i < oddRepeats) 1 else 0), sampling, delayMs)).withDispatcher(clientDispatcher)
|
||||
}).toList.map(actorOf(_))
|
||||
|
||||
clients.foreach(_ ! "run")
|
||||
|
|
@ -50,28 +50,35 @@ abstract class AkkaPerformanceTest extends BenchmarkScenarios {
|
|||
clients.foreach(_ ! PoisonPill)
|
||||
}
|
||||
|
||||
class Client(orderReceiver: ActorRef, orders: List[Order], latch: CountDownLatch, repeat: Int, delayMs: Int) extends Actor {
|
||||
def this(orderReceiver: ActorRef, orders: List[Order], latch: CountDownLatch, repeat: Int) {
|
||||
this(orderReceiver, orders, latch, repeat, 0)
|
||||
}
|
||||
class Client(
|
||||
orderReceiver: ActorRef,
|
||||
orders: List[Order],
|
||||
latch: CountDownLatch,
|
||||
repeat: Int,
|
||||
sampling: Int,
|
||||
delayMs: Int = 0) extends Actor {
|
||||
|
||||
def receive = {
|
||||
case "run" ⇒
|
||||
(1 to repeat).foreach(i ⇒
|
||||
{
|
||||
for (o ← orders) {
|
||||
var n = 0
|
||||
for (r ← 1 to repeat; o ← orders) {
|
||||
n += 1
|
||||
val rsp =
|
||||
if (n % sampling == 0) {
|
||||
val t0 = System.nanoTime
|
||||
val rsp = placeOrder(orderReceiver, o)
|
||||
val duration = System.nanoTime - t0
|
||||
stat.addValue(duration)
|
||||
if (!rsp.status) {
|
||||
EventHandler.error(this, "Invalid rsp")
|
||||
}
|
||||
delay(delayMs)
|
||||
rsp
|
||||
} else {
|
||||
placeOrder(orderReceiver, o)
|
||||
}
|
||||
})
|
||||
if (!rsp.status) {
|
||||
EventHandler.error(this, "Invalid rsp")
|
||||
}
|
||||
delay(delayMs)
|
||||
}
|
||||
latch.countDown()
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -49,6 +49,10 @@ trait PerformanceTest extends JUnitSuite {
|
|||
System.getProperty("benchmark.timeDilation", "1").toLong
|
||||
}
|
||||
|
||||
def sampling = {
|
||||
System.getProperty("benchmark.sampling", "100").toInt
|
||||
}
|
||||
|
||||
var stat: DescriptiveStatistics = _
|
||||
|
||||
val resultRepository = BenchResultRepository()
|
||||
|
|
@ -113,16 +117,18 @@ trait PerformanceTest extends JUnitSuite {
|
|||
75 -> (stat.getPercentile(75.0) / 1000).toLong,
|
||||
95 -> (stat.getPercentile(95.0) / 1000).toLong)
|
||||
|
||||
val n = stat.getN * sampling
|
||||
|
||||
val stats = Stats(
|
||||
name,
|
||||
load = numberOfClients,
|
||||
timestamp = TestStart.startTime,
|
||||
durationNanos = durationNs,
|
||||
n = stat.getN,
|
||||
n = n,
|
||||
min = (stat.getMin / 1000).toLong,
|
||||
max = (stat.getMax / 1000).toLong,
|
||||
mean = (stat.getMean / 1000).toLong,
|
||||
tps = (stat.getN.toDouble / durationS),
|
||||
tps = (n.toDouble / durationS),
|
||||
percentiles)
|
||||
|
||||
resultRepository.add(stats)
|
||||
|
|
|
|||
|
|
@ -780,21 +780,34 @@ trait Promise[T] extends Future[T] {
|
|||
|
||||
//Companion object to FState, just to provide a cheap, immutable default entry
|
||||
private[akka] object FState {
|
||||
val empty = new FState[Nothing]()
|
||||
def apply[T](): FState[T] = empty.asInstanceOf[FState[T]]
|
||||
}
|
||||
def apply[T](): FState[T] = EmptyPending.asInstanceOf[FState[T]]
|
||||
|
||||
/**
|
||||
* Represents the internal state of the DefaultCompletableFuture
|
||||
*/
|
||||
private[akka] case class FState[T](value: Option[Either[Throwable, T]] = None, listeners: List[Future[T] ⇒ Unit] = Nil)
|
||||
/**
|
||||
* Represents the internal state of the DefaultCompletableFuture
|
||||
*/
|
||||
|
||||
sealed trait FState[+T] { def value: Option[Either[Throwable, T]] }
|
||||
case class Pending[T](listeners: List[Future[T] ⇒ Unit] = Nil) extends FState[T] {
|
||||
def value: Option[Either[Throwable, T]] = None
|
||||
}
|
||||
case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
|
||||
def result: T = value.get.right.get
|
||||
}
|
||||
case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
|
||||
def exception: Throwable = value.get.left.get
|
||||
}
|
||||
case object Expired extends FState[Nothing] {
|
||||
def value: Option[Either[Throwable, Nothing]] = None
|
||||
}
|
||||
val EmptyPending = Pending[Nothing](Nil)
|
||||
}
|
||||
|
||||
/**
|
||||
* The default concrete Future implementation.
|
||||
*/
|
||||
class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] {
|
||||
self ⇒
|
||||
|
||||
import FState.{ FState, Success, Failure, Pending, Expired }
|
||||
def this()(implicit dispatcher: MessageDispatcher) = this(Timeout.default)
|
||||
|
||||
def this(timeout: Long)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout))
|
||||
|
|
@ -842,7 +855,18 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].compareAndSet(this, oldState, newState)
|
||||
|
||||
@inline
|
||||
protected final def getState: FState[T] = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].get(this)
|
||||
protected final def getState: FState[T] = {
|
||||
|
||||
@tailrec
|
||||
def read(): FState[T] = {
|
||||
val cur = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].get(this)
|
||||
if (cur.isInstanceOf[Pending[_]] && isExpired) {
|
||||
if (updateState(cur, Expired)) Expired else read()
|
||||
} else cur
|
||||
}
|
||||
|
||||
read()
|
||||
}
|
||||
|
||||
def complete(value: Either[Throwable, T]): this.type = {
|
||||
val callbacks = {
|
||||
|
|
@ -850,15 +874,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
@tailrec
|
||||
def tryComplete: List[Future[T] ⇒ Unit] = {
|
||||
val cur = getState
|
||||
if (cur.value.isDefined) Nil
|
||||
else if ( /*cur.value.isEmpty && */ isExpired) {
|
||||
//Empty and expired, so remove listeners
|
||||
//TODO Perhaps cancel existing onTimeout listeners in the future here?
|
||||
updateState(cur, FState()) //Try to reset the state to the default, doesn't matter if it fails
|
||||
Nil
|
||||
} else {
|
||||
if (updateState(cur, FState(Option(value), Nil))) cur.listeners
|
||||
else tryComplete
|
||||
|
||||
cur match {
|
||||
case Pending(listeners) ⇒
|
||||
if (updateState(cur, if (value.isLeft) Failure(Some(value)) else Success(Some(value)))) listeners
|
||||
else tryComplete
|
||||
case _ ⇒ Nil
|
||||
}
|
||||
}
|
||||
tryComplete
|
||||
|
|
@ -876,10 +897,14 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
@tailrec //Returns whether the future has already been completed or not
|
||||
def tryAddCallback(): Boolean = {
|
||||
val cur = getState
|
||||
if (cur.value.isDefined) true
|
||||
else if (isExpired) false
|
||||
else if (updateState(cur, cur.copy(listeners = func :: cur.listeners))) false
|
||||
else tryAddCallback()
|
||||
cur match {
|
||||
case _: Success[_] | _: Failure[_] ⇒ true
|
||||
case Expired ⇒ false
|
||||
case p: Pending[_] ⇒
|
||||
val pt = p.asInstanceOf[Pending[T]]
|
||||
if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false
|
||||
else tryAddCallback()
|
||||
}
|
||||
}
|
||||
|
||||
if (tryAddCallback()) Future.dispatchTask(() ⇒ notifyCompleted(func))
|
||||
|
|
@ -912,10 +937,10 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
|
||||
final def orElse[A >: T](fallback: ⇒ A): Future[A] =
|
||||
if (timeout.duration.isFinite) {
|
||||
value match {
|
||||
case Some(_) ⇒ this
|
||||
case _ if isExpired ⇒ Future[A](fallback)
|
||||
case _ ⇒
|
||||
getState match {
|
||||
case _: Success[_] | _: Failure[_] ⇒ this
|
||||
case Expired ⇒ Future[A](fallback)
|
||||
case _: Pending[_] ⇒
|
||||
val promise = new DefaultPromise[A](Timeout.never) //TODO FIXME We can't have infinite timeout here, doesn't make sense.
|
||||
promise completeWith this
|
||||
val runnable = new Runnable {
|
||||
|
|
|
|||
|
|
@ -51,14 +51,23 @@ object ThreadPoolConfig {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Function0 without the fun stuff (mostly for the sake of the Java API side of things)
|
||||
*/
|
||||
trait ExecutorServiceFactory {
|
||||
def createExecutorService: ExecutorService
|
||||
}
|
||||
|
||||
/**
|
||||
* Generic way to specify an ExecutorService to a Dispatcher, create it with the given name if desired
|
||||
*/
|
||||
trait ExecutorServiceFactoryProvider {
|
||||
def createExecutorServiceFactory(name: String): ExecutorServiceFactory
|
||||
}
|
||||
|
||||
/**
|
||||
* A small configuration DSL to create ThreadPoolExecutors that can be provided as an ExecutorServiceFactoryProvider to Dispatcher
|
||||
*/
|
||||
case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout,
|
||||
corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize,
|
||||
maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
|
||||
|
|
@ -89,6 +98,9 @@ object ThreadPoolConfigDispatcherBuilder {
|
|||
def conf_?[T](opt: Option[T])(fun: (T) ⇒ ThreadPoolConfigDispatcherBuilder ⇒ ThreadPoolConfigDispatcherBuilder): Option[(ThreadPoolConfigDispatcherBuilder) ⇒ ThreadPoolConfigDispatcherBuilder] = opt map fun
|
||||
}
|
||||
|
||||
/**
|
||||
* A DSL to configure and create a MessageDispatcher with a ThreadPoolExecutor
|
||||
*/
|
||||
case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) ⇒ MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder {
|
||||
import ThreadPoolConfig._
|
||||
def build = dispatcherFactory(config)
|
||||
|
|
@ -223,6 +235,9 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* As the name says
|
||||
*/
|
||||
trait ExecutorServiceDelegate extends ExecutorService {
|
||||
|
||||
def executor: ExecutorService
|
||||
|
|
@ -254,6 +269,9 @@ trait ExecutorServiceDelegate extends ExecutorService {
|
|||
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
|
||||
}
|
||||
|
||||
/**
|
||||
* An ExecutorService that only creates the underlying Executor if any of the methods of the ExecutorService are called
|
||||
*/
|
||||
trait LazyExecutorService extends ExecutorServiceDelegate {
|
||||
|
||||
def createExecutor: ExecutorService
|
||||
|
|
@ -263,6 +281,9 @@ trait LazyExecutorService extends ExecutorServiceDelegate {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A concrete implementation of LazyExecutorService (Scala API)
|
||||
*/
|
||||
class LazyExecutorServiceWrapper(executorFactory: ⇒ ExecutorService) extends LazyExecutorService {
|
||||
def createExecutor = executorFactory
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,87 +9,169 @@ import akka.util.Index
|
|||
import java.util.concurrent.ConcurrentSkipListSet
|
||||
import java.util.Comparator
|
||||
|
||||
/**
|
||||
* Represents the base type for EventBuses
|
||||
* Internally has an Event type, a Classifier type and a Subscriber type
|
||||
*
|
||||
* For the Java API, @see akka.event.japi.*
|
||||
*/
|
||||
trait EventBus {
|
||||
type Event
|
||||
type Classifier
|
||||
type Subscriber
|
||||
|
||||
/**
|
||||
* Attempts to register the subscriber to the specified Classifier
|
||||
* @returns true if successful and false if not (because it was already subscribed to that Classifier, or otherwise)
|
||||
*/
|
||||
def subscribe(subscriber: Subscriber, to: Classifier): Boolean
|
||||
|
||||
/**
|
||||
* Attempts to deregister the subscriber from the specified Classifier
|
||||
* @returns true if successful and false if not (because it wasn't subscribed to that Classifier, or otherwise)
|
||||
*/
|
||||
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean
|
||||
|
||||
/**
|
||||
* Attempts to deregister the subscriber from all Classifiers it may be subscribed to
|
||||
*/
|
||||
def unsubscribe(subscriber: Subscriber): Unit
|
||||
|
||||
/**
|
||||
* Publishes the specified Event to this bus
|
||||
*/
|
||||
def publish(event: Event): Unit
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents an EventBus where the Subscriber type is ActorRef
|
||||
*/
|
||||
trait ActorEventBus extends EventBus {
|
||||
type Subscriber = ActorRef
|
||||
}
|
||||
|
||||
/**
|
||||
* Can be mixed into an EventBus to specify that the Classifier type is ActorRef
|
||||
*/
|
||||
trait ActorClassifier { self: EventBus ⇒
|
||||
type Classifier = ActorRef
|
||||
}
|
||||
|
||||
/**
|
||||
* Can be mixed into an EventBus to specify that the Classifier type is a Function from Event to Boolean (predicate)
|
||||
*/
|
||||
trait PredicateClassifier { self: EventBus ⇒
|
||||
type Classifier = Event ⇒ Boolean
|
||||
}
|
||||
|
||||
trait EventType[T] { self: EventBus ⇒
|
||||
type Event = T
|
||||
}
|
||||
|
||||
trait ClassifierType[T] { self: EventBus ⇒
|
||||
type Classifier = T
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps Subscribers to Classifiers using equality on Classifier to store a Set of Subscribers (hence the need for compareSubscribers)
|
||||
* Maps Events to Classifiers through the classify-method (so it knows who to publish to)
|
||||
*
|
||||
* The compareSubscribers need to provide a total ordering of the Subscribers
|
||||
*/
|
||||
trait LookupClassification { self: EventBus ⇒
|
||||
protected final val subscribers = new Index[Classifier, Subscriber]
|
||||
|
||||
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.put(to, subscriber)
|
||||
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove(from, subscriber)
|
||||
def unsubscribe(subscriber: Subscriber): Unit = subscribers.removeValue(subscriber)
|
||||
protected final val subscribers = new Index[Classifier, Subscriber](mapSize(), new Comparator[Subscriber] {
|
||||
def compare(a: Subscriber, b: Subscriber): Int = compareSubscribers(a, b)
|
||||
})
|
||||
|
||||
/**
|
||||
* This is a size hint for the number of Classifiers you expect to have (use powers of 2)
|
||||
*/
|
||||
protected def mapSize(): Int
|
||||
|
||||
/**
|
||||
* Provides a total ordering of Subscribers (think java.util.Comparator.compare)
|
||||
*/
|
||||
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int
|
||||
|
||||
/**
|
||||
* Returns the Classifier associated with the given Event
|
||||
*/
|
||||
protected def classify(event: Event): Classifier
|
||||
|
||||
/**
|
||||
* Publishes the given Event to the given Subscriber
|
||||
*/
|
||||
protected def publish(event: Event, subscriber: Subscriber): Unit
|
||||
|
||||
def publish(event: Event): Unit =
|
||||
subscribers.valueIterator(classify(event)).foreach(publish(event, _))
|
||||
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.put(to, subscriber)
|
||||
|
||||
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove(from, subscriber)
|
||||
|
||||
def unsubscribe(subscriber: Subscriber): Unit = subscribers.removeValue(subscriber)
|
||||
|
||||
def publish(event: Event): Unit = {
|
||||
val i = subscribers.valueIterator(classify(event))
|
||||
while (i.hasNext) publish(event, i.next())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps Classifiers to Subscribers and selects which Subscriber should receive which publication through scanning through all Subscribers
|
||||
* through the matches(classifier, event) method
|
||||
*
|
||||
* Note: the compareClassifiers and compareSubscribers must together form an absolute ordering (think java.util.Comparator.compare)
|
||||
*/
|
||||
trait ScanningClassification { self: EventBus ⇒
|
||||
protected final val subscribers = new ConcurrentSkipListSet[(Classifier, Subscriber)](ordering)
|
||||
protected final val subscribers = new ConcurrentSkipListSet[(Classifier, Subscriber)](new Comparator[(Classifier, Subscriber)] {
|
||||
def compare(a: (Classifier, Subscriber), b: (Classifier, Subscriber)): Int = {
|
||||
val cM = compareClassifiers(a._1, b._1)
|
||||
if (cM != 0) cM
|
||||
else compareSubscribers(a._2, b._2)
|
||||
}
|
||||
})
|
||||
|
||||
/**
|
||||
* Provides a total ordering of Classifiers (think java.util.Comparator.compare)
|
||||
*/
|
||||
protected def compareClassifiers(a: Classifier, b: Classifier): Int
|
||||
|
||||
/**
|
||||
* Provides a total ordering of Subscribers (think java.util.Comparator.compare)
|
||||
*/
|
||||
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int
|
||||
|
||||
/**
|
||||
* Returns whether the specified Classifier matches the specified Event
|
||||
*/
|
||||
protected def matches(classifier: Classifier, event: Event): Boolean
|
||||
|
||||
/**
|
||||
* Publishes the specified Event to the specified Subscriber
|
||||
*/
|
||||
protected def publish(event: Event, subscriber: Subscriber): Unit
|
||||
|
||||
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.add((to, subscriber))
|
||||
|
||||
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove((from, subscriber))
|
||||
|
||||
def unsubscribe(subscriber: Subscriber): Unit = {
|
||||
val i = subscribers.iterator()
|
||||
while (i.hasNext) {
|
||||
val e = i.next()
|
||||
if (subscriber == e._2) i.remove()
|
||||
if (compareSubscribers(subscriber, e._2) == 0) i.remove()
|
||||
}
|
||||
}
|
||||
|
||||
protected def ordering: Comparator[(Classifier, Subscriber)]
|
||||
|
||||
protected def matches(classifier: Classifier, event: Event): Boolean
|
||||
|
||||
protected def publish(event: Event, subscriber: Subscriber): Unit
|
||||
|
||||
def publish(event: Event): Unit = {
|
||||
val currentSubscribers = subscribers.iterator()
|
||||
while (currentSubscribers.hasNext) {
|
||||
val (classifier, subscriber) = currentSubscribers.next()
|
||||
if (matches(classifier, event)) publish(event, subscriber)
|
||||
if (matches(classifier, event))
|
||||
publish(event, subscriber)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps ActorRefs to ActorRefs to form an EventBus where ActorRefs can listen to other ActorRefs
|
||||
*/
|
||||
trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import scala.annotation.tailrec
|
||||
|
||||
def mapSize: Int
|
||||
|
||||
protected val mappings = new ConcurrentHashMap[ActorRef, Vector[ActorRef]](mapSize)
|
||||
|
||||
@tailrec
|
||||
|
|
@ -170,8 +252,16 @@ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Classifier associated with the specified Event
|
||||
*/
|
||||
protected def classify(event: Event): Classifier
|
||||
|
||||
/**
|
||||
* This is a size hint for the number of Classifiers you expect to have (use powers of 2)
|
||||
*/
|
||||
protected def mapSize: Int
|
||||
|
||||
def publish(event: Event): Unit = mappings.get(classify(event)) match {
|
||||
case null ⇒
|
||||
case raw: Vector[_] ⇒
|
||||
|
|
|
|||
38
akka-actor/src/main/scala/akka/event/EventBusJavaAPI.scala
Normal file
38
akka-actor/src/main/scala/akka/event/EventBusJavaAPI.scala
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
package akka.event.japi
|
||||
|
||||
import akka.event._
|
||||
|
||||
/**
|
||||
* See documentation for akka.event.LookupClassification
|
||||
* E is the Event type
|
||||
* S is the Subscriber type
|
||||
* C is the Classifier type
|
||||
*/
|
||||
abstract class LookupEventBus[E, S, C] extends EventBus with LookupClassification {
|
||||
type Event = E
|
||||
type Subscriber = S
|
||||
type Classifier = C
|
||||
}
|
||||
|
||||
/**
|
||||
* See documentation for akka.event.ScanningClassification
|
||||
* E is the Event type
|
||||
* S is the Subscriber type
|
||||
* C is the Classifier type
|
||||
*/
|
||||
abstract class ScanningEventBus[E, S, C] extends EventBus with ScanningClassification {
|
||||
type Event = E
|
||||
type Subscriber = S
|
||||
type Classifier = C
|
||||
}
|
||||
|
||||
/**
|
||||
* See documentation for akka.event.ActorClassification
|
||||
* An EventBus where the Subscribers are ActorRefs and the Classifier is ActorRef
|
||||
* Means that ActorRefs "listen" to other ActorRefs
|
||||
* E is the Event type
|
||||
*/
|
||||
|
||||
abstract class ActorEventBus[E] extends akka.event.ActorEventBus with ActorClassification with ActorClassifier {
|
||||
|
||||
}
|
||||
|
|
@ -6,8 +6,8 @@ package akka.util
|
|||
|
||||
import annotation.tailrec
|
||||
|
||||
import java.util.{ Set ⇒ JSet }
|
||||
import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap }
|
||||
import java.util.{ Comparator, Set ⇒ JSet }
|
||||
|
||||
/**
|
||||
* An implementation of a ConcurrentMultiMap
|
||||
|
|
@ -16,8 +16,13 @@ import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap }
|
|||
*
|
||||
* @author Viktor Klang
|
||||
*/
|
||||
class Index[K, V] {
|
||||
private val container = new ConcurrentHashMap[K, JSet[V]]
|
||||
class Index[K, V](val mapSize: Int, val valueComparator: Comparator[V]) {
|
||||
|
||||
def this(mapSize: Int, cmp: (V, V) ⇒ Int) = this(mapSize, new Comparator[V] {
|
||||
def compare(a: V, b: V): Int = cmp(a, b)
|
||||
})
|
||||
|
||||
private val container = new ConcurrentHashMap[K, JSet[V]](mapSize)
|
||||
private val emptySet = new ConcurrentSkipListSet[V]
|
||||
|
||||
/**
|
||||
|
|
@ -41,7 +46,7 @@ class Index[K, V] {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
val newSet = new ConcurrentSkipListSet[V]
|
||||
val newSet = new ConcurrentSkipListSet[V](valueComparator)
|
||||
newSet add v
|
||||
|
||||
// Parry for two simultaneous putIfAbsent(id,newSet)
|
||||
|
|
@ -172,4 +177,4 @@ class Index[K, V] {
|
|||
*
|
||||
* @author Viktor Klang
|
||||
*/
|
||||
class ConcurrentMultiMap[K, V] extends Index[K, V]
|
||||
class ConcurrentMultiMap[K, V](mapSize: Int, valueComparator: Comparator[V]) extends Index[K, V](mapSize, valueComparator)
|
||||
|
|
|
|||
|
|
@ -48,17 +48,22 @@ htmlhelp_basename = 'Akkadoc'
|
|||
|
||||
# -- Options for LaTeX output --------------------------------------------------
|
||||
|
||||
def setup(app):
|
||||
from sphinx.util.texescape import tex_replacements
|
||||
tex_replacements.append((u'⇒', ur'\(\Rightarrow\)'))
|
||||
|
||||
latex_paper_size = 'a4'
|
||||
latex_font_size = '10pt'
|
||||
|
||||
latex_documents = [
|
||||
('index', 'Akka.tex', u' Akka Documentation',
|
||||
u'Scalable Solutions AB', 'manual'),
|
||||
u'Typesafe Inc', 'manual'),
|
||||
]
|
||||
|
||||
latex_elements = {
|
||||
'classoptions': ',oneside,openany',
|
||||
'babel': '\\usepackage[english]{babel}',
|
||||
'fontpkg': '\\PassOptionsToPackage{warn}{textcomp} \\usepackage{times}',
|
||||
'preamble': '\\definecolor{VerbatimColor}{rgb}{0.935,0.935,0.935}'
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ import java.net.InetSocketAddress
|
|||
import java.util.concurrent._
|
||||
import java.util.concurrent.atomic._
|
||||
import akka.AkkaException
|
||||
import java.util.Comparator
|
||||
|
||||
class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
|
||||
def this(msg: String) = this(msg, null);
|
||||
|
|
@ -57,7 +58,6 @@ object RemoteEncoder {
|
|||
trait NettyRemoteClientModule extends RemoteClientModule {
|
||||
self: ListenerManagement ⇒
|
||||
private val remoteClients = new HashMap[RemoteAddress, RemoteClient]
|
||||
private val remoteActors = new Index[RemoteAddress, Uuid]
|
||||
private val lock = new ReadWriteGuard
|
||||
|
||||
protected[akka] def send[T](message: Any,
|
||||
|
|
@ -147,18 +147,11 @@ abstract class RemoteClient private[akka] (
|
|||
val module: NettyRemoteClientModule,
|
||||
val remoteAddress: InetSocketAddress) {
|
||||
|
||||
val useTransactionLog = config.getBool("akka.remote.client.buffering.retry-message-send-on-failure", false)
|
||||
val transactionLogCapacity = config.getInt("akka.remote.client.buffering.capacity", -1)
|
||||
|
||||
val name = this.getClass.getSimpleName + "@" +
|
||||
remoteAddress.getAddress.getHostAddress + "::" +
|
||||
remoteAddress.getPort
|
||||
|
||||
protected val futures = new ConcurrentHashMap[Uuid, Promise[_]]
|
||||
protected val pendingRequests = {
|
||||
if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)]
|
||||
else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity)
|
||||
}
|
||||
|
||||
private[remote] val runSwitch = new Switch()
|
||||
|
||||
|
|
@ -172,19 +165,6 @@ abstract class RemoteClient private[akka] (
|
|||
|
||||
def shutdown(): Boolean
|
||||
|
||||
/**
|
||||
* Returns an array with the current pending messages not yet delivered.
|
||||
*/
|
||||
def pendingMessages: Array[Any] = {
|
||||
var messages = Vector[Any]()
|
||||
val iter = pendingRequests.iterator
|
||||
while (iter.hasNext) {
|
||||
val (_, _, message) = iter.next
|
||||
messages = messages :+ MessageSerializer.deserialize(message.getMessage)
|
||||
}
|
||||
messages.toArray
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the message to the wireprotocol and sends the message across the wire
|
||||
*/
|
||||
|
|
@ -220,13 +200,7 @@ abstract class RemoteClient private[akka] (
|
|||
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
notifyListeners(RemoteClientError(e, module, remoteAddress))
|
||||
|
||||
if (useTransactionLog && !pendingRequests.offer((true, null, request))) { // Add the request to the tx log after a failing send
|
||||
pendingRequests.clear()
|
||||
throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached")
|
||||
}
|
||||
case e: Exception ⇒ notifyListeners(RemoteClientError(e, module, remoteAddress))
|
||||
}
|
||||
None
|
||||
|
||||
|
|
@ -240,14 +214,8 @@ abstract class RemoteClient private[akka] (
|
|||
futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails
|
||||
|
||||
def handleRequestReplyError(future: ChannelFuture) = {
|
||||
if (useTransactionLog && !pendingRequests.offer((false, futureUuid, request))) { // Add the request to the tx log after a failing send
|
||||
pendingRequests.clear()
|
||||
throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached")
|
||||
|
||||
} else {
|
||||
val f = futures.remove(futureUuid) // Clean up future
|
||||
if (f ne null) f.completeWithException(future.getCause)
|
||||
}
|
||||
val f = futures.remove(futureUuid) // Clean up future
|
||||
if (f ne null) f.completeWithException(future.getCause)
|
||||
}
|
||||
|
||||
var future: ChannelFuture = null
|
||||
|
|
@ -275,41 +243,6 @@ abstract class RemoteClient private[akka] (
|
|||
throw exception
|
||||
}
|
||||
}
|
||||
|
||||
private[remote] def sendPendingRequests() = pendingRequests synchronized {
|
||||
// ensure only one thread at a time can flush the log
|
||||
val nrOfMessages = pendingRequests.size
|
||||
if (nrOfMessages > 0) EventHandler.info(this, "Resending [%s] previously failed messages after remote client reconnect" format nrOfMessages)
|
||||
var pendingRequest = pendingRequests.peek
|
||||
|
||||
while (pendingRequest ne null) {
|
||||
val (isOneWay, futureUuid, message) = pendingRequest
|
||||
|
||||
if (isOneWay) {
|
||||
// tell
|
||||
val future = currentChannel.write(RemoteEncoder.encode(message))
|
||||
future.awaitUninterruptibly()
|
||||
|
||||
if (future.isCancelled && !future.isSuccess) {
|
||||
notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress))
|
||||
}
|
||||
|
||||
} else {
|
||||
// ask
|
||||
val future = currentChannel.write(RemoteEncoder.encode(message))
|
||||
future.awaitUninterruptibly()
|
||||
|
||||
if (future.isCancelled || !future.isSuccess) {
|
||||
val f = futures.remove(futureUuid) // Clean up future
|
||||
if (f ne null) f.completeWithException(future.getCause)
|
||||
notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress))
|
||||
}
|
||||
}
|
||||
|
||||
pendingRequests.remove(pendingRequest)
|
||||
pendingRequest = pendingRequests.peek // try to grab next message
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -440,7 +373,6 @@ class ActiveRemoteClient private[akka] (
|
|||
bootstrap.releaseExternalResources()
|
||||
bootstrap = null
|
||||
connection = null
|
||||
pendingRequests.clear()
|
||||
|
||||
EventHandler.info(this, "[%s] has been shut down".format(name))
|
||||
}
|
||||
|
|
@ -555,7 +487,6 @@ class ActiveRemoteClientHandler(
|
|||
|
||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
try {
|
||||
if (client.useTransactionLog) client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect
|
||||
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
|
||||
EventHandler.debug(this, "Remote client connected to [%s]".format(ctx.getChannel.getRemoteAddress))
|
||||
client.resetReconnectionTimeWindow
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue