Merge pull request #807 from akka/wip-FIXMEs-∂π

remove some FIXMEs
This commit is contained in:
Viktor Klang (√) 2012-10-16 05:45:03 -07:00
commit f5404e107f
12 changed files with 44 additions and 308 deletions

View file

@ -521,268 +521,6 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
filterException[TimeoutException] { intercept[TimeoutException] { FutureSpec.ready(f3, 0 millis) } }
}
//FIXME DATAFLOW
/*"futureComposingWithContinuations" in {
import Future.flow
val actor = system.actorOf(Props[TestActor])
val x = Future("Hello")
val y = x flatMap (actor ? _) mapTo manifest[String]
val r = flow(x() + " " + y() + "!")
assert(Await.result(r, timeout.duration) === "Hello World!")
system.stop(actor)
}
"futureComposingWithContinuationsFailureDivideZero" in {
filterException[ArithmeticException] {
import Future.flow
val x = Future("Hello")
val y = x map (_.length)
val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply, 100)
intercept[java.lang.ArithmeticException](Await.result(r, timeout.duration))
}
}
"futureComposingWithContinuationsFailureCastInt" in {
filterException[ClassCastException] {
import Future.flow
val actor = system.actorOf(Props[TestActor])
val x = Future(3)
val y = (actor ? "Hello").mapTo[Int]
val r = flow(x() + y(), 100)
intercept[ClassCastException](Await.result(r, timeout.duration))
}
}
"futureComposingWithContinuationsFailureCastNothing" in {
filterException[ClassCastException] {
import Future.flow
val actor = system.actorOf(Props[TestActor])
val x = Future("Hello")
val y = actor ? "Hello" mapTo manifest[Nothing]
val r = flow(x() + y())
intercept[ClassCastException](Await.result(r, timeout.duration))
}
}
"futureCompletingWithContinuations" in {
import Future.flow
val x, y, z = Promise[Int]()
val ly, lz = new TestLatch
val result = flow {
y completeWith x
ly.open() // not within continuation
z << x
lz.open() // within continuation, will wait for 'z' to complete
z() + y()
}
FutureSpec.ready(ly, 100 milliseconds)
intercept[TimeoutException] { FutureSpec.ready(lz, 100 milliseconds) }
flow { x << 5 }
assert(Await.result(y, timeout.duration) === 5)
assert(Await.result(z, timeout.duration) === 5)
FutureSpec.ready(lz, timeout.duration)
assert(Await.result(result, timeout.duration) === 10)
val a, b, c = Promise[Int]()
val result2 = flow {
val n = (a << c).value.get.right.get + 10
b << (c() - 2)
a() + n * b()
}
c completeWith Future(5)
assert(Await.result(a, timeout.duration) === 5)
assert(Await.result(b, timeout.duration) === 3)
assert(Await.result(result2, timeout.duration) === 50)
}
"futureDataFlowShouldEmulateBlocking1" in {
import Future.flow
val one, two = Promise[Int]()
val simpleResult = flow {
one() + two()
}
assert(List(one, two, simpleResult).forall(_.isCompleted == false))
flow { one << 1 }
FutureSpec.ready(one, 1 minute)
assert(one.isCompleted)
assert(List(two, simpleResult).forall(_.isCompleted == false))
flow { two << 9 }
FutureSpec.ready(two, 1 minute)
assert(List(one, two).forall(_.isCompleted == true))
assert(Await.result(simpleResult, timeout.duration) === 10)
}
"futureDataFlowShouldEmulateBlocking2" in {
import Future.flow
val x1, x2, y1, y2 = Promise[Int]()
val lx, ly, lz = new TestLatch
val result = flow {
lx.open()
x1 << y1
ly.open()
x2 << y2
lz.open()
x1() + x2()
}
FutureSpec.ready(lx, 2 seconds)
assert(!ly.isOpen)
assert(!lz.isOpen)
assert(List(x1, x2, y1, y2).forall(_.isCompleted == false))
flow { y1 << 1 } // When this is set, it should cascade down the line
FutureSpec.ready(ly, 2 seconds)
assert(Await.result(x1, 1 minute) === 1)
assert(!lz.isOpen)
flow { y2 << 9 } // When this is set, it should cascade down the line
FutureSpec.ready(lz, 2 seconds)
assert(Await.result(x2, 1 minute) === 9)
assert(List(x1, x2, y1, y2).forall(_.isCompleted))
assert(Await.result(result, 1 minute) === 10)
}
"dataFlowAPIshouldbeSlick" in {
import Future.flow
val i1, i2, s1, s2 = new TestLatch
val callService1 = Future { i1.open(); FutureSpec.ready(s1, TestLatch.DefaultTimeout); 1 }
val callService2 = Future { i2.open(); FutureSpec.ready(s2, TestLatch.DefaultTimeout); 9 }
val result = flow { callService1() + callService2() }
assert(!s1.isOpen)
assert(!s2.isOpen)
assert(!result.isCompleted)
FutureSpec.ready(i1, 2 seconds)
FutureSpec.ready(i2, 2 seconds)
s1.open()
s2.open()
assert(Await.result(result, timeout.duration) === 10)
}
"futureCompletingWithContinuationsFailure" in {
filterException[ArithmeticException] {
import Future.flow
val x, y, z = Promise[Int]()
val ly, lz = new TestLatch
val result = flow {
y << x
ly.open()
val oops = 1 / 0
z << x
lz.open()
z() + y() + oops
}
intercept[TimeoutException] { FutureSpec.ready(ly, 100 milliseconds) }
intercept[TimeoutException] { FutureSpec.ready(lz, 100 milliseconds) }
flow { x << 5 }
assert(Await.result(y, timeout.duration) === 5)
intercept[java.lang.ArithmeticException](Await.result(result, timeout.duration))
assert(z.value === None)
assert(!lz.isOpen)
}
}
"futureContinuationsShouldNotBlock" in {
import Future.flow
val latch = new TestLatch
val future = Future {
FutureSpec.ready(latch, TestLatch.DefaultTimeout)
"Hello"
}
val result = flow {
Some(future()).filter(_ == "Hello")
}
assert(!result.isCompleted)
latch.open()
assert(Await.result(result, timeout.duration) === Some("Hello"))
}
"futureFlowShouldBeTypeSafe" in {
import Future.flow
val rString = flow {
val x = Future(5)
x().toString
}
val rInt = flow {
val x = rString.apply
val y = Future(5)
x.length + y()
}
assert(checkType(rString, manifest[String]))
assert(checkType(rInt, manifest[Int]))
assert(!checkType(rInt, manifest[String]))
assert(!checkType(rInt, manifest[Nothing]))
assert(!checkType(rInt, manifest[Any]))
Await.result(rString, timeout.duration)
Await.result(rInt, timeout.duration)
}
"futureFlowSimpleAssign" in {
import Future.flow
val x, y, z = Promise[Int]()
flow {
z << x() + y()
}
flow { x << 40 }
flow { y << 2 }
assert(Await.result(z, timeout.duration) === 42)
}*/
"run callbacks async" in {
val latch = Vector.fill(10)(new TestLatch)
@ -872,13 +610,6 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
// failCount.get must be(0)
}
//FIXME DATAFLOW
/*"should capture first exception with dataflow" in {
import Future.flow
val f1 = flow { 40 / 0 }
intercept[java.lang.ArithmeticException](Await result (f1, TestLatch.DefaultTimeout))
}*/
}
}

