From e86144499ca98f21d944d65414a04d0acc426c74 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 16 Oct 2012 12:06:03 +0200 Subject: [PATCH] remove some FIXMEs some were left-over residue, some were fixed --- .../test/scala/akka/dispatch/FutureSpec.scala | 269 ------------------ .../scala/akka/actor/ActorSelection.scala | 1 - .../src/main/scala/akka/actor/Deployer.scala | 6 +- .../main/scala/akka/actor/FaultHandling.scala | 4 +- .../akka/dispatch/AbstractDispatcher.scala | 2 +- .../main/scala/akka/dispatch/Dispatcher.scala | 2 +- .../main/scala/akka/pattern/AskSupport.scala | 2 +- .../scala/akka/pattern/CircuitBreaker.scala | 3 +- .../src/main/scala/akka/routing/Routing.scala | 6 +- 9 files changed, 15 insertions(+), 280 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 28a5fe6a70..9c732d7279 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -521,268 +521,6 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa filterException[TimeoutException] { intercept[TimeoutException] { FutureSpec.ready(f3, 0 millis) } } } - //FIXME DATAFLOW - /*"futureComposingWithContinuations" in { - import Future.flow - - val actor = system.actorOf(Props[TestActor]) - - val x = Future("Hello") - val y = x flatMap (actor ? _) mapTo manifest[String] - - val r = flow(x() + " " + y() + "!") - - assert(Await.result(r, timeout.duration) === "Hello World!") - - system.stop(actor) - } - - "futureComposingWithContinuationsFailureDivideZero" in { - filterException[ArithmeticException] { - import Future.flow - - val x = Future("Hello") - val y = x map (_.length) - - val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply, 100) - - intercept[java.lang.ArithmeticException](Await.result(r, timeout.duration)) - } - } - - "futureComposingWithContinuationsFailureCastInt" in { - filterException[ClassCastException] { - import Future.flow - - val actor = system.actorOf(Props[TestActor]) - - val x = Future(3) - val y = (actor ? "Hello").mapTo[Int] - - val r = flow(x() + y(), 100) - - intercept[ClassCastException](Await.result(r, timeout.duration)) - } - } - - "futureComposingWithContinuationsFailureCastNothing" in { - filterException[ClassCastException] { - import Future.flow - - val actor = system.actorOf(Props[TestActor]) - - val x = Future("Hello") - val y = actor ? "Hello" mapTo manifest[Nothing] - - val r = flow(x() + y()) - - intercept[ClassCastException](Await.result(r, timeout.duration)) - } - } - - "futureCompletingWithContinuations" in { - import Future.flow - - val x, y, z = Promise[Int]() - val ly, lz = new TestLatch - - val result = flow { - y completeWith x - ly.open() // not within continuation - - z << x - lz.open() // within continuation, will wait for 'z' to complete - z() + y() - } - - FutureSpec.ready(ly, 100 milliseconds) - intercept[TimeoutException] { FutureSpec.ready(lz, 100 milliseconds) } - - flow { x << 5 } - - assert(Await.result(y, timeout.duration) === 5) - assert(Await.result(z, timeout.duration) === 5) - FutureSpec.ready(lz, timeout.duration) - assert(Await.result(result, timeout.duration) === 10) - - val a, b, c = Promise[Int]() - - val result2 = flow { - val n = (a << c).value.get.right.get + 10 - b << (c() - 2) - a() + n * b() - } - - c completeWith Future(5) - - assert(Await.result(a, timeout.duration) === 5) - assert(Await.result(b, timeout.duration) === 3) - assert(Await.result(result2, timeout.duration) === 50) - } - - "futureDataFlowShouldEmulateBlocking1" in { - import Future.flow - - val one, two = Promise[Int]() - val simpleResult = flow { - one() + two() - } - - assert(List(one, two, simpleResult).forall(_.isCompleted == false)) - - flow { one << 1 } - - FutureSpec.ready(one, 1 minute) - - assert(one.isCompleted) - assert(List(two, simpleResult).forall(_.isCompleted == false)) - - flow { two << 9 } - - FutureSpec.ready(two, 1 minute) - - assert(List(one, two).forall(_.isCompleted == true)) - assert(Await.result(simpleResult, timeout.duration) === 10) - - } - - "futureDataFlowShouldEmulateBlocking2" in { - import Future.flow - val x1, x2, y1, y2 = Promise[Int]() - val lx, ly, lz = new TestLatch - val result = flow { - lx.open() - x1 << y1 - ly.open() - x2 << y2 - lz.open() - x1() + x2() - } - FutureSpec.ready(lx, 2 seconds) - assert(!ly.isOpen) - assert(!lz.isOpen) - assert(List(x1, x2, y1, y2).forall(_.isCompleted == false)) - - flow { y1 << 1 } // When this is set, it should cascade down the line - - FutureSpec.ready(ly, 2 seconds) - assert(Await.result(x1, 1 minute) === 1) - assert(!lz.isOpen) - - flow { y2 << 9 } // When this is set, it should cascade down the line - - FutureSpec.ready(lz, 2 seconds) - assert(Await.result(x2, 1 minute) === 9) - - assert(List(x1, x2, y1, y2).forall(_.isCompleted)) - - assert(Await.result(result, 1 minute) === 10) - } - - "dataFlowAPIshouldbeSlick" in { - import Future.flow - - val i1, i2, s1, s2 = new TestLatch - - val callService1 = Future { i1.open(); FutureSpec.ready(s1, TestLatch.DefaultTimeout); 1 } - val callService2 = Future { i2.open(); FutureSpec.ready(s2, TestLatch.DefaultTimeout); 9 } - - val result = flow { callService1() + callService2() } - - assert(!s1.isOpen) - assert(!s2.isOpen) - assert(!result.isCompleted) - FutureSpec.ready(i1, 2 seconds) - FutureSpec.ready(i2, 2 seconds) - s1.open() - s2.open() - assert(Await.result(result, timeout.duration) === 10) - } - - "futureCompletingWithContinuationsFailure" in { - filterException[ArithmeticException] { - import Future.flow - - val x, y, z = Promise[Int]() - val ly, lz = new TestLatch - - val result = flow { - y << x - ly.open() - val oops = 1 / 0 - z << x - lz.open() - z() + y() + oops - } - intercept[TimeoutException] { FutureSpec.ready(ly, 100 milliseconds) } - intercept[TimeoutException] { FutureSpec.ready(lz, 100 milliseconds) } - flow { x << 5 } - - assert(Await.result(y, timeout.duration) === 5) - intercept[java.lang.ArithmeticException](Await.result(result, timeout.duration)) - assert(z.value === None) - assert(!lz.isOpen) - } - } - - "futureContinuationsShouldNotBlock" in { - import Future.flow - - val latch = new TestLatch - val future = Future { - FutureSpec.ready(latch, TestLatch.DefaultTimeout) - "Hello" - } - - val result = flow { - Some(future()).filter(_ == "Hello") - } - - assert(!result.isCompleted) - - latch.open() - - assert(Await.result(result, timeout.duration) === Some("Hello")) - } - - "futureFlowShouldBeTypeSafe" in { - import Future.flow - - val rString = flow { - val x = Future(5) - x().toString - } - - val rInt = flow { - val x = rString.apply - val y = Future(5) - x.length + y() - } - - assert(checkType(rString, manifest[String])) - assert(checkType(rInt, manifest[Int])) - assert(!checkType(rInt, manifest[String])) - assert(!checkType(rInt, manifest[Nothing])) - assert(!checkType(rInt, manifest[Any])) - - Await.result(rString, timeout.duration) - Await.result(rInt, timeout.duration) - } - - "futureFlowSimpleAssign" in { - import Future.flow - - val x, y, z = Promise[Int]() - - flow { - z << x() + y() - } - flow { x << 40 } - flow { y << 2 } - - assert(Await.result(z, timeout.duration) === 42) - }*/ - "run callbacks async" in { val latch = Vector.fill(10)(new TestLatch) @@ -872,13 +610,6 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa // failCount.get must be(0) } - //FIXME DATAFLOW - /*"should capture first exception with dataflow" in { - import Future.flow - val f1 = flow { 40 / 0 } - intercept[java.lang.ArithmeticException](Await result (f1, TestLatch.DefaultTimeout)) - }*/ - } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala index 2f4fd4219b..e329af556b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala @@ -23,7 +23,6 @@ abstract class ActorSelection { def tell(msg: Any, sender: ActorRef): Unit = target.tell(toMessage(msg, path), sender) - // FIXME make this so that "next" instead is the remaining path private def toMessage(msg: Any, path: Array[AnyRef]): Any = { var acc = msg var index = path.length - 1 diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index deb6285826..06d3b01a1b 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -79,7 +79,11 @@ trait Scope { @SerialVersionUID(1L) abstract class LocalScope extends Scope -//FIXME docs +/** + * The Local Scope is the default one, which is assumed on all deployments + * which do not set a different scope. It is also the only scope handled by + * the LocalActorRefProvider. + */ case object LocalScope extends LocalScope { /** * Java API: get the singleton instance diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index ee882dd765..ab93cfee88 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -234,6 +234,8 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { private[akka] def maxNrOfRetriesOption(maxNrOfRetries: Int): Option[Int] = if (maxNrOfRetries < 0) None else Some(maxNrOfRetries) + + private[akka] val escalateDefault = (_: Any) => Escalate } /** @@ -280,7 +282,7 @@ abstract class SupervisorStrategy { * @param children is a lazy collection (a view) */ def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { - val directive = if (decider.isDefinedAt(cause)) decider(cause) else Escalate //FIXME applyOrElse in Scala 2.10 + val directive = decider.applyOrElse(cause, escalateDefault) directive match { case Resume ⇒ resumeChild(child, cause); true case Restart ⇒ processFailure(context, true, child, cause, stats, children); true diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 2d19949335..36afc8a24c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -288,7 +288,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext if (debug) actors.remove(this, actor.self) addInhabitants(-1) val mailBox = actor.swapMailbox(deadLetterMailbox) - mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up + mailBox.becomeClosed() mailBox.cleanUp() } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 67e70291b7..6577e217a1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -93,7 +93,7 @@ class Dispatcher( */ protected[akka] def shutdown: Unit = { val newDelegate = executorServiceDelegate.copy() // Doesn't matter which one we copy - val es = synchronized { // FIXME getAndSet using ARFU or Unsafe + val es = synchronized { val service = executorServiceDelegate executorServiceDelegate = newDelegate // just a quick getAndSet service diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 43e899f81f..2ff45b0290 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -281,7 +281,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide val watchers = clearWatchers() if (!watchers.isEmpty) { val termination = Terminated(this)(existenceConfirmed = true, addressTerminated = false) - watchers foreach { w ⇒ try w.tell(termination, this) catch { case NonFatal(t) ⇒ /* FIXME LOG THIS */ } } + watchers foreach { _.tell(termination, this) } } } state match { diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index 2ce607d483..bfcf3a3783 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -288,8 +288,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite val iterator = listeners.iterator while (iterator.hasNext) { val listener = iterator.next - //FIXME per @viktorklang: it's a bit wasteful to create Futures for one-offs, just use EC.execute instead - Future(listener())(executor) + executor.execute(new Runnable { def run = listener() }) } } } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 36a1fa65a8..ac4f55bd7c 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -446,9 +446,9 @@ case class Destination(sender: ActorRef, recipient: ActorRef) @SerialVersionUID(1L) abstract class NoRouter extends RouterConfig case object NoRouter extends NoRouter { - def createRoute(routeeProvider: RouteeProvider): Route = null // FIXME, null, really?? - def routerDispatcher: String = "" - def supervisorStrategy = null // FIXME null, really?? + def createRoute(routeeProvider: RouteeProvider): Route = throw new UnsupportedOperationException("NoRouter does not createRoute") + def routerDispatcher: String = throw new UnsupportedOperationException("NoRouter has no dispatcher") + def supervisorStrategy = throw new UnsupportedOperationException("NoRouter has no strategy") override def withFallback(other: RouterConfig): RouterConfig = other /**