diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 28a5fe6a70..9c732d7279 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -521,268 +521,6 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa filterException[TimeoutException] { intercept[TimeoutException] { FutureSpec.ready(f3, 0 millis) } } } - //FIXME DATAFLOW - /*"futureComposingWithContinuations" in { - import Future.flow - - val actor = system.actorOf(Props[TestActor]) - - val x = Future("Hello") - val y = x flatMap (actor ? _) mapTo manifest[String] - - val r = flow(x() + " " + y() + "!") - - assert(Await.result(r, timeout.duration) === "Hello World!") - - system.stop(actor) - } - - "futureComposingWithContinuationsFailureDivideZero" in { - filterException[ArithmeticException] { - import Future.flow - - val x = Future("Hello") - val y = x map (_.length) - - val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply, 100) - - intercept[java.lang.ArithmeticException](Await.result(r, timeout.duration)) - } - } - - "futureComposingWithContinuationsFailureCastInt" in { - filterException[ClassCastException] { - import Future.flow - - val actor = system.actorOf(Props[TestActor]) - - val x = Future(3) - val y = (actor ? "Hello").mapTo[Int] - - val r = flow(x() + y(), 100) - - intercept[ClassCastException](Await.result(r, timeout.duration)) - } - } - - "futureComposingWithContinuationsFailureCastNothing" in { - filterException[ClassCastException] { - import Future.flow - - val actor = system.actorOf(Props[TestActor]) - - val x = Future("Hello") - val y = actor ? "Hello" mapTo manifest[Nothing] - - val r = flow(x() + y()) - - intercept[ClassCastException](Await.result(r, timeout.duration)) - } - } - - "futureCompletingWithContinuations" in { - import Future.flow - - val x, y, z = Promise[Int]() - val ly, lz = new TestLatch - - val result = flow { - y completeWith x - ly.open() // not within continuation - - z << x - lz.open() // within continuation, will wait for 'z' to complete - z() + y() - } - - FutureSpec.ready(ly, 100 milliseconds) - intercept[TimeoutException] { FutureSpec.ready(lz, 100 milliseconds) } - - flow { x << 5 } - - assert(Await.result(y, timeout.duration) === 5) - assert(Await.result(z, timeout.duration) === 5) - FutureSpec.ready(lz, timeout.duration) - assert(Await.result(result, timeout.duration) === 10) - - val a, b, c = Promise[Int]() - - val result2 = flow { - val n = (a << c).value.get.right.get + 10 - b << (c() - 2) - a() + n * b() - } - - c completeWith Future(5) - - assert(Await.result(a, timeout.duration) === 5) - assert(Await.result(b, timeout.duration) === 3) - assert(Await.result(result2, timeout.duration) === 50) - } - - "futureDataFlowShouldEmulateBlocking1" in { - import Future.flow - - val one, two = Promise[Int]() - val simpleResult = flow { - one() + two() - } - - assert(List(one, two, simpleResult).forall(_.isCompleted == false)) - - flow { one << 1 } - - FutureSpec.ready(one, 1 minute) - - assert(one.isCompleted) - assert(List(two, simpleResult).forall(_.isCompleted == false)) - - flow { two << 9 } - - FutureSpec.ready(two, 1 minute) - - assert(List(one, two).forall(_.isCompleted == true)) - assert(Await.result(simpleResult, timeout.duration) === 10) - - } - - "futureDataFlowShouldEmulateBlocking2" in { - import Future.flow - val x1, x2, y1, y2 = Promise[Int]() - val lx, ly, lz = new TestLatch - val result = flow { - lx.open() - x1 << y1 - ly.open() - x2 << y2 - lz.open() - x1() + x2() - } - FutureSpec.ready(lx, 2 seconds) - assert(!ly.isOpen) - assert(!lz.isOpen) - assert(List(x1, x2, y1, y2).forall(_.isCompleted == false)) - - flow { y1 << 1 } // When this is set, it should cascade down the line - - FutureSpec.ready(ly, 2 seconds) - assert(Await.result(x1, 1 minute) === 1) - assert(!lz.isOpen) - - flow { y2 << 9 } // When this is set, it should cascade down the line - - FutureSpec.ready(lz, 2 seconds) - assert(Await.result(x2, 1 minute) === 9) - - assert(List(x1, x2, y1, y2).forall(_.isCompleted)) - - assert(Await.result(result, 1 minute) === 10) - } - - "dataFlowAPIshouldbeSlick" in { - import Future.flow - - val i1, i2, s1, s2 = new TestLatch - - val callService1 = Future { i1.open(); FutureSpec.ready(s1, TestLatch.DefaultTimeout); 1 } - val callService2 = Future { i2.open(); FutureSpec.ready(s2, TestLatch.DefaultTimeout); 9 } - - val result = flow { callService1() + callService2() } - - assert(!s1.isOpen) - assert(!s2.isOpen) - assert(!result.isCompleted) - FutureSpec.ready(i1, 2 seconds) - FutureSpec.ready(i2, 2 seconds) - s1.open() - s2.open() - assert(Await.result(result, timeout.duration) === 10) - } - - "futureCompletingWithContinuationsFailure" in { - filterException[ArithmeticException] { - import Future.flow - - val x, y, z = Promise[Int]() - val ly, lz = new TestLatch - - val result = flow { - y << x - ly.open() - val oops = 1 / 0 - z << x - lz.open() - z() + y() + oops - } - intercept[TimeoutException] { FutureSpec.ready(ly, 100 milliseconds) } - intercept[TimeoutException] { FutureSpec.ready(lz, 100 milliseconds) } - flow { x << 5 } - - assert(Await.result(y, timeout.duration) === 5) - intercept[java.lang.ArithmeticException](Await.result(result, timeout.duration)) - assert(z.value === None) - assert(!lz.isOpen) - } - } - - "futureContinuationsShouldNotBlock" in { - import Future.flow - - val latch = new TestLatch - val future = Future { - FutureSpec.ready(latch, TestLatch.DefaultTimeout) - "Hello" - } - - val result = flow { - Some(future()).filter(_ == "Hello") - } - - assert(!result.isCompleted) - - latch.open() - - assert(Await.result(result, timeout.duration) === Some("Hello")) - } - - "futureFlowShouldBeTypeSafe" in { - import Future.flow - - val rString = flow { - val x = Future(5) - x().toString - } - - val rInt = flow { - val x = rString.apply - val y = Future(5) - x.length + y() - } - - assert(checkType(rString, manifest[String])) - assert(checkType(rInt, manifest[Int])) - assert(!checkType(rInt, manifest[String])) - assert(!checkType(rInt, manifest[Nothing])) - assert(!checkType(rInt, manifest[Any])) - - Await.result(rString, timeout.duration) - Await.result(rInt, timeout.duration) - } - - "futureFlowSimpleAssign" in { - import Future.flow - - val x, y, z = Promise[Int]() - - flow { - z << x() + y() - } - flow { x << 40 } - flow { y << 2 } - - assert(Await.result(z, timeout.duration) === 42) - }*/ - "run callbacks async" in { val latch = Vector.fill(10)(new TestLatch) @@ -872,13 +610,6 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa // failCount.get must be(0) } - //FIXME DATAFLOW - /*"should capture first exception with dataflow" in { - import Future.flow - val f1 = flow { 40 / 0 } - intercept[java.lang.ArithmeticException](Await result (f1, TestLatch.DefaultTimeout)) - }*/ - } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala index 2f4fd4219b..e329af556b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala @@ -23,7 +23,6 @@ abstract class ActorSelection { def tell(msg: Any, sender: ActorRef): Unit = target.tell(toMessage(msg, path), sender) - // FIXME make this so that "next" instead is the remaining path private def toMessage(msg: Any, path: Array[AnyRef]): Any = { var acc = msg var index = path.length - 1 diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index deb6285826..06d3b01a1b 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -79,7 +79,11 @@ trait Scope { @SerialVersionUID(1L) abstract class LocalScope extends Scope -//FIXME docs +/** + * The Local Scope is the default one, which is assumed on all deployments + * which do not set a different scope. It is also the only scope handled by + * the LocalActorRefProvider. + */ case object LocalScope extends LocalScope { /** * Java API: get the singleton instance diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index ee882dd765..fbee491664 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -234,6 +234,8 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { private[akka] def maxNrOfRetriesOption(maxNrOfRetries: Int): Option[Int] = if (maxNrOfRetries < 0) None else Some(maxNrOfRetries) + + private[akka] val escalateDefault = (_: Any) ⇒ Escalate } /** @@ -280,7 +282,7 @@ abstract class SupervisorStrategy { * @param children is a lazy collection (a view) */ def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { - val directive = if (decider.isDefinedAt(cause)) decider(cause) else Escalate //FIXME applyOrElse in Scala 2.10 + val directive = decider.applyOrElse(cause, escalateDefault) directive match { case Resume ⇒ resumeChild(child, cause); true case Restart ⇒ processFailure(context, true, child, cause, stats, children); true diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 2d19949335..36afc8a24c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -288,7 +288,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext if (debug) actors.remove(this, actor.self) addInhabitants(-1) val mailBox = actor.swapMailbox(deadLetterMailbox) - mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up + mailBox.becomeClosed() mailBox.cleanUp() } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 67e70291b7..6577e217a1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -93,7 +93,7 @@ class Dispatcher( */ protected[akka] def shutdown: Unit = { val newDelegate = executorServiceDelegate.copy() // Doesn't matter which one we copy - val es = synchronized { // FIXME getAndSet using ARFU or Unsafe + val es = synchronized { val service = executorServiceDelegate executorServiceDelegate = newDelegate // just a quick getAndSet service diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 43e899f81f..2ff45b0290 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -281,7 +281,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide val watchers = clearWatchers() if (!watchers.isEmpty) { val termination = Terminated(this)(existenceConfirmed = true, addressTerminated = false) - watchers foreach { w ⇒ try w.tell(termination, this) catch { case NonFatal(t) ⇒ /* FIXME LOG THIS */ } } + watchers foreach { _.tell(termination, this) } } } state match { diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index 2ce607d483..8a423c12b3 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -155,22 +155,20 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite * The callback is run in the [[scala.concurrent.ExecutionContext]] supplied in the constructor. * * @param callback Handler to be invoked on state change - * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation * @return CircuitBreaker for fluent usage */ - def onOpen[T](callback: ⇒ T): CircuitBreaker = { - Open.addListener(() ⇒ callback) - this - } + def onOpen(callback: ⇒ Unit): CircuitBreaker = onOpen(new Runnable { def run = callback }) /** * Java API for onOpen * * @param callback Handler to be invoked on state change - * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation * @return CircuitBreaker for fluent usage */ - def onOpen[T](callback: Callable[T]): CircuitBreaker = onOpen(callback.call) + def onOpen(callback: Runnable): CircuitBreaker = { + Open addListener callback + this + } /** * Adds a callback to execute when circuit breaker transitions to half-open @@ -178,22 +176,20 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite * The callback is run in the [[scala.concurrent.ExecutionContext]] supplied in the constructor. * * @param callback Handler to be invoked on state change - * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation * @return CircuitBreaker for fluent usage */ - def onHalfOpen[T](callback: ⇒ T): CircuitBreaker = { - HalfOpen.addListener(() ⇒ callback) - this - } + def onHalfOpen(callback: ⇒ Unit): CircuitBreaker = onHalfOpen(new Runnable { def run = callback }) /** * JavaAPI for onHalfOpen * * @param callback Handler to be invoked on state change - * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation * @return CircuitBreaker for fluent usage */ - def onHalfOpen[T](callback: Callable[T]): CircuitBreaker = onHalfOpen(callback.call) + def onHalfOpen(callback: Runnable): CircuitBreaker = { + HalfOpen addListener callback + this + } /** * Adds a callback to execute when circuit breaker state closes @@ -201,22 +197,20 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite * The callback is run in the [[scala.concurrent.ExecutionContext]] supplied in the constructor. * * @param callback Handler to be invoked on state change - * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation * @return CircuitBreaker for fluent usage */ - def onClose[T](callback: ⇒ T): CircuitBreaker = { - Closed.addListener(() ⇒ callback) - this - } + def onClose(callback: ⇒ Unit): CircuitBreaker = onClose(new Runnable { def run = callback }) /** * JavaAPI for onClose * * @param callback Handler to be invoked on state change - * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation * @return CircuitBreaker for fluent usage */ - def onClose[T](callback: Callable[T]): CircuitBreaker = onClose(callback.call) + def onClose(callback: Runnable): CircuitBreaker = { + Closed addListener callback + this + } /** * Retrieves current failure count. @@ -261,7 +255,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite * Internal state abstraction */ private sealed trait State { - private val listeners = new CopyOnWriteArrayList[() ⇒ _] + private val listeners = new CopyOnWriteArrayList[Runnable] /** * Add a listener function which is invoked on state entry @@ -269,7 +263,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite * @param listener listener implementation * @tparam T return type of listener, not used - but supplied for type inference purposes */ - def addListener[T](listener: () ⇒ T): Unit = listeners add listener + def addListener(listener: Runnable): Unit = listeners add listener /** * Test for whether listeners exist @@ -288,8 +282,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite val iterator = listeners.iterator while (iterator.hasNext) { val listener = iterator.next - //FIXME per @viktorklang: it's a bit wasteful to create Futures for one-offs, just use EC.execute instead - Future(listener())(executor) + executor.execute(listener) } } } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 36a1fa65a8..ac4f55bd7c 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -446,9 +446,9 @@ case class Destination(sender: ActorRef, recipient: ActorRef) @SerialVersionUID(1L) abstract class NoRouter extends RouterConfig case object NoRouter extends NoRouter { - def createRoute(routeeProvider: RouteeProvider): Route = null // FIXME, null, really?? - def routerDispatcher: String = "" - def supervisorStrategy = null // FIXME null, really?? + def createRoute(routeeProvider: RouteeProvider): Route = throw new UnsupportedOperationException("NoRouter does not createRoute") + def routerDispatcher: String = throw new UnsupportedOperationException("NoRouter has no dispatcher") + def supervisorStrategy = throw new UnsupportedOperationException("NoRouter has no strategy") override def withFallback(other: RouterConfig): RouterConfig = other /** diff --git a/akka-docs/rst/common/code/docs/circuitbreaker/DangerousJavaActor.java b/akka-docs/rst/common/code/docs/circuitbreaker/DangerousJavaActor.java index 412e742849..f3347937fc 100644 --- a/akka-docs/rst/common/code/docs/circuitbreaker/DangerousJavaActor.java +++ b/akka-docs/rst/common/code/docs/circuitbreaker/DangerousJavaActor.java @@ -29,10 +29,9 @@ public class DangerousJavaActor extends UntypedActor { this.breaker = new CircuitBreaker( getContext().dispatcher(), getContext().system().scheduler(), 5, Duration.create(10, "s"), Duration.create(1, "m")) - .onOpen(new Callable() { - public Object call() throws Exception { + .onOpen(new Runnable() { + public void run() { notifyMeOnOpen(); - return null; } }); } diff --git a/akka-docs/rst/java/code/docs/actor/FaultHandlingTestBase.java b/akka-docs/rst/java/code/docs/actor/FaultHandlingTestBase.java index fdd1937014..c78da61fb1 100644 --- a/akka-docs/rst/java/code/docs/actor/FaultHandlingTestBase.java +++ b/akka-docs/rst/java/code/docs/actor/FaultHandlingTestBase.java @@ -7,7 +7,11 @@ package docs.actor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.SupervisorStrategy; -import static akka.actor.SupervisorStrategy.*; +import static akka.actor.SupervisorStrategy.resume; +import static akka.actor.SupervisorStrategy.restart; +import static akka.actor.SupervisorStrategy.stop; +import static akka.actor.SupervisorStrategy.escalate; +import akka.actor.SupervisorStrategy.Directive; import akka.actor.OneForOneStrategy; import akka.actor.Props; import akka.actor.Terminated; diff --git a/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java b/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java index f3db04cfdf..5b7a3073c3 100644 --- a/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java +++ b/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java @@ -22,7 +22,11 @@ import com.typesafe.config.ConfigFactory; import static akka.japi.Util.classTag; -import static akka.actor.SupervisorStrategy.*; +import static akka.actor.SupervisorStrategy.resume; +import static akka.actor.SupervisorStrategy.restart; +import static akka.actor.SupervisorStrategy.stop; +import static akka.actor.SupervisorStrategy.escalate; +import akka.actor.SupervisorStrategy.Directive; import static akka.pattern.Patterns.ask; import static akka.pattern.Patterns.pipe;