diff --git a/.gitignore b/.gitignore index 4052098935..bdd7ccc6a0 100755 --- a/.gitignore +++ b/.gitignore @@ -41,6 +41,7 @@ run-codefellow .classpath .idea .scala_dependencies +.cache multiverse.log .eprj .*.swp @@ -53,4 +54,4 @@ _akka_cluster/ Makefile akka.sublime-project akka.sublime-workspace -.target \ No newline at end of file +.target diff --git a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala index f4646a4b1d..df36a59c09 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala @@ -12,9 +12,15 @@ import akka.testkit._ import akka.util.duration._ import java.util.concurrent.atomic._ import akka.actor.{ Props, Actor, ActorRef } +import java.util.Comparator +import akka.japi.{ Procedure, Function } object EventBusSpec { - + class TestActorWrapperActor(testActor: ActorRef) extends Actor { + def receive = { + case x ⇒ testActor forward x + } + } } abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers with TestKit with BeforeAndAfterEach { @@ -87,6 +93,7 @@ abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers bus.subscribe(subscriber, classifier) bus.publish(event) expectMsg(event) + expectNoMsg(1 second) bus.unsubscribe(subscriber, classifier) } @@ -98,15 +105,17 @@ abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers expectMsg(event) expectMsg(event) expectMsg(event) + expectNoMsg(1 second) bus.unsubscribe(subscriber, classifier) } "publish the given event to all intended subscribers" in { - val subscribers = Vector.fill(10)(createNewSubscriber()) + val range = 0 until 10 + val subscribers = range map (_ ⇒ createNewSubscriber()) subscribers foreach { s ⇒ bus.subscribe(s, classifier) must be === true } bus.publish(event) - (1 to 10) foreach { _ ⇒ expectMsg(event) } - subscribers foreach disposeSubscriber + range foreach { _ ⇒ expectMsg(event) } + subscribers foreach { s ⇒ bus.unsubscribe(s, classifier) must be === true; disposeSubscriber(s) } } "not publish the given event to any other subscribers than the intended ones" in { @@ -135,29 +144,83 @@ abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers } object ActorEventBusSpec { - class ComposedActorEventBus extends ActorEventBus with LookupClassification with EventType[String] with ClassifierType[String] { - def classify(event: String) = event - def publish(event: String, subscriber: ActorRef) = subscriber ! event - } + class ComposedActorEventBus extends ActorEventBus with LookupClassification { + type Event = Int + type Classifier = String - class TestActorWrapperActor(testActor: ActorRef) extends Actor { - def receive = { - case x ⇒ testActor forward x - } + def classify(event: Event) = event.toString + protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = a compareTo b + protected def mapSize = 32 + def publish(event: Event, subscriber: Subscriber) = subscriber ! event } } class ActorEventBusSpec extends EventBusSpec("ActorEventBus") { import akka.event.ActorEventBusSpec._ + import EventBusSpec.TestActorWrapperActor type BusType = ComposedActorEventBus def createNewEventBus(): BusType = new ComposedActorEventBus - def createEvents(numberOfEvents: Int) = (0 until numberOfEvents) map { _.toString } + def createEvents(numberOfEvents: Int) = (0 until numberOfEvents) def createSubscriber(pipeTo: ActorRef) = actorOf(Props(new TestActorWrapperActor(pipeTo))) - def classifierFor(event: BusType#Event) = event + def classifierFor(event: BusType#Event) = event.toString def disposeSubscriber(subscriber: BusType#Subscriber): Unit = subscriber.stop() } + +object ScanningEventBusSpec { + import akka.event.japi.ScanningEventBus + + class MyScanningEventBus extends ScanningEventBus[Int, akka.japi.Procedure[Int], String] { + protected def compareClassifiers(a: Classifier, b: Classifier): Int = a compareTo b + protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = System.identityHashCode(a) - System.identityHashCode(b) + + protected def matches(classifier: Classifier, event: Event): Boolean = event.toString == classifier + + protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber(event) + } +} + +class ScanningEventBusSpec extends EventBusSpec("ScanningEventBus") { + import ScanningEventBusSpec._ + + type BusType = MyScanningEventBus + + def createNewEventBus(): BusType = new MyScanningEventBus + + def createEvents(numberOfEvents: Int) = (0 until numberOfEvents) + + def createSubscriber(pipeTo: ActorRef) = new Procedure[Int] { def apply(i: Int) = pipeTo ! i } + + def classifierFor(event: BusType#Event) = event.toString + + def disposeSubscriber(subscriber: BusType#Subscriber): Unit = () +} + +object LookupEventBusSpec { + class MyLookupEventBus extends akka.event.japi.LookupEventBus[Int, akka.japi.Procedure[Int], String] { + protected def classify(event: Event): Classifier = event.toString + protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = System.identityHashCode(a) - System.identityHashCode(b) + protected def mapSize = 32 + protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber(event) + } +} + +class LookupEventBusSpec extends EventBusSpec("LookupEventBus") { + import LookupEventBusSpec._ + + type BusType = MyLookupEventBus + + def createNewEventBus(): BusType = new MyLookupEventBus + + def createEvents(numberOfEvents: Int) = (0 until numberOfEvents) + + def createSubscriber(pipeTo: ActorRef) = new Procedure[Int] { def apply(i: Int) = pipeTo ! i } + + def classifierFor(event: BusType#Event) = event.toString + + def disposeSubscriber(subscriber: BusType#Subscriber): Unit = () +} diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/AkkaPerformanceTest.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/AkkaPerformanceTest.scala index 99d204a8a7..e821d15063 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/AkkaPerformanceTest.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/AkkaPerformanceTest.scala @@ -37,7 +37,7 @@ abstract class AkkaPerformanceTest extends BenchmarkScenarios { val start = System.nanoTime val clients = (for (i ← 0 until numberOfClients) yield { val receiver = receivers(i % receivers.size) - Props(new Client(receiver, orders, latch, repeatsPerClient + (if (i < oddRepeats) 1 else 0), delayMs)).withDispatcher(clientDispatcher) + Props(new Client(receiver, orders, latch, repeatsPerClient + (if (i < oddRepeats) 1 else 0), sampling, delayMs)).withDispatcher(clientDispatcher) }).toList.map(actorOf(_)) clients.foreach(_ ! "run") @@ -50,28 +50,35 @@ abstract class AkkaPerformanceTest extends BenchmarkScenarios { clients.foreach(_ ! PoisonPill) } - class Client(orderReceiver: ActorRef, orders: List[Order], latch: CountDownLatch, repeat: Int, delayMs: Int) extends Actor { - def this(orderReceiver: ActorRef, orders: List[Order], latch: CountDownLatch, repeat: Int) { - this(orderReceiver, orders, latch, repeat, 0) - } + class Client( + orderReceiver: ActorRef, + orders: List[Order], + latch: CountDownLatch, + repeat: Int, + sampling: Int, + delayMs: Int = 0) extends Actor { def receive = { case "run" ⇒ - (1 to repeat).foreach(i ⇒ - { - for (o ← orders) { + var n = 0 + for (r ← 1 to repeat; o ← orders) { + n += 1 + val rsp = + if (n % sampling == 0) { val t0 = System.nanoTime val rsp = placeOrder(orderReceiver, o) val duration = System.nanoTime - t0 stat.addValue(duration) - if (!rsp.status) { - EventHandler.error(this, "Invalid rsp") - } - delay(delayMs) + rsp + } else { + placeOrder(orderReceiver, o) } - }) + if (!rsp.status) { + EventHandler.error(this, "Invalid rsp") + } + delay(delayMs) + } latch.countDown() - } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala index 95963f1b5c..f522be7ddb 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala @@ -49,6 +49,10 @@ trait PerformanceTest extends JUnitSuite { System.getProperty("benchmark.timeDilation", "1").toLong } + def sampling = { + System.getProperty("benchmark.sampling", "100").toInt + } + var stat: DescriptiveStatistics = _ val resultRepository = BenchResultRepository() @@ -113,16 +117,18 @@ trait PerformanceTest extends JUnitSuite { 75 -> (stat.getPercentile(75.0) / 1000).toLong, 95 -> (stat.getPercentile(95.0) / 1000).toLong) + val n = stat.getN * sampling + val stats = Stats( name, load = numberOfClients, timestamp = TestStart.startTime, durationNanos = durationNs, - n = stat.getN, + n = n, min = (stat.getMin / 1000).toLong, max = (stat.getMax / 1000).toLong, mean = (stat.getMean / 1000).toLong, - tps = (stat.getN.toDouble / durationS), + tps = (n.toDouble / durationS), percentiles) resultRepository.add(stats) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 5f9f779b81..4a92d0b8f0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -780,21 +780,34 @@ trait Promise[T] extends Future[T] { //Companion object to FState, just to provide a cheap, immutable default entry private[akka] object FState { - val empty = new FState[Nothing]() - def apply[T](): FState[T] = empty.asInstanceOf[FState[T]] -} + def apply[T](): FState[T] = EmptyPending.asInstanceOf[FState[T]] -/** - * Represents the internal state of the DefaultCompletableFuture - */ -private[akka] case class FState[T](value: Option[Either[Throwable, T]] = None, listeners: List[Future[T] ⇒ Unit] = Nil) + /** + * Represents the internal state of the DefaultCompletableFuture + */ + + sealed trait FState[+T] { def value: Option[Either[Throwable, T]] } + case class Pending[T](listeners: List[Future[T] ⇒ Unit] = Nil) extends FState[T] { + def value: Option[Either[Throwable, T]] = None + } + case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { + def result: T = value.get.right.get + } + case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { + def exception: Throwable = value.get.left.get + } + case object Expired extends FState[Nothing] { + def value: Option[Either[Throwable, Nothing]] = None + } + val EmptyPending = Pending[Nothing](Nil) +} /** * The default concrete Future implementation. */ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] { self ⇒ - + import FState.{ FState, Success, Failure, Pending, Expired } def this()(implicit dispatcher: MessageDispatcher) = this(Timeout.default) def this(timeout: Long)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout)) @@ -842,7 +855,18 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].compareAndSet(this, oldState, newState) @inline - protected final def getState: FState[T] = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].get(this) + protected final def getState: FState[T] = { + + @tailrec + def read(): FState[T] = { + val cur = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].get(this) + if (cur.isInstanceOf[Pending[_]] && isExpired) { + if (updateState(cur, Expired)) Expired else read() + } else cur + } + + read() + } def complete(value: Either[Throwable, T]): this.type = { val callbacks = { @@ -850,15 +874,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi @tailrec def tryComplete: List[Future[T] ⇒ Unit] = { val cur = getState - if (cur.value.isDefined) Nil - else if ( /*cur.value.isEmpty && */ isExpired) { - //Empty and expired, so remove listeners - //TODO Perhaps cancel existing onTimeout listeners in the future here? - updateState(cur, FState()) //Try to reset the state to the default, doesn't matter if it fails - Nil - } else { - if (updateState(cur, FState(Option(value), Nil))) cur.listeners - else tryComplete + + cur match { + case Pending(listeners) ⇒ + if (updateState(cur, if (value.isLeft) Failure(Some(value)) else Success(Some(value)))) listeners + else tryComplete + case _ ⇒ Nil } } tryComplete @@ -876,10 +897,14 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi @tailrec //Returns whether the future has already been completed or not def tryAddCallback(): Boolean = { val cur = getState - if (cur.value.isDefined) true - else if (isExpired) false - else if (updateState(cur, cur.copy(listeners = func :: cur.listeners))) false - else tryAddCallback() + cur match { + case _: Success[_] | _: Failure[_] ⇒ true + case Expired ⇒ false + case p: Pending[_] ⇒ + val pt = p.asInstanceOf[Pending[T]] + if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false + else tryAddCallback() + } } if (tryAddCallback()) Future.dispatchTask(() ⇒ notifyCompleted(func)) @@ -912,10 +937,10 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi final def orElse[A >: T](fallback: ⇒ A): Future[A] = if (timeout.duration.isFinite) { - value match { - case Some(_) ⇒ this - case _ if isExpired ⇒ Future[A](fallback) - case _ ⇒ + getState match { + case _: Success[_] | _: Failure[_] ⇒ this + case Expired ⇒ Future[A](fallback) + case _: Pending[_] ⇒ val promise = new DefaultPromise[A](Timeout.never) //TODO FIXME We can't have infinite timeout here, doesn't make sense. promise completeWith this val runnable = new Runnable { diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 314796d61b..329273ffa0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -51,14 +51,23 @@ object ThreadPoolConfig { } } +/** + * Function0 without the fun stuff (mostly for the sake of the Java API side of things) + */ trait ExecutorServiceFactory { def createExecutorService: ExecutorService } +/** + * Generic way to specify an ExecutorService to a Dispatcher, create it with the given name if desired + */ trait ExecutorServiceFactoryProvider { def createExecutorServiceFactory(name: String): ExecutorServiceFactory } +/** + * A small configuration DSL to create ThreadPoolExecutors that can be provided as an ExecutorServiceFactoryProvider to Dispatcher + */ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout, corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize, maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize, @@ -89,6 +98,9 @@ object ThreadPoolConfigDispatcherBuilder { def conf_?[T](opt: Option[T])(fun: (T) ⇒ ThreadPoolConfigDispatcherBuilder ⇒ ThreadPoolConfigDispatcherBuilder): Option[(ThreadPoolConfigDispatcherBuilder) ⇒ ThreadPoolConfigDispatcherBuilder] = opt map fun } +/** + * A DSL to configure and create a MessageDispatcher with a ThreadPoolExecutor + */ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) ⇒ MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder { import ThreadPoolConfig._ def build = dispatcherFactory(config) @@ -223,6 +235,9 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend } } +/** + * As the name says + */ trait ExecutorServiceDelegate extends ExecutorService { def executor: ExecutorService @@ -254,6 +269,9 @@ trait ExecutorServiceDelegate extends ExecutorService { def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit) } +/** + * An ExecutorService that only creates the underlying Executor if any of the methods of the ExecutorService are called + */ trait LazyExecutorService extends ExecutorServiceDelegate { def createExecutor: ExecutorService @@ -263,6 +281,9 @@ trait LazyExecutorService extends ExecutorServiceDelegate { } } +/** + * A concrete implementation of LazyExecutorService (Scala API) + */ class LazyExecutorServiceWrapper(executorFactory: ⇒ ExecutorService) extends LazyExecutorService { def createExecutor = executorFactory } diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index 29bf3cd5a1..33319fbb13 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -9,87 +9,169 @@ import akka.util.Index import java.util.concurrent.ConcurrentSkipListSet import java.util.Comparator +/** + * Represents the base type for EventBuses + * Internally has an Event type, a Classifier type and a Subscriber type + * + * For the Java API, @see akka.event.japi.* + */ trait EventBus { type Event type Classifier type Subscriber + /** + * Attempts to register the subscriber to the specified Classifier + * @returns true if successful and false if not (because it was already subscribed to that Classifier, or otherwise) + */ def subscribe(subscriber: Subscriber, to: Classifier): Boolean + + /** + * Attempts to deregister the subscriber from the specified Classifier + * @returns true if successful and false if not (because it wasn't subscribed to that Classifier, or otherwise) + */ def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean + + /** + * Attempts to deregister the subscriber from all Classifiers it may be subscribed to + */ def unsubscribe(subscriber: Subscriber): Unit + /** + * Publishes the specified Event to this bus + */ def publish(event: Event): Unit } +/** + * Represents an EventBus where the Subscriber type is ActorRef + */ trait ActorEventBus extends EventBus { type Subscriber = ActorRef } +/** + * Can be mixed into an EventBus to specify that the Classifier type is ActorRef + */ trait ActorClassifier { self: EventBus ⇒ type Classifier = ActorRef } +/** + * Can be mixed into an EventBus to specify that the Classifier type is a Function from Event to Boolean (predicate) + */ trait PredicateClassifier { self: EventBus ⇒ type Classifier = Event ⇒ Boolean } -trait EventType[T] { self: EventBus ⇒ - type Event = T -} - -trait ClassifierType[T] { self: EventBus ⇒ - type Classifier = T -} - +/** + * Maps Subscribers to Classifiers using equality on Classifier to store a Set of Subscribers (hence the need for compareSubscribers) + * Maps Events to Classifiers through the classify-method (so it knows who to publish to) + * + * The compareSubscribers need to provide a total ordering of the Subscribers + */ trait LookupClassification { self: EventBus ⇒ - protected final val subscribers = new Index[Classifier, Subscriber] - def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.put(to, subscriber) - def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove(from, subscriber) - def unsubscribe(subscriber: Subscriber): Unit = subscribers.removeValue(subscriber) + protected final val subscribers = new Index[Classifier, Subscriber](mapSize(), new Comparator[Subscriber] { + def compare(a: Subscriber, b: Subscriber): Int = compareSubscribers(a, b) + }) + /** + * This is a size hint for the number of Classifiers you expect to have (use powers of 2) + */ + protected def mapSize(): Int + + /** + * Provides a total ordering of Subscribers (think java.util.Comparator.compare) + */ + protected def compareSubscribers(a: Subscriber, b: Subscriber): Int + + /** + * Returns the Classifier associated with the given Event + */ protected def classify(event: Event): Classifier + /** + * Publishes the given Event to the given Subscriber + */ protected def publish(event: Event, subscriber: Subscriber): Unit - def publish(event: Event): Unit = - subscribers.valueIterator(classify(event)).foreach(publish(event, _)) + def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.put(to, subscriber) + + def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove(from, subscriber) + + def unsubscribe(subscriber: Subscriber): Unit = subscribers.removeValue(subscriber) + + def publish(event: Event): Unit = { + val i = subscribers.valueIterator(classify(event)) + while (i.hasNext) publish(event, i.next()) + } } +/** + * Maps Classifiers to Subscribers and selects which Subscriber should receive which publication through scanning through all Subscribers + * through the matches(classifier, event) method + * + * Note: the compareClassifiers and compareSubscribers must together form an absolute ordering (think java.util.Comparator.compare) + */ trait ScanningClassification { self: EventBus ⇒ - protected final val subscribers = new ConcurrentSkipListSet[(Classifier, Subscriber)](ordering) + protected final val subscribers = new ConcurrentSkipListSet[(Classifier, Subscriber)](new Comparator[(Classifier, Subscriber)] { + def compare(a: (Classifier, Subscriber), b: (Classifier, Subscriber)): Int = { + val cM = compareClassifiers(a._1, b._1) + if (cM != 0) cM + else compareSubscribers(a._2, b._2) + } + }) + + /** + * Provides a total ordering of Classifiers (think java.util.Comparator.compare) + */ + protected def compareClassifiers(a: Classifier, b: Classifier): Int + + /** + * Provides a total ordering of Subscribers (think java.util.Comparator.compare) + */ + protected def compareSubscribers(a: Subscriber, b: Subscriber): Int + + /** + * Returns whether the specified Classifier matches the specified Event + */ + protected def matches(classifier: Classifier, event: Event): Boolean + + /** + * Publishes the specified Event to the specified Subscriber + */ + protected def publish(event: Event, subscriber: Subscriber): Unit def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.add((to, subscriber)) + def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove((from, subscriber)) + def unsubscribe(subscriber: Subscriber): Unit = { val i = subscribers.iterator() while (i.hasNext) { val e = i.next() - if (subscriber == e._2) i.remove() + if (compareSubscribers(subscriber, e._2) == 0) i.remove() } } - protected def ordering: Comparator[(Classifier, Subscriber)] - - protected def matches(classifier: Classifier, event: Event): Boolean - - protected def publish(event: Event, subscriber: Subscriber): Unit - def publish(event: Event): Unit = { val currentSubscribers = subscribers.iterator() while (currentSubscribers.hasNext) { val (classifier, subscriber) = currentSubscribers.next() - if (matches(classifier, event)) publish(event, subscriber) + if (matches(classifier, event)) + publish(event, subscriber) } } } +/** + * Maps ActorRefs to ActorRefs to form an EventBus where ActorRefs can listen to other ActorRefs + */ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒ import java.util.concurrent.ConcurrentHashMap import scala.annotation.tailrec - def mapSize: Int - protected val mappings = new ConcurrentHashMap[ActorRef, Vector[ActorRef]](mapSize) @tailrec @@ -170,8 +252,16 @@ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒ } } + /** + * Returns the Classifier associated with the specified Event + */ protected def classify(event: Event): Classifier + /** + * This is a size hint for the number of Classifiers you expect to have (use powers of 2) + */ + protected def mapSize: Int + def publish(event: Event): Unit = mappings.get(classify(event)) match { case null ⇒ case raw: Vector[_] ⇒ diff --git a/akka-actor/src/main/scala/akka/event/EventBusJavaAPI.scala b/akka-actor/src/main/scala/akka/event/EventBusJavaAPI.scala new file mode 100644 index 0000000000..669198c187 --- /dev/null +++ b/akka-actor/src/main/scala/akka/event/EventBusJavaAPI.scala @@ -0,0 +1,38 @@ +package akka.event.japi + +import akka.event._ + +/** + * See documentation for akka.event.LookupClassification + * E is the Event type + * S is the Subscriber type + * C is the Classifier type + */ +abstract class LookupEventBus[E, S, C] extends EventBus with LookupClassification { + type Event = E + type Subscriber = S + type Classifier = C +} + +/** + * See documentation for akka.event.ScanningClassification + * E is the Event type + * S is the Subscriber type + * C is the Classifier type + */ +abstract class ScanningEventBus[E, S, C] extends EventBus with ScanningClassification { + type Event = E + type Subscriber = S + type Classifier = C +} + +/** + * See documentation for akka.event.ActorClassification + * An EventBus where the Subscribers are ActorRefs and the Classifier is ActorRef + * Means that ActorRefs "listen" to other ActorRefs + * E is the Event type + */ + +abstract class ActorEventBus[E] extends akka.event.ActorEventBus with ActorClassification with ActorClassifier { + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/util/Index.scala b/akka-actor/src/main/scala/akka/util/Index.scala index b8d776e1a2..afbb7a2c20 100644 --- a/akka-actor/src/main/scala/akka/util/Index.scala +++ b/akka-actor/src/main/scala/akka/util/Index.scala @@ -6,8 +6,8 @@ package akka.util import annotation.tailrec -import java.util.{ Set ⇒ JSet } import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap } +import java.util.{ Comparator, Set ⇒ JSet } /** * An implementation of a ConcurrentMultiMap @@ -16,8 +16,13 @@ import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap } * * @author Viktor Klang */ -class Index[K, V] { - private val container = new ConcurrentHashMap[K, JSet[V]] +class Index[K, V](val mapSize: Int, val valueComparator: Comparator[V]) { + + def this(mapSize: Int, cmp: (V, V) ⇒ Int) = this(mapSize, new Comparator[V] { + def compare(a: V, b: V): Int = cmp(a, b) + }) + + private val container = new ConcurrentHashMap[K, JSet[V]](mapSize) private val emptySet = new ConcurrentSkipListSet[V] /** @@ -41,7 +46,7 @@ class Index[K, V] { } } } else { - val newSet = new ConcurrentSkipListSet[V] + val newSet = new ConcurrentSkipListSet[V](valueComparator) newSet add v // Parry for two simultaneous putIfAbsent(id,newSet) @@ -172,4 +177,4 @@ class Index[K, V] { * * @author Viktor Klang */ -class ConcurrentMultiMap[K, V] extends Index[K, V] +class ConcurrentMultiMap[K, V](mapSize: Int, valueComparator: Comparator[V]) extends Index[K, V](mapSize, valueComparator) diff --git a/akka-docs/conf.py b/akka-docs/conf.py index 020feafde7..8a9e8c3240 100644 --- a/akka-docs/conf.py +++ b/akka-docs/conf.py @@ -48,17 +48,22 @@ htmlhelp_basename = 'Akkadoc' # -- Options for LaTeX output -------------------------------------------------- +def setup(app): + from sphinx.util.texescape import tex_replacements + tex_replacements.append((u'⇒', ur'\(\Rightarrow\)')) + latex_paper_size = 'a4' latex_font_size = '10pt' latex_documents = [ ('index', 'Akka.tex', u' Akka Documentation', - u'Scalable Solutions AB', 'manual'), + u'Typesafe Inc', 'manual'), ] latex_elements = { 'classoptions': ',oneside,openany', 'babel': '\\usepackage[english]{babel}', + 'fontpkg': '\\PassOptionsToPackage{warn}{textcomp} \\usepackage{times}', 'preamble': '\\definecolor{VerbatimColor}{rgb}{0.935,0.935,0.935}' } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index add91d8a82..4a4ae90182 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -35,6 +35,7 @@ import java.net.InetSocketAddress import java.util.concurrent._ import java.util.concurrent.atomic._ import akka.AkkaException +import java.util.Comparator class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { def this(msg: String) = this(msg, null); @@ -57,7 +58,6 @@ object RemoteEncoder { trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement ⇒ private val remoteClients = new HashMap[RemoteAddress, RemoteClient] - private val remoteActors = new Index[RemoteAddress, Uuid] private val lock = new ReadWriteGuard protected[akka] def send[T](message: Any, @@ -147,18 +147,11 @@ abstract class RemoteClient private[akka] ( val module: NettyRemoteClientModule, val remoteAddress: InetSocketAddress) { - val useTransactionLog = config.getBool("akka.remote.client.buffering.retry-message-send-on-failure", false) - val transactionLogCapacity = config.getInt("akka.remote.client.buffering.capacity", -1) - val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort protected val futures = new ConcurrentHashMap[Uuid, Promise[_]] - protected val pendingRequests = { - if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] - else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity) - } private[remote] val runSwitch = new Switch() @@ -172,19 +165,6 @@ abstract class RemoteClient private[akka] ( def shutdown(): Boolean - /** - * Returns an array with the current pending messages not yet delivered. - */ - def pendingMessages: Array[Any] = { - var messages = Vector[Any]() - val iter = pendingRequests.iterator - while (iter.hasNext) { - val (_, _, message) = iter.next - messages = messages :+ MessageSerializer.deserialize(message.getMessage) - } - messages.toArray - } - /** * Converts the message to the wireprotocol and sends the message across the wire */ @@ -220,13 +200,7 @@ abstract class RemoteClient private[akka] ( notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) } } catch { - case e: Exception ⇒ - notifyListeners(RemoteClientError(e, module, remoteAddress)) - - if (useTransactionLog && !pendingRequests.offer((true, null, request))) { // Add the request to the tx log after a failing send - pendingRequests.clear() - throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached") - } + case e: Exception ⇒ notifyListeners(RemoteClientError(e, module, remoteAddress)) } None @@ -240,14 +214,8 @@ abstract class RemoteClient private[akka] ( futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails def handleRequestReplyError(future: ChannelFuture) = { - if (useTransactionLog && !pendingRequests.offer((false, futureUuid, request))) { // Add the request to the tx log after a failing send - pendingRequests.clear() - throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached") - - } else { - val f = futures.remove(futureUuid) // Clean up future - if (f ne null) f.completeWithException(future.getCause) - } + val f = futures.remove(futureUuid) // Clean up future + if (f ne null) f.completeWithException(future.getCause) } var future: ChannelFuture = null @@ -275,41 +243,6 @@ abstract class RemoteClient private[akka] ( throw exception } } - - private[remote] def sendPendingRequests() = pendingRequests synchronized { - // ensure only one thread at a time can flush the log - val nrOfMessages = pendingRequests.size - if (nrOfMessages > 0) EventHandler.info(this, "Resending [%s] previously failed messages after remote client reconnect" format nrOfMessages) - var pendingRequest = pendingRequests.peek - - while (pendingRequest ne null) { - val (isOneWay, futureUuid, message) = pendingRequest - - if (isOneWay) { - // tell - val future = currentChannel.write(RemoteEncoder.encode(message)) - future.awaitUninterruptibly() - - if (future.isCancelled && !future.isSuccess) { - notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress)) - } - - } else { - // ask - val future = currentChannel.write(RemoteEncoder.encode(message)) - future.awaitUninterruptibly() - - if (future.isCancelled || !future.isSuccess) { - val f = futures.remove(futureUuid) // Clean up future - if (f ne null) f.completeWithException(future.getCause) - notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress)) - } - } - - pendingRequests.remove(pendingRequest) - pendingRequest = pendingRequests.peek // try to grab next message - } - } } /** @@ -440,7 +373,6 @@ class ActiveRemoteClient private[akka] ( bootstrap.releaseExternalResources() bootstrap = null connection = null - pendingRequests.clear() EventHandler.info(this, "[%s] has been shut down".format(name)) } @@ -555,7 +487,6 @@ class ActiveRemoteClientHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { try { - if (client.useTransactionLog) client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) EventHandler.debug(this, "Remote client connected to [%s]".format(ctx.getChannel.getRemoteAddress)) client.resetReconnectionTimeWindow