View file

@ -23,7 +23,6 @@ abstract class ActorSelection {
def tell(msg: Any, sender: ActorRef): Unit = target.tell(toMessage(msg, path), sender)
// FIXME make this so that "next" instead is the remaining path
private def toMessage(msg: Any, path: Array[AnyRef]): Any = {
var acc = msg
var index = path.length - 1

View file

@ -79,7 +79,11 @@ trait Scope {
@SerialVersionUID(1L)
abstract class LocalScope extends Scope
//FIXME docs
/**
* The Local Scope is the default one, which is assumed on all deployments
* which do not set a different scope. It is also the only scope handled by
* the LocalActorRefProvider.
*/
case object LocalScope extends LocalScope {
/**
* Java API: get the singleton instance

View file

@ -234,6 +234,8 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
private[akka] def maxNrOfRetriesOption(maxNrOfRetries: Int): Option[Int] =
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries)
private[akka] val escalateDefault = (_: Any) Escalate
}
/**
@ -280,7 +282,7 @@ abstract class SupervisorStrategy {
* @param children is a lazy collection (a view)
*/
def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
val directive = if (decider.isDefinedAt(cause)) decider(cause) else Escalate //FIXME applyOrElse in Scala 2.10
val directive = decider.applyOrElse(cause, escalateDefault)
directive match {
case Resume resumeChild(child, cause); true
case Restart processFailure(context, true, child, cause, stats, children); true

View file

@ -288,7 +288,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
if (debug) actors.remove(this, actor.self)
addInhabitants(-1)
val mailBox = actor.swapMailbox(deadLetterMailbox)
mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up
mailBox.becomeClosed()
mailBox.cleanUp()
}

View file

@ -93,7 +93,7 @@ class Dispatcher(
*/
protected[akka] def shutdown: Unit = {
val newDelegate = executorServiceDelegate.copy() // Doesn't matter which one we copy
val es = synchronized { // FIXME getAndSet using ARFU or Unsafe
val es = synchronized {
val service = executorServiceDelegate
executorServiceDelegate = newDelegate // just a quick getAndSet
service

View file

@ -281,7 +281,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
val watchers = clearWatchers()
if (!watchers.isEmpty) {
val termination = Terminated(this)(existenceConfirmed = true, addressTerminated = false)
watchers foreach { w try w.tell(termination, this) catch { case NonFatal(t) /* FIXME LOG THIS */ } }
watchers foreach { _.tell(termination, this) }
}
}
state match {

View file

@ -155,22 +155,20 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
* The callback is run in the [[scala.concurrent.ExecutionContext]] supplied in the constructor.
*
* @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage
*/
def onOpen[T](callback: T): CircuitBreaker = {
Open.addListener(() callback)
this
}
def onOpen(callback: Unit): CircuitBreaker = onOpen(new Runnable { def run = callback })
/**
* Java API for onOpen
*
* @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage
*/
def onOpen[T](callback: Callable[T]): CircuitBreaker = onOpen(callback.call)
def onOpen(callback: Runnable): CircuitBreaker = {
Open addListener callback
this
}
/**
* Adds a callback to execute when circuit breaker transitions to half-open
@ -178,22 +176,20 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
* The callback is run in the [[scala.concurrent.ExecutionContext]] supplied in the constructor.
*
* @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage
*/
def onHalfOpen[T](callback: T): CircuitBreaker = {
HalfOpen.addListener(() callback)
this
}
def onHalfOpen(callback: Unit): CircuitBreaker = onHalfOpen(new Runnable { def run = callback })
/**
* JavaAPI for onHalfOpen
*
* @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage
*/
def onHalfOpen[T](callback: Callable[T]): CircuitBreaker = onHalfOpen(callback.call)
def onHalfOpen(callback: Runnable): CircuitBreaker = {
HalfOpen addListener callback
this
}
/**
* Adds a callback to execute when circuit breaker state closes
@ -201,22 +197,20 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
* The callback is run in the [[scala.concurrent.ExecutionContext]] supplied in the constructor.
*
* @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage
*/
def onClose[T](callback: T): CircuitBreaker = {
Closed.addListener(() callback)
this
}
def onClose(callback: Unit): CircuitBreaker = onClose(new Runnable { def run = callback })
/**
* JavaAPI for onClose
*
* @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage
*/
def onClose[T](callback: Callable[T]): CircuitBreaker = onClose(callback.call)
def onClose(callback: Runnable): CircuitBreaker = {
Closed addListener callback
this
}
/**
* Retrieves current failure count.
@ -261,7 +255,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
* Internal state abstraction
*/
private sealed trait State {
private val listeners = new CopyOnWriteArrayList[() _]
private val listeners = new CopyOnWriteArrayList[Runnable]
/**
* Add a listener function which is invoked on state entry
@ -269,7 +263,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
* @param listener listener implementation
* @tparam T return type of listener, not used - but supplied for type inference purposes
*/
def addListener[T](listener: () T): Unit = listeners add listener
def addListener(listener: Runnable): Unit = listeners add listener
/**
* Test for whether listeners exist
@ -288,8 +282,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
val iterator = listeners.iterator
while (iterator.hasNext) {
val listener = iterator.next
//FIXME per @viktorklang: it's a bit wasteful to create Futures for one-offs, just use EC.execute instead
Future(listener())(executor)
executor.execute(listener)
}
}
}

View file

@ -446,9 +446,9 @@ case class Destination(sender: ActorRef, recipient: ActorRef)
@SerialVersionUID(1L)
abstract class NoRouter extends RouterConfig
case object NoRouter extends NoRouter {
def createRoute(routeeProvider: RouteeProvider): Route = null // FIXME, null, really??
def routerDispatcher: String = ""
def supervisorStrategy = null // FIXME null, really??
def createRoute(routeeProvider: RouteeProvider): Route = throw new UnsupportedOperationException("NoRouter does not createRoute")
def routerDispatcher: String = throw new UnsupportedOperationException("NoRouter has no dispatcher")
def supervisorStrategy = throw new UnsupportedOperationException("NoRouter has no strategy")
override def withFallback(other: RouterConfig): RouterConfig = other
/**

View file

@ -29,10 +29,9 @@ public class DangerousJavaActor extends UntypedActor {
this.breaker = new CircuitBreaker(
getContext().dispatcher(), getContext().system().scheduler(),
5, Duration.create(10, "s"), Duration.create(1, "m"))
.onOpen(new Callable<Object>() {
public Object call() throws Exception {
.onOpen(new Runnable() {
public void run() {
notifyMeOnOpen();
return null;
}
});
}

View file

@ -7,7 +7,11 @@ package docs.actor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.SupervisorStrategy;
import static akka.actor.SupervisorStrategy.*;
import static akka.actor.SupervisorStrategy.resume;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.stop;
import static akka.actor.SupervisorStrategy.escalate;
import akka.actor.SupervisorStrategy.Directive;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.Terminated;

View file

@ -22,7 +22,11 @@ import com.typesafe.config.ConfigFactory;
import static akka.japi.Util.classTag;
import static akka.actor.SupervisorStrategy.*;
import static akka.actor.SupervisorStrategy.resume;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.stop;
import static akka.actor.SupervisorStrategy.escalate;
import akka.actor.SupervisorStrategy.Directive;
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;