diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index cb4c23f1e1..e6d34cafd6 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -624,7 +624,9 @@ class FutureSpec extends JUnitSuite { import Future.flow import akka.util.cps._ - val promises = List.fill(1000)(Promise[Int]()) + val count = 10000 + + val promises = List.fill(count)(Promise[Int]()) flow { var i = 0 @@ -641,7 +643,7 @@ class FutureSpec extends JUnitSuite { i += 1 } - assert(i === 1000) + assert(i === count) } diff --git a/akka-actor/src/main/scala/akka/util/cps/TailCalls.scala b/akka-actor/src/main/scala/akka/util/cps/TailCalls.scala deleted file mode 100644 index 28dd0bd789..0000000000 --- a/akka-actor/src/main/scala/akka/util/cps/TailCalls.scala +++ /dev/null @@ -1,13 +0,0 @@ -package akka.util.cps - -import scala.util.continuations._ - -object TailCalls { - sealed trait TailRec[A, B] - case class Return[A, B](result: A) extends TailRec[A, B] - case class Call[A, B](thunk: () ⇒ TailRec[A, B] @cps[B]) extends TailRec[A, B] - def tailcall[A, B](comp: TailRec[A, B]): A @cps[B] = comp match { - case Call(thunk) ⇒ tailcall(thunk()) - case Return(x) ⇒ x - } -} diff --git a/akka-actor/src/main/scala/akka/util/cps/package.scala b/akka-actor/src/main/scala/akka/util/cps/package.scala index 7287c72552..497ea564ce 100644 --- a/akka-actor/src/main/scala/akka/util/cps/package.scala +++ b/akka-actor/src/main/scala/akka/util/cps/package.scala @@ -5,36 +5,51 @@ import scala.util.continuations._ package object cps { def matchC[A, B, C, D](in: A)(pf: PartialFunction[A, B @cpsParam[C, D]]): B @cpsParam[C, D] = pf(in) - import TailCalls._ - def loopC[A](block: ⇒ Any @cps[A]): Unit @cps[A] = { - def f(): TailRec[Unit, A] @cps[A] = { - block - Call(() ⇒ f()) - } - tailcall[Unit, A](f()) + block + loopC(block) } - def whileC[A](test: ⇒ Boolean)(block: ⇒ Any @cps[A]): Unit @cps[A] = { - def f(): TailRec[Unit, A] @cps[A] = { - if (test) { - block - Call(() ⇒ f()) - } else shiftUnit(Return(())) - } - tailcall[Unit, A](f()) - } + def whileC[A](test: ⇒ Boolean)(block: ⇒ Unit @cps[A])(implicit loop: CPSLoop[A]): Unit @cps[A] = + loop.whileC(test)(block) def repeatC[A](times: Int)(block: ⇒ Any @cps[A]): Unit @cps[A] = { - var i = 0 - def f(): TailRec[Unit, A] @cps[A] = { - if (i < times) { - i += 1 - block - Call(() ⇒ f()) - } else shiftUnit(Return(())) + if (times > 0) { + block + repeatC(times - 1)(block) + } + } +} + +package cps { + object CPSLoop extends DefaultCPSLoop { + import akka.dispatch.{ Future, Promise } + + implicit val futureCPSLoop: CPSLoop[Future[Any]] = new CPSLoop[Future[Any]] { + def whileC(test: ⇒ Boolean)(block: ⇒ Unit @cps[Future[Any]]): Unit @cps[Future[Any]] = { + shift { c: (Unit ⇒ Future[Any]) ⇒ + Future.flow { + if (test) { + block + Future(()).apply + whileC(test)(block) + } + } flatMap c + } + } + } + } + trait CPSLoop[A] { + def whileC(test: ⇒ Boolean)(block: ⇒ Unit @cps[A]): Unit @cps[A] + } + trait DefaultCPSLoop { + implicit val defaultCPSLoop = new CPSLoop[Any] { + def whileC(test: ⇒ Boolean)(block: ⇒ Unit @cps[Any]): Unit @cps[Any] = { + if (test) { + block + whileC(test)(block) + } + } } - tailcall[Unit, A](f()) } - }