diff --git a/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala index e01a6e9eed..9e662b5535 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala @@ -8,10 +8,10 @@ import language.postfixOps import akka.testkit._ import scala.concurrent.util.duration._ -import Actor._ +import akka.actor.Actor._ import scala.concurrent.util.Duration import scala.concurrent.Await -import akka.pattern.ask +import akka.pattern.{ ask, pipe } object ForwardActorSpec { val ExpectedMessage = "FOO" @@ -33,12 +33,10 @@ object ForwardActorSpec { @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ForwardActorSpec extends AkkaSpec { import ForwardActorSpec._ - + implicit val ec = system.dispatcher "A Forward Actor" must { "forward actor reference when invoking forward on tell" in { - val latch = new TestLatch(1) - val replyTo = system.actorOf(Props(new Actor { def receive = { case ExpectedMessage ⇒ testActor ! ExpectedMessage } })) val chain = createForwardingChain(system) @@ -49,7 +47,7 @@ class ForwardActorSpec extends AkkaSpec { "forward actor reference when invoking forward on ask" in { val chain = createForwardingChain(system) - chain.ask(ExpectedMessage)(5 seconds) onSuccess { case ExpectedMessage ⇒ testActor ! ExpectedMessage } + chain.ask(ExpectedMessage)(5 seconds) pipeTo testActor expectMsg(5 seconds, ExpectedMessage) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index 9cddba38a3..9d2af4ec7f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -275,6 +275,7 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout { } "an IO Actor" must { + implicit val ec = system.dispatcher "run echo server" in { filterException[java.net.ConnectException] { val addressPromise = Promise[SocketAddress]() diff --git a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala index 3894524487..426f940b5d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala @@ -41,7 +41,7 @@ class LocalActorRefProviderSpec extends AkkaSpec(LocalActorRefProviderSpec.confi } "An ActorRefFactory" must { - + implicit val ec = system.dispatcher "only create one instance of an actor with a specific address in a concurrent environment" in { val impl = system.asInstanceOf[ActorSystemImpl] val provider = impl.provider diff --git a/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala b/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala index 4c2cd5fd1e..ce605217d6 100644 --- a/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala +++ b/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala @@ -13,7 +13,7 @@ import akka.testkit.{ AkkaSpec, DefaultTimeout } import akka.pattern.{ ask, pipe } class Future2ActorSpec extends AkkaSpec with DefaultTimeout { - + implicit val ec = system.dispatcher "The Future2Actor bridge" must { "support convenient sending to multiple destinations" in { diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 0f8f1045fc..3556ee914f 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -11,6 +11,7 @@ import org.scalacheck.Gen._ import akka.actor._ import akka.testkit.{ EventFilter, filterEvents, filterException, AkkaSpec, DefaultTimeout, TestLatch } import scala.concurrent.{ Await, Future, Promise } +import scala.util.control.NonFatal import scala.concurrent.util.duration._ import scala.concurrent.ExecutionContext import org.scalatest.junit.JUnitSuite @@ -45,7 +46,7 @@ class JavaFutureSpec extends JavaFutureTests with JUnitSuite @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with DefaultTimeout { import FutureSpec._ - + implicit val ec = system.dispatcher "A Promise" when { "never completed" must { behave like emptyFuture(_(Promise().future)) @@ -85,10 +86,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } "have different ECs" in { - def namedCtx(n: String) = ExecutionContext.fromExecutorService( - Executors.newSingleThreadExecutor(new ThreadFactory { - def newThread(r: Runnable) = new Thread(r, n) - })) + def namedCtx(n: String) = + ExecutionContext.fromExecutorService( + Executors.newSingleThreadExecutor(new ThreadFactory { def newThread(r: Runnable) = new Thread(r, n) })) val A = namedCtx("A") val B = namedCtx("B") @@ -98,7 +98,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa // I would expect that any callback from p // is executed in the context of p - val result = p.future map { _ + Thread.currentThread().getName() } + val result = { + implicit val ec = A + p.future map { _ + Thread.currentThread().getName() } + } p.completeWith(Future { "Hi " }(B)) try { @@ -143,13 +146,13 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "pass checks" in { filterException[ArithmeticException] { check({ (future: Future[Int], actions: List[FutureAction]) ⇒ + def wrap[T](f: Future[T]): Either[Throwable, T] = try Await.ready(f, timeout.duration).value.get catch { case t ⇒ println(f.getClass + " - " + t.getClass + ": " + t.getMessage + ""); f.value.get } val result = (future /: actions)(_ /: _) - val expected = (Await.ready(future, timeout.duration).value.get /: actions)(_ /: _) - ((Await.ready(result, timeout.duration).value.get, expected) match { - case (Right(a), Right(b)) ⇒ a == b + val expected = (wrap(future) /: actions)(_ /: _) + ((wrap(result), expected) match { + case (Right(a), Right(b)) ⇒ a == b case (Left(a), Left(b)) if a.toString == b.toString ⇒ true - case (Left(a), Left(b)) if a.getStackTrace.isEmpty || b.getStackTrace.isEmpty ⇒ - a.getClass.toString == b.getClass.toString + case (Left(a), Left(b)) if a.getStackTrace.isEmpty || b.getStackTrace.isEmpty ⇒ a.getClass.toString == b.getClass.toString case _ ⇒ false }) :| result.value.get.toString + " is expected to be " + expected.toString }, minSuccessful(10000), workers(4)) @@ -862,7 +865,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val l1, l2 = new TestLatch val complex = Future() map { _ ⇒ - scala.concurrent.impl.InternalFutureUtil.releaseFutureStack(implicitly[ExecutionContext]) + //FIXME implement _taskStack for Futures val nested = Future(()) nested foreach (_ ⇒ l1.open()) Await.ready(l1, TestLatch.DefaultTimeout) // make sure nested is completed @@ -896,7 +899,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "filter result" in { f { (future, result) ⇒ Await.result((future filter (_ ⇒ true)), timeout.duration) must be(result) - (evaluating { Await.result((future filter (_ ⇒ false)), timeout.duration) } must produce[MatchError]).getMessage must startWith(result.toString) + (evaluating { Await.result((future filter (_ ⇒ false)), timeout.duration) } must produce[java.util.NoSuchElementException]).getMessage must endWith(result.toString) } } "transform result with map" in { f((future, result) ⇒ Await.result((future map (_.toString.length)), timeout.duration) must be(result.toString.length)) } @@ -927,7 +930,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa Await.result(p.future, timeout.duration) must be(result) } } - "not project a failure" in { f((future, result) ⇒ (evaluating { Await.result(future.failed, timeout.duration) } must produce[NoSuchElementException]).getMessage must be("Future.failed not completed with a throwable. Instead completed with: " + result)) } + "not project a failure" in { f((future, result) ⇒ (evaluating { Await.result(future.failed, timeout.duration) } must produce[NoSuchElementException]).getMessage must be("Future.failed not completed with a throwable.")) } "not perform action on exception" is pending "cast using mapTo" in { f((future, result) ⇒ Await.result(future.mapTo[Boolean].recover({ case _: ClassCastException ⇒ false }), timeout.duration) must be(false)) } } @@ -941,20 +944,20 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa future.value.get.left.get.getMessage must be(message) }) } - "throw exception with 'get'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } must produce[E]).getMessage must be(message)) } - "throw exception with 'Await.result'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } must produce[E]).getMessage must be(message)) } + "throw exception with 'get'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } must produce[java.lang.Exception]).getMessage must be(message)) } + "throw exception with 'Await.result'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } must produce[java.lang.Exception]).getMessage must be(message)) } "retain exception with filter" in { f { (future, message) ⇒ - (evaluating { Await.result(future filter (_ ⇒ true), timeout.duration) } must produce[E]).getMessage must be(message) - (evaluating { Await.result(future filter (_ ⇒ false), timeout.duration) } must produce[E]).getMessage must be(message) + (evaluating { Await.result(future filter (_ ⇒ true), timeout.duration) } must produce[java.lang.Exception]).getMessage must be(message) + (evaluating { Await.result(future filter (_ ⇒ false), timeout.duration) } must produce[java.lang.Exception]).getMessage must be(message) } } - "retain exception with map" in { f((future, message) ⇒ (evaluating { Await.result(future map (_.toString.length), timeout.duration) } must produce[E]).getMessage must be(message)) } - "retain exception with flatMap" in { f((future, message) ⇒ (evaluating { Await.result(future flatMap (_ ⇒ Promise.successful[Any]("foo").future), timeout.duration) } must produce[E]).getMessage must be(message)) } + "retain exception with map" in { f((future, message) ⇒ (evaluating { Await.result(future map (_.toString.length), timeout.duration) } must produce[java.lang.Exception]).getMessage must be(message)) } + "retain exception with flatMap" in { f((future, message) ⇒ (evaluating { Await.result(future flatMap (_ ⇒ Promise.successful[Any]("foo").future), timeout.duration) } must produce[java.lang.Exception]).getMessage must be(message)) } "not perform action with foreach" is pending "zip properly" in { - f { (future, message) ⇒ (evaluating { Await.result(future zip Promise.successful("foo").future, timeout.duration) } must produce[E]).getMessage must be(message) } + f { (future, message) ⇒ (evaluating { Await.result(future zip Promise.successful("foo").future, timeout.duration) } must produce[java.lang.Exception]).getMessage must be(message) } } "recover from exception" in { f((future, message) ⇒ Await.result(future.recover({ case e if e.getMessage == message ⇒ "pigdog" }), timeout.duration) must be("pigdog")) } "not perform action on result" is pending @@ -966,7 +969,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa Await.result(p.future, timeout.duration) must be(message) } } - "always cast successfully using mapTo" in { f((future, message) ⇒ (evaluating { Await.result(future.mapTo[java.lang.Thread], timeout.duration) } must produce[E]).getMessage must be(message)) } + "always cast successfully using mapTo" in { f((future, message) ⇒ (evaluating { Await.result(future.mapTo[java.lang.Thread], timeout.duration) } must produce[java.lang.Exception]).getMessage must be(message)) } } sealed trait IntAction { def apply(that: Int): Int } @@ -983,15 +986,15 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa case class MapAction(action: IntAction) extends FutureAction { def /:(that: Either[Throwable, Int]): Either[Throwable, Int] = that match { case Left(e) ⇒ that - case Right(r) ⇒ try { Right(action(r)) } catch { case e: RuntimeException ⇒ Left(e) } + case Right(r) ⇒ try { Right(action(r)) } catch { case e if NonFatal(e) ⇒ Left(e) } } - def /:(that: Future[Int]): Future[Int] = that map (action(_)) + def /:(that: Future[Int]): Future[Int] = that map action.apply } case class FlatMapAction(action: IntAction) extends FutureAction { def /:(that: Either[Throwable, Int]): Either[Throwable, Int] = that match { case Left(e) ⇒ that - case Right(r) ⇒ try { Right(action(r)) } catch { case e: RuntimeException ⇒ Left(e) } + case Right(r) ⇒ try { Right(action(r)) } catch { case e if NonFatal(e) ⇒ Left(e) } } def /:(that: Future[Int]): Future[Int] = that flatMap (n ⇒ Future(action(n))) } diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala index 29b1c9216b..7bf5afa6f9 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala @@ -8,7 +8,7 @@ import scala.concurrent.util.duration._ import scala.concurrent.{ Promise, Future, Await } class CircuitBreakerMTSpec extends AkkaSpec { - + implicit val ec = system.dispatcher "A circuit breaker being called by many threads" must { val breaker = new CircuitBreaker(system.scheduler, 5, 100.millis.dilated, 500.millis.dilated) diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala index d0c0fcc309..8567283df1 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala @@ -20,7 +20,7 @@ object CircuitBreakerSpec { @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter { - + implicit val ec = system.dispatcher import CircuitBreakerSpec.TestException val awaitTimeout = 2.seconds.dilated diff --git a/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala index caaf0908b9..1c41364d05 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala @@ -23,7 +23,7 @@ object PatternSpec { @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class PatternSpec extends AkkaSpec { - + implicit val ec = system.dispatcher import PatternSpec._ "pattern.gracefulStop" must { diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index dc6973c214..e0a60533c8 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -67,7 +67,7 @@ object RoutingSpec { @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with ImplicitSender { - + implicit val ec = system.dispatcher import akka.routing.RoutingSpec._ "routers in general" must { diff --git a/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala b/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala index daae7e7a94..d2d87b4c14 100644 --- a/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala @@ -11,7 +11,7 @@ import scala.util.Random import akka.testkit.DefaultTimeout class IndexSpec extends AkkaSpec with MustMatchers with DefaultTimeout { - + implicit val ec = system.dispatcher private def emptyIndex = new Index[String, Int](100, _ compareTo _) private def indexWithValues = { diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 85507ebcb8..bf088d4d21 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -587,6 +587,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, def start(): this.type = _start private lazy val terminationCallbacks = { + implicit val d = dispatcher val callbacks = new TerminationCallbacks terminationFuture onComplete (_ ⇒ callbacks.run) callbacks diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index f75c804440..21f4612750 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -81,25 +81,6 @@ class Dispatcher( } } - override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = { - scala.concurrent.impl.InternalFutureUtil.releaseFutureStack(this) - - executorService.executor match { - case fj: ForkJoinPool ⇒ - val result = new AtomicReference[Option[T]](None) - ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { - def block(): Boolean = { - result.set(Some(awaitable.result(atMost)(scala.concurrent.impl.InternalFutureUtil.canAwaitEvidence))) - true - } - def isReleasable = result.get.isDefined - }) - result.get.get // Exception intended if None - case _ ⇒ - awaitable.result(atMost)(scala.concurrent.impl.InternalFutureUtil.canAwaitEvidence) - } - } - /** * INTERNAL USE ONLY */ diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 6d94449020..53da0486c0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -41,6 +41,7 @@ object Futures { * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate */ def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], executor: ExecutionContext): Future[JOption[T]] = { + implicit val ec = executor Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)))(predicate.apply(_))(executor).map(JOption.fromScalaOption(_)) } @@ -95,26 +96,6 @@ object Futures { for (r ← fr; b ← fb) yield { r add b; r } } } - - /** - * Signals that the current thread of execution will potentially engage - * an action that will take a non-trivial amount of time, perhaps by using blocking.IO or using a lot of CPU time, - * giving the system a chance to spawn new threads, reuse old threads or otherwise, - * to prevent starvation and/or unfairness. - * - * Assures that any Future tasks initiated in the current thread will be - * executed asynchronously, including any tasks currently queued to be - * executed in the current thread. This is needed if the current task may - * block, causing delays in executing the remaining tasks which in some - * cases may cause a deadlock. - * - * Usage: Call this method in a callback (map, flatMap etc also count) to a Future, - * if you will be doing blocking in the callback. - * - * Note: Calling 'Await.result(future)' or 'Await.ready(future)' will automatically trigger this method. - * - */ - def blocking(): Unit = scala.concurrent.impl.InternalFutureUtil.releaseFutureStack(ExecutionContext.defaultExecutionContext) //FIXME NOT CORRECT EC } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index fca650da0c..20fae75ef1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -5,20 +5,23 @@ package akka.dispatch import java.util.Collection +import scala.concurrent.{ Awaitable, BlockContext } import scala.concurrent.util.Duration import scala.concurrent.forkjoin._ -import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.BlockingQueue -import java.util.concurrent.Callable -import java.util.concurrent.ExecutorService -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.RejectedExecutionHandler -import java.util.concurrent.RejectedExecutionException -import java.util.concurrent.SynchronousQueue -import java.util.concurrent.TimeUnit -import java.util.concurrent.ThreadFactory -import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.{ + ArrayBlockingQueue, + BlockingQueue, + Callable, + ExecutorService, + LinkedBlockingQueue, + RejectedExecutionHandler, + RejectedExecutionException, + SynchronousQueue, + TimeUnit, + ThreadFactory, + ThreadPoolExecutor +} +import java.util.concurrent.atomic.{ AtomicReference, AtomicLong } object ThreadPoolConfig { type QueueFactory = () ⇒ BlockingQueue[Runnable] @@ -154,6 +157,22 @@ case class ThreadPoolConfigBuilder(config: ThreadPoolConfig) { object MonitorableThreadFactory { val doNothing: Thread.UncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() { def uncaughtException(thread: Thread, cause: Throwable) = () } + + private[akka] class AkkaForkJoinWorkerThread(_pool: ForkJoinPool) extends ForkJoinWorkerThread(_pool) with BlockContext { + override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = { + val result = new AtomicReference[Option[T]](None) + ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { + def block(): Boolean = { + result.set(Some(awaitable.result(atMost)(scala.concurrent.impl.InternalFutureUtil.canAwaitEvidence))) + // FIXME replace with + //result.set(Some(BlockContext.DefaultBlockContext.internalBlockingCall(awaitable, atMost))) + true + } + def isReleasable = result.get.isDefined + }) + result.get.get // Exception intended if None + } + } } case class MonitorableThreadFactory(name: String, @@ -164,7 +183,7 @@ case class MonitorableThreadFactory(name: String, protected val counter = new AtomicLong def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = { - val t = wire(ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool)) + val t = wire(new MonitorableThreadFactory.AkkaForkJoinWorkerThread(pool)) // Name of the threads for the ForkJoinPool are not customizable. Change it here. t.setName(name + "-" + counter.incrementAndGet()) t diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index e4134e01da..2efd17aed9 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -9,7 +9,7 @@ import java.util.concurrent.TimeoutException import annotation.tailrec import akka.actor._ import akka.dispatch._ -import scala.concurrent.{ Future, Promise } +import scala.concurrent.{ Future, Promise, ExecutionContext } import akka.util.{ NonFatal, Timeout, Unsafe } /** @@ -305,6 +305,7 @@ private[akka] object PromiseActorRef { private case class StoppedWithPath(path: ActorPath) def apply(provider: ActorRefProvider, timeout: Timeout): PromiseActorRef = { + implicit val ec = provider.dispatcher // TODO should we take an ExecutionContext in the method signature? val result = Promise[Any]() val a = new PromiseActorRef(provider, result) val f = provider.scheduler.scheduleOnce(timeout.duration) { result.tryComplete(Left(new AskTimeoutException("Timed out"))) } diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index d6d17907b9..c1dbec4540 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -23,7 +23,6 @@ object CircuitBreaker { */ private[CircuitBreaker] val syncExecutionContext = new ExecutionContext { override def execute(runnable: Runnable): Unit = runnable.run() - override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost)(scala.concurrent.impl.InternalFutureUtil.canAwaitEvidence) override def reportFailure(t: Throwable): Unit = () } diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index 240017dd76..43b21eda88 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -39,6 +39,7 @@ trait GracefulStopSupport { if (target.isTerminated) Promise.successful(true).future else system match { case e: ExtendedActorSystem ⇒ + implicit val d = e.dispatcher // TODO take implicit ExecutionContext/MessageDispatcher in method signature? val internalTarget = target.asInstanceOf[InternalActorRef] val ref = PromiseActorRef(e.provider, Timeout(timeout)) internalTarget.sendSystemMessage(Watch(target, ref)) diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index 348ae54e95..8dda900e35 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -91,7 +91,7 @@ object Patterns { * Patterns.pipe(transformed).to(nextActor); * }}} */ - def pipe[T](future: Future[T]): PipeableFuture[T] = scalaPipe(future) + def pipe[T](future: Future[T], context: ExecutionContext): PipeableFuture[T] = scalaPipe(future)(context) /** * Returns a [[scala.concurrent.Future]] that will be completed with success (value `true`) when diff --git a/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala index 20bb944478..8eb3859f12 100644 --- a/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala @@ -5,12 +5,12 @@ package akka.pattern import language.implicitConversions -import scala.concurrent.{ Future } +import scala.concurrent.{ Future, ExecutionContext } import akka.actor.{ Status, ActorRef } trait PipeToSupport { - final class PipeableFuture[T](val future: Future[T]) { + final class PipeableFuture[T](val future: Future[T])(implicit executionContext: ExecutionContext) { def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = null): Future[T] = { future onComplete { case Right(r) ⇒ recipient ! r @@ -39,5 +39,5 @@ trait PipeToSupport { * * }}} */ - implicit def pipe[T](future: Future[T]): PipeableFuture[T] = new PipeableFuture(future) + implicit def pipe[T](future: Future[T])(implicit executionContext: ExecutionContext): PipeableFuture[T] = new PipeableFuture(future) } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 5838babd11..4667da129a 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -1075,6 +1075,7 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ { case (sender, message) ⇒ val provider: ActorRefProvider = routeeProvider.context.asInstanceOf[ActorCell].systemImpl.provider + implicit val ec = provider.dispatcher val asker = akka.pattern.PromiseActorRef(provider, within) asker.result.future.pipeTo(sender) toAll(asker, routeeProvider.routees) diff --git a/akka-actor/src/main/scala/scala/concurrent/impl/InternalFutureUtil.scala b/akka-actor/src/main/scala/scala/concurrent/impl/InternalFutureUtil.scala index c39365f293..6d2653c559 100644 --- a/akka-actor/src/main/scala/scala/concurrent/impl/InternalFutureUtil.scala +++ b/akka-actor/src/main/scala/scala/concurrent/impl/InternalFutureUtil.scala @@ -3,6 +3,5 @@ package scala.concurrent.impl import scala.concurrent.ExecutionContext object InternalFutureUtil { - @inline final def releaseFutureStack(ec: ExecutionContext): Unit = Future.releaseStack(ec) @inline final def canAwaitEvidence = scala.concurrent.Await.canAwaitEvidence } \ No newline at end of file diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 84a433d01f..2c327575c0 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -208,8 +208,6 @@ class CallingThreadDispatcher( protected[akka] override def executeTask(invocation: TaskInvocation) { invocation.run } - override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost)(scala.concurrent.impl.InternalFutureUtil.canAwaitEvidence) - /* * This method must be called with this thread's queue, which must already * have been entered (active). When this method returns, the queue will be diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index c691c25eb9..9eeae87485 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -20,17 +20,17 @@ import Sphinx.{ sphinxDocs, sphinxHtml, sphinxLatex, sphinxPdf, sphinxPygments, object AkkaBuild extends Build { System.setProperty("akka.mode", "test") // Is there better place for this? - lazy val desiredScalaVersion = "2.10.0-M4" + lazy val desiredScalaVersion = "2.10.0-M5" lazy val buildSettings = Seq( organization := "com.typesafe.akka", version := "2.1-SNAPSHOT", - //scalaVersion := "2.10.0-M4" - scalaVersion := "2.10.0-SNAPSHOT", + scalaVersion := desiredScalaVersion + /*scalaVersion := "2.10.0-SNAPSHOT", scalaVersion in update <<= (scalaVersion) apply { case "2.10.0-SNAPSHOT" => desiredScalaVersion case x => x - } + }*/ ) lazy val akka = Project( @@ -545,8 +545,8 @@ object Dependency { val junit = "junit" % "junit" % "4.10" % "test" // Common Public License 1.0 val logback = "ch.qos.logback" % "logback-classic" % "1.0.4" % "test" // EPL 1.0 / LGPL 2.1 val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" // MIT - val scalatest = "org.scalatest" % v("scalatest") % "1.9-2.10.0-M4-B2" % "test" // ApacheV2 - val scalacheck = "org.scalacheck" % v("scalacheck") % "1.10.0-b1" % "test" // New BSD + val scalatest = "org.scalatest" % v("scalatest") % "1.9-2.10.0-M5-B2" % "test" // ApacheV2 + val scalacheck = "org.scalacheck" % v("scalacheck") % "1.10.0" % "test" // New BSD val specs2 = "org.specs2" % "specs2_2.10" % "1.11" % "test" // Modified BSD / ApacheV2 val ariesProxy = "org.apache.aries.proxy" % "org.apache.aries.proxy.impl" % "0.3" % "test" // ApacheV2 val pojosr = "com.googlecode.pojosr" % "de.kalpatec.pojosr.framework" % "0.1.4" % "test" // ApacheV2