Initial work in moving to scala.concurrent.Await + Awaitable

This commit is contained in:
Viktor Klang 2012-06-29 16:06:26 +02:00
parent 1f6f2c1897
commit 0bf45a9403
87 changed files with 170 additions and 260 deletions

View file

@ -4,6 +4,7 @@ import akka.util.Timeout;
import akka.actor.ActorSystem;
import akka.japi.*;
import scala.concurrent.Await;
import scala.concurrent.util.Duration;
import akka.testkit.TestKitExtension;
import org.junit.AfterClass;

View file

@ -7,7 +7,7 @@ package akka.actor
import akka.testkit._
import org.scalatest.BeforeAndAfterEach
import scala.concurrent.util.duration._
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
object ActorFireForgetRequestReplySpec {

View file

@ -13,7 +13,7 @@ import akka.actor.Actor._
import akka.testkit._
import scala.concurrent.util.duration._
import java.util.concurrent.atomic._
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
import java.util.UUID.{ randomUUID newUuid }

View file

@ -7,7 +7,7 @@ import language.postfixOps
import akka.testkit._
import scala.concurrent.util.duration._
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
import java.net.MalformedURLException

View file

@ -12,12 +12,11 @@ import org.scalatest.matchers.MustMatchers
import akka.testkit._
import akka.util.Timeout
import scala.concurrent.util.duration._
import scala.concurrent.Await
import java.lang.IllegalStateException
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.dispatch.{ Await, DefaultPromise, Promise, Future }
import akka.dispatch.Promise
import akka.pattern.ask
import akka.serialization.JavaSerializer
import akka.actor.NonPublicClass
object ActorRefSpec {

View file

@ -8,7 +8,7 @@ import language.postfixOps
import akka.testkit._
import org.scalatest.junit.JUnitSuite
import com.typesafe.config.ConfigFactory
import akka.dispatch.Await
import scala.concurrent.Await
import scala.concurrent.util.duration._
import scala.collection.JavaConverters
import java.util.concurrent.{ TimeUnit, RejectedExecutionException, CountDownLatch, ConcurrentLinkedQueue }

View file

@ -5,7 +5,7 @@ package akka.actor
import scala.concurrent.util.duration._
import akka.testkit._
import akka.dispatch.Await
import scala.concurrent.Await
import akka.util.Timeout
import akka.pattern.{ ask, AskTimeoutException }

View file

@ -8,8 +8,9 @@ import language.postfixOps
import akka.testkit._
import akka.testkit.DefaultTimeout
import akka.testkit.TestEvent._
import akka.dispatch.{ Await, BoundedDequeBasedMailbox }
import akka.dispatch.BoundedDequeBasedMailbox
import akka.pattern.ask
import scala.concurrent.Await
import scala.concurrent.util.duration._
import akka.actor.ActorSystem.Settings
import com.typesafe.config.{ Config, ConfigFactory }

View file

@ -8,7 +8,7 @@ import language.postfixOps
import akka.testkit._
import akka.testkit.DefaultTimeout
import akka.testkit.TestEvent._
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
import scala.concurrent.util.duration._
import com.typesafe.config.{ Config, ConfigFactory }

View file

@ -9,7 +9,7 @@ import language.postfixOps
import akka.testkit._
import scala.concurrent.util.duration._
import java.util.concurrent.atomic._
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])

View file

@ -12,7 +12,7 @@ import TestEvent.Mute
import scala.concurrent.util.duration._
import akka.event._
import com.typesafe.config.ConfigFactory
import akka.dispatch.Await
import scala.concurrent.Await
import akka.util.Timeout
import scala.concurrent.util.Duration

View file

@ -10,7 +10,7 @@ import akka.testkit._
import scala.concurrent.util.duration._
import Actor._
import scala.concurrent.util.Duration
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
object ForwardActorSpec {

View file

@ -7,11 +7,12 @@ package akka.actor
import language.postfixOps
import akka.util.ByteString
import scala.concurrent.{ ExecutionContext, Await }
import scala.concurrent.util.{ Duration, Deadline }
import scala.concurrent.util.duration._
import scala.util.continuations._
import akka.testkit._
import akka.dispatch.{ Await, Future, Promise, ExecutionContext, MessageDispatcher }
import akka.dispatch.{ Future, Promise, MessageDispatcher }
import java.net.{ SocketAddress }
import akka.pattern.ask

View file

@ -7,9 +7,10 @@ package akka.actor
import language.postfixOps
import akka.testkit._
import scala.concurrent.Await
import scala.concurrent.util.duration._
import akka.util.Timeout
import akka.dispatch.{ Await, Future }
import akka.dispatch.Future
object LocalActorRefProviderSpec {
val config = """

View file

@ -10,7 +10,7 @@ import akka.testkit._
import scala.concurrent.util.duration._
import java.util.concurrent.atomic.AtomicInteger
import akka.dispatch.Await
import scala.concurrent.Await
import java.util.concurrent.TimeoutException
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])

View file

@ -8,7 +8,7 @@ import language.postfixOps
import java.lang.Thread.sleep
import org.scalatest.BeforeAndAfterAll
import akka.dispatch.Await
import scala.concurrent.Await
import akka.testkit.TestEvent._
import akka.testkit.EventFilter
import java.util.concurrent.{ TimeUnit, CountDownLatch }

View file

@ -6,7 +6,7 @@ import org.scalatest.BeforeAndAfterEach
import scala.concurrent.util.duration._
import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit }
import akka.testkit._
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
import java.util.concurrent.atomic.AtomicInteger

View file

@ -8,7 +8,7 @@ import language.postfixOps
import akka.testkit._
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
import scala.concurrent.util.Duration
import scala.concurrent.util.duration._

View file

@ -6,7 +6,8 @@ package akka.actor
import language.postfixOps
import akka.testkit.{ filterEvents, EventFilter }
import akka.dispatch.{ PinnedDispatcher, Dispatchers, Await }
import scala.concurrent.Await
import akka.dispatch.{ PinnedDispatcher, Dispatchers }
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout

View file

@ -12,7 +12,7 @@ import akka.{ Die, Ping }
import akka.testkit.TestEvent._
import akka.testkit._
import java.util.concurrent.atomic.AtomicInteger
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
object SupervisorSpec {

View file

@ -7,13 +7,11 @@ import language.postfixOps
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import scala.concurrent.Await
import scala.concurrent.util.duration._
import akka.actor.Actor._
import akka.testkit.{ TestKit, EventFilter, filterEvents, filterException }
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.testkit.DefaultTimeout
import akka.dispatch.{ Await, Dispatchers }
import akka.testkit.{ TestKit, EventFilter, filterEvents, filterException, AkkaSpec, ImplicitSender, DefaultTimeout }
import akka.dispatch.Dispatchers
import akka.pattern.ask
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])

View file

@ -12,7 +12,7 @@ import akka.testkit.{ TestKit, filterEvents, EventFilter }
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.testkit.DefaultTimeout
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
import scala.concurrent.util.duration._

View file

@ -6,8 +6,9 @@ package akka.actor
import language.postfixOps
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import scala.concurrent.util.Duration
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.util.Duration
import scala.concurrent.util.duration._
import java.util.concurrent.atomic.AtomicReference
import annotation.tailrec
@ -15,7 +16,7 @@ import akka.testkit.{ EventFilter, filterEvents, AkkaSpec }
import akka.serialization.SerializationExtension
import akka.japi.{ Creator, Option JOption }
import akka.testkit.DefaultTimeout
import akka.dispatch.{ Await, Dispatchers, Future, Promise }
import akka.dispatch.{ Dispatchers, Future, Promise }
import akka.pattern.ask
import akka.serialization.JavaSerializer
import akka.actor.TypedActor._

View file

@ -23,6 +23,7 @@ import akka.testkit._
import akka.util.{ Timeout, Switch }
import scala.concurrent.util.duration._
import scala.concurrent.util.Duration
import scala.concurrent.Await
import scala.annotation.tailrec
object ActorModelSpec {

View file

@ -6,10 +6,11 @@ import java.util.concurrent.{ CountDownLatch, TimeUnit }
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import akka.testkit.{ filterEvents, EventFilter, AkkaSpec }
import akka.actor.{ Props, Actor }
import scala.concurrent.Await
import scala.concurrent.util.Duration
import scala.concurrent.util.duration._
import akka.testkit.DefaultTimeout
import akka.dispatch.{ Await, PinnedDispatcher, Dispatchers, Dispatcher }
import akka.dispatch.{ PinnedDispatcher, Dispatchers, Dispatcher }
import akka.pattern.ask
object DispatcherActorSpec {

View file

@ -6,7 +6,8 @@ import akka.testkit._
import akka.actor.{ Props, Actor }
import akka.testkit.AkkaSpec
import org.scalatest.BeforeAndAfterEach
import akka.dispatch.{ Await, PinnedDispatcher, Dispatchers }
import akka.dispatch.{ PinnedDispatcher, Dispatchers }
import scala.concurrent.Await
import akka.pattern.ask
object PinnedActorSpec {

View file

@ -5,7 +5,7 @@ import akka.actor._
import akka.actor.Actor._
import akka.routing._
import java.util.concurrent.atomic.AtomicInteger
import akka.dispatch.Await
import scala.concurrent.Await
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ListenerSpec extends AkkaSpec {

View file

@ -6,10 +6,10 @@ package akka.dataflow
import language.postfixOps
import akka.actor.{ Actor, Props }
import akka.dispatch.{ Future, Await }
import akka.dispatch.Future
import scala.concurrent.Await
import scala.concurrent.util.duration._
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
import akka.testkit.{ AkkaSpec, DefaultTimeout }
import akka.pattern.{ ask, pipe }
class Future2ActorSpec extends AkkaSpec with DefaultTimeout {

View file

@ -3,6 +3,7 @@ package akka.dispatch
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
import java.util.concurrent.{ ExecutorService, Executor, Executors }
import scala.concurrent.ExecutionContext
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
@ -18,12 +19,12 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
val executorService: ExecutorService with ExecutionContext = ExecutionContext.fromExecutorService(es)
executorService must not be (null)
val jExecutor: ExecutionContextExecutor = ExecutionContexts.fromExecutor(es)
/*val jExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(es)
jExecutor must not be (null)
val jExecutorService: ExecutionContextExecutorService = ExecutionContexts.fromExecutorService(es)
jExecutorService must not be (null)
*/
} finally {
es.shutdown
}

View file

@ -9,12 +9,11 @@ import org.scalacheck.Arbitrary._
import org.scalacheck.Prop._
import org.scalacheck.Gen._
import akka.actor._
import akka.testkit.{ EventFilter, filterEvents, filterException }
import akka.testkit.{ EventFilter, filterEvents, filterException, AkkaSpec, DefaultTimeout, TestLatch }
import scala.concurrent.Await
import scala.concurrent.util.duration._
import akka.testkit.AkkaSpec
import scala.concurrent.ExecutionContext
import org.scalatest.junit.JUnitSuite
import akka.testkit.DefaultTimeout
import akka.testkit.TestLatch
import scala.runtime.NonLocalReturnControl
import akka.pattern.ask
import java.lang.{ IllegalStateException, ArithmeticException }
@ -86,7 +85,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
"have different ECs" in {
def namedCtx(n: String) = ExecutionContexts.fromExecutorService(
def namedCtx(n: String) = ExecutionContext.fromExecutorService(
Executors.newSingleThreadExecutor(new ThreadFactory {
def newThread(r: Runnable) = new Thread(r, n)
}))
@ -105,8 +104,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
try {
Await.result(result, timeout.duration) must be === "Hi A"
} finally {
A.asInstanceOf[ExecutorService].shutdown()
B.asInstanceOf[ExecutorService].shutdown()
A.shutdown()
B.shutdown()
}
}
}

View file

@ -10,6 +10,7 @@ import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
import com.typesafe.config.Config
import akka.actor.{ RepointableRef, Props, DeadLetter, ActorSystem, ActorRefWithCell, ActorRef, ActorCell }
import akka.testkit.AkkaSpec
import scala.concurrent.Await
import scala.concurrent.util.duration.intToDurationInt
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])

View file

@ -9,6 +9,7 @@ import com.typesafe.config.Config
import akka.actor.{ Props, InternalActorRef, ActorSystem, Actor }
import akka.pattern.ask
import akka.testkit.{ DefaultTimeout, AkkaSpec }
import scala.concurrent.Await
import scala.concurrent.util.duration.intToDurationInt
object PriorityDispatcherSpec {

View file

@ -6,7 +6,8 @@ package akka.pattern
import akka.testkit._
import scala.concurrent.util.duration._
import org.scalatest.BeforeAndAfter
import akka.dispatch.{ Promise, Await, Future }
import akka.dispatch.{ Promise, Future }
import scala.concurrent.Await
class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter {

View file

@ -10,7 +10,7 @@ import scala.concurrent.util.duration._
import akka.testkit._
import org.scalatest.BeforeAndAfter
import akka.dispatch.Future
import akka.dispatch.Await
import scala.concurrent.Await
object CircuitBreakerSpec {

View file

@ -7,11 +7,11 @@ package akka.pattern
import language.postfixOps
import akka.testkit.AkkaSpec
import akka.actor.Props
import akka.actor.Actor
import akka.actor.{ Props, Actor }
import scala.concurrent.Await
import scala.concurrent.util.Duration
import scala.concurrent.util.duration._
import akka.dispatch.{ Future, Promise, Await }
import akka.dispatch.{ Future, Promise }
object PatternSpec {
case class Work(duration: Duration)

View file

@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicInteger
import org.junit.runner.RunWith
import akka.actor.{ Props, Deploy, Actor, ActorRef }
import akka.ConfigurationException
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.{ ask, gracefulStop }
import akka.testkit.{ TestLatch, ImplicitSender, DefaultTimeout, AkkaSpec }
import scala.concurrent.util.duration.intToDurationInt

View file

@ -8,7 +8,7 @@ import language.postfixOps
import akka.actor.Actor
import akka.testkit._
import akka.actor.Props
import akka.dispatch.Await
import scala.concurrent.Await
import scala.concurrent.util.duration._
import akka.actor.ActorRef
import java.util.concurrent.atomic.AtomicInteger

View file

@ -10,7 +10,7 @@ import akka.actor._
import scala.collection.mutable.LinkedList
import akka.testkit._
import scala.concurrent.util.duration._
import akka.dispatch.Await
import scala.concurrent.Await
import scala.concurrent.util.Duration
import akka.ConfigurationException
import com.typesafe.config.ConfigFactory

View file

@ -9,7 +9,7 @@ import language.postfixOps
import akka.testkit.{ AkkaSpec, EventFilter }
import akka.actor._
import java.io._
import akka.dispatch.Await
import scala.concurrent.Await
import akka.util.Timeout
import scala.concurrent.util.duration._
import scala.reflect.BeanInfo

View file

@ -4,8 +4,9 @@
package akka.util
import org.scalatest.matchers.MustMatchers
import akka.dispatch.{ Future, Await }
import akka.dispatch.Future
import akka.testkit.AkkaSpec
import scala.concurrent.Await
import scala.util.Random
import akka.testkit.DefaultTimeout

View file

@ -11,7 +11,7 @@ import com.typesafe.config.{ Config, ConfigFactory }
import scala.annotation.tailrec
import scala.concurrent.util.Duration
import java.io.Closeable
import akka.dispatch.Await.{ Awaitable, CanAwait }
import scala.concurrent.{ Await, Awaitable, CanAwait }
import akka.util._
import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap }
import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }

View file

@ -6,7 +6,8 @@ package akka.actor
import language.higherKinds
import language.postfixOps
import akka.dispatch.{ Future, ExecutionContext }
import akka.dispatch.Future
import scala.concurrent.ExecutionContext
import scala.concurrent.util.Duration
import akka.util.{ ByteString, NonFatal }
import java.net.{ SocketAddress, InetSocketAddress }

View file

@ -9,6 +9,7 @@ import akka.japi.{ Creator, Option ⇒ JOption }
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import akka.util.{ Timeout, NonFatal }
import scala.concurrent.util.Duration
import scala.concurrent.Await
import akka.util.Reflect.instantiator
import java.util.concurrent.atomic.{ AtomicReference AtomVar }
import akka.dispatch._

View file

@ -5,17 +5,17 @@
package akka.dispatch
import java.util.concurrent._
import akka.event.Logging.Error
import akka.event.Logging.{ Error, LogEventException }
import akka.actor._
import akka.actor.ActorSystem
import scala.annotation.tailrec
import akka.event.EventStream
import com.typesafe.config.Config
import akka.serialization.SerializationExtension
import akka.event.Logging.LogEventException
import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool }
import akka.util.{ Unsafe, NonFatal, Index }
import scala.concurrent.util.Duration
import scala.concurrent.ExecutionContext
import scala.concurrent.{ Await, Awaitable }
final case class Envelope private (val message: Any, val sender: ActorRef)
@ -123,88 +123,6 @@ final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cl
} finally cleanup()
}
/**
* Java API to create ExecutionContexts
*/
object ExecutionContexts {
/**
* Creates an ExecutionContext from the given ExecutorService
*/
def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService =
new ExecutionContext.WrappedExecutorService(e)
/**
* Creates an ExecutionContext from the given Executor
*/
def fromExecutor(e: Executor): ExecutionContextExecutor =
new ExecutionContext.WrappedExecutor(e)
}
object ExecutionContext {
implicit def defaultExecutionContext(implicit system: ActorSystem): ExecutionContext = system.dispatcher
/**
* Creates an ExecutionContext from the given ExecutorService
*/
def fromExecutorService(e: ExecutorService): ExecutionContext with ExecutorService = new WrappedExecutorService(e)
/**
* Creates an ExecutionContext from the given Executor
*/
def fromExecutor(e: Executor): ExecutionContext with Executor = new WrappedExecutor(e)
/**
* Internal Akka use only
*/
private[akka] class WrappedExecutorService(val executor: ExecutorService) extends ExecutorServiceDelegate with ExecutionContextExecutorService {
override def reportFailure(t: Throwable): Unit = t match {
case e: LogEventException e.getCause.printStackTrace()
case _ t.printStackTrace()
}
}
/**
* Internal Akka use only
*/
private[akka] class WrappedExecutor(val executor: Executor) extends ExecutionContextExecutor {
override final def execute(runnable: Runnable): Unit = executor.execute(runnable)
override def reportFailure(t: Throwable): Unit = t match {
case e: LogEventException e.getCause.printStackTrace()
case _ t.printStackTrace()
}
}
}
/**
* Union interface since Java does not support union types
*/
trait ExecutionContextExecutor extends ExecutionContext with Executor
/**
* Union interface since Java does not support union types
*/
trait ExecutionContextExecutorService extends ExecutionContextExecutor with ExecutorService
/**
* An ExecutionContext is essentially the same thing as a java.util.concurrent.Executor
* This interface/trait exists to decouple the concept of execution from Actors & MessageDispatchers
* It is also needed to provide a fallback implicit default instance (in the companion object).
*/
trait ExecutionContext {
/**
* Submits the runnable for execution
*/
def execute(runnable: Runnable): Unit
/**
* Failed tasks should call reportFailure to let the ExecutionContext
* log the problem or whatever is appropriate for the implementation.
*/
def reportFailure(t: Throwable): Unit
}
/**
* INTERNAL API
*/
@ -289,13 +207,9 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
/**
* Detaches the specified actor instance from this dispatcher
*/
final def detach(actor: ActorCell): Unit = try {
unregister(actor)
} finally {
ifSensibleToDoSoThenScheduleShutdown()
}
final def detach(actor: ActorCell): Unit = try unregister(actor) finally ifSensibleToDoSoThenScheduleShutdown()
final def execute(runnable: Runnable): Unit = {
final override def execute(runnable: Runnable): Unit = {
val invocation = TaskInvocation(eventStream, runnable, taskCleanup)
addInhabitants(+1)
try {
@ -307,7 +221,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
}
}
def reportFailure(t: Throwable): Unit = t match {
override def reportFailure(t: Throwable): Unit = t match {
case e: LogEventException prerequisites.eventStream.publish(e.event)
case _ prerequisites.eventStream.publish(Error(t, getClass.getName, getClass, t.getMessage))
}

View file

@ -5,11 +5,13 @@
package akka.dispatch
import akka.event.Logging.Error
import java.util.concurrent.atomic.AtomicReference
import akka.actor.ActorCell
import scala.concurrent.util.Duration
import java.util.concurrent._
import akka.event.Logging
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ ExecutorService, RejectedExecutionException }
import scala.concurrent.forkjoin.ForkJoinPool
import scala.concurrent.util.Duration
import scala.concurrent.Awaitable
/**
* The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a
@ -79,6 +81,25 @@ class Dispatcher(
}
}
override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
scala.concurrent.impl.InternalFutureUtil.releaseFutureStack(this)
executorService.executor match {
case fj: ForkJoinPool
val result = new AtomicReference[Option[T]](None)
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
def block(): Boolean = {
result.set(Some(awaitable.result(atMost)(scala.concurrent.impl.InternalFutureUtil.canAwaitEvidence)))
true
}
def isReleasable = result.get.isDefined
})
result.get.get // Exception intended if None
case _
awaitable.result(atMost)(scala.concurrent.impl.InternalFutureUtil.canAwaitEvidence)
}
}
/**
* INTERNAL USE ONLY
*/

View file

@ -16,7 +16,6 @@ import java.util.{ LinkedList ⇒ JLinkedList }
import scala.annotation.tailrec
import scala.collection.mutable.Stack
import akka.util.BoxedType
import akka.dispatch.Await.CanAwait
import akka.util.NonFatal
import akka.event.Logging.{ LogEventException, Debug, Error }
import java.util.concurrent.TimeUnit.NANOSECONDS
@ -25,58 +24,9 @@ import java.util.concurrent.atomic.{ AtomicInteger }
import akka.pattern.AskTimeoutException
import scala.util.DynamicVariable
import scala.concurrent.util.Duration
import scala.concurrent.ExecutionContext
import scala.runtime.{ BoxedUnit, AbstractPartialFunction }
object Await {
/**
* Internal Akka use only
*/
sealed trait CanAwait
/**
* Classes that implement Awaitable can be used with Await,
* this is used to do blocking operations (blocking in the "pause this thread" sense)
*/
trait Awaitable[+T] {
/**
* Should throw [[java.util.concurrent.TimeoutException]] if times out
* This method should not be called directly.
*/
@throws(classOf[TimeoutException])
def ready(atMost: Duration)(implicit permit: CanAwait): this.type
/**
* Throws exceptions if cannot produce a T within the specified time
* This method should not be called directly.
*/
@throws(classOf[Exception])
def result(atMost: Duration)(implicit permit: CanAwait): T
}
private[this] implicit final val permit = new CanAwait {}
/**
* Blocks the current Thread to wait for the given awaitable to be ready.
* WARNING: Blocking operation, use with caution.
*
* @throws [[java.util.concurrent.TimeoutException]] if times out
* @return The returned value as returned by Awaitable.ready
*/
@throws(classOf[TimeoutException])
def ready[T <: Awaitable[_]](awaitable: T, atMost: Duration): T = awaitable.ready(atMost)
/**
* Blocks the current Thread to wait for the given awaitable to have a result.
* WARNING: Blocking operation, use with caution.
*
* @throws [[java.util.concurrent.TimeoutException]] if times out
* @throws [[java.lang.Throwable]] (throws clause is Exception due to Java) if there was a problem
* @return The returned value as returned by Awaitable.result
*/
@throws(classOf[Exception])
def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost)
}
import scala.concurrent.{ Awaitable, Await, CanAwait }
/**
* Futures is the Java API for Futures and Promises
@ -415,7 +365,7 @@ object Future {
* and [[akka.dispatch.Future]].`fallbackTo` are some methods to consider
* using when possible, to avoid concurrent callbacks.
*/
sealed trait Future[+T] extends Await.Awaitable[T] {
sealed trait Future[+T] extends Awaitable[T] {
protected implicit def executor: ExecutionContext

View file

@ -13,7 +13,7 @@ import scala.concurrent.util.duration._
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NoStackTrace
import java.util.concurrent.TimeoutException
import akka.dispatch.Await
import scala.concurrent.Await
import annotation.implicitNotFound
/**

View file

@ -6,12 +6,14 @@ package akka.pattern
import java.util.concurrent.atomic.{ AtomicInteger, AtomicLong, AtomicBoolean }
import akka.AkkaException
import akka.actor.Scheduler
import akka.dispatch.{ Future, ExecutionContext, Await, Promise }
import akka.dispatch.{ Future, Promise }
import akka.util.{ NonFatal, Unsafe }
import scala.concurrent.ExecutionContext
import scala.concurrent.util.duration._
import scala.concurrent.util.{ Duration, Deadline }
import util.control.NoStackTrace
import java.util.concurrent.{ Callable, CopyOnWriteArrayList }
import scala.concurrent.{ Awaitable, Await, CanAwait }
/**
* Companion object providing factory methods for Circuit Breaker which runs callbacks in caller's thread
@ -22,9 +24,9 @@ object CircuitBreaker {
* Synchronous execution context to run in caller's thread - used by companion object factory methods
*/
private[CircuitBreaker] val syncExecutionContext = new ExecutionContext {
def execute(runnable: Runnable): Unit = runnable.run()
def reportFailure(t: Throwable): Unit = ()
override def execute(runnable: Runnable): Unit = runnable.run()
override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost)(scala.concurrent.impl.InternalFutureUtil.canAwaitEvidence)
override def reportFailure(t: Throwable): Unit = ()
}
/**

View file

@ -5,8 +5,9 @@ package akka.pattern
*/
import scala.concurrent.util.Duration
import scala.concurrent.ExecutionContext
import akka.actor._
import akka.dispatch.{ ExecutionContext, Promise, Future }
import akka.dispatch.{ Promise, Future }
trait FutureTimeoutSupport {
/**

View file

@ -4,7 +4,7 @@
package akka.pattern
import akka.actor.Scheduler
import akka.dispatch.ExecutionContext
import scala.concurrent.ExecutionContext
import java.util.concurrent.Callable
object Patterns {

View file

@ -0,0 +1,8 @@
package scala.concurrent.impl
import scala.concurrent.ExecutionContext
object InternalFutureUtil {
@inline final def releaseFutureStack(ec: ExecutionContext): Unit = Future.releaseStack(ec)
@inline final def canAwaitEvidence = scala.concurrent.Await.canAwaitEvidence
}

View file

@ -2,7 +2,7 @@ package akka.agent
import language.postfixOps
import akka.dispatch.Await
import scala.concurrent.Await
import scala.concurrent.util.Duration
import scala.concurrent.util.duration._
import akka.util.Timeout

View file

@ -17,7 +17,7 @@ import scala.concurrent.util.Duration
* The Camel endpoints are activated asynchronously. This trait can signal when an endpoint is activated or de-activated.
*/
trait Activation {
import akka.dispatch.Await
import scala.concurrent.Await
def system: ActorSystem //FIXME Why is this here, what's it needed for and who should use it?

View file

@ -14,7 +14,7 @@ import akka.util.Timeout
import TestSupport._
import org.scalatest.WordSpec
import akka.testkit.TestLatch
import akka.dispatch.Await
import scala.concurrent.Await
class ActivationIntegrationTest extends WordSpec with MustMatchers with SharedCamelSystem {
implicit val timeout = Timeout(10 seconds)

View file

@ -17,7 +17,7 @@ import org.apache.camel.builder.Builder
import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException }
import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException }
import akka.testkit.TestLatch
import akka.dispatch.Await
import scala.concurrent.Await
import akka.actor.Status.Failure
class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem {

View file

@ -9,7 +9,7 @@ import language.postfixOps
import org.apache.camel.{ Exchange, Processor }
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.component.mock.MockEndpoint
import akka.dispatch.Await
import scala.concurrent.Await
import akka.camel.TestSupport.SharedCamelSystem
import akka.actor.SupervisorStrategy.Stop
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, WordSpec }

View file

@ -13,7 +13,7 @@ import org.apache.camel.component.mock.MockEndpoint
import akka.camel.TestSupport.SharedCamelSystem
import akka.actor.Props
import akka.pattern._
import akka.dispatch.Await
import scala.concurrent.Await
import scala.concurrent.util.duration._
import org.scalatest._
import matchers.MustMatchers

View file

@ -8,7 +8,7 @@ import akka.testkit.AkkaSpec
import scala.concurrent.util.duration._
import akka.testkit.TimingTest
import akka.testkit.TestLatch
import akka.dispatch.Await
import scala.concurrent.Await
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FixedRateTaskSpec extends AkkaSpec {

View file

@ -12,7 +12,7 @@ import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.dispatch.Await;
import scala.concurrent.Await;
import static akka.pattern.Patterns.ask;
import scala.concurrent.util.Duration;
import akka.testkit.AkkaSpec;

View file

@ -13,7 +13,7 @@ import akka.actor.Props;
import akka.dispatch.Future;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.Await;
import scala.concurrent.Await;
import scala.concurrent.util.Duration;
import akka.util.Timeout;
//#import-future
@ -34,7 +34,7 @@ import akka.actor.Terminated;
//#import-gracefulStop
import static akka.pattern.Patterns.gracefulStop;
import akka.dispatch.Future;
import akka.dispatch.Await;
import scala.concurrent.Await;
import scala.concurrent.util.Duration;
import akka.pattern.AskTimeoutException;
//#import-gracefulStop

View file

@ -40,8 +40,8 @@ import static akka.dispatch.Futures.reduce;
//#imports6
//#imports7
import akka.dispatch.ExecutionContexts;
import akka.dispatch.ExecutionContextExecutorService;
import scala.concurrent.ExecutionContexts;
import scala.concurrent.ExecutionContextExecutorService;
//#imports7

View file

@ -15,7 +15,7 @@ import akka.actor.*;
import akka.routing.*;
import scala.concurrent.util.Duration;
import akka.util.Timeout;
import akka.dispatch.Await;
import scala.concurrent.Await;
import akka.dispatch.Future;
import akka.dispatch.Dispatchers;
import akka.testkit.AkkaSpec;

View file

@ -14,7 +14,7 @@ import akka.actor.Props;
import scala.concurrent.util.Duration;
import akka.util.Timeout;
import akka.dispatch.Future;
import akka.dispatch.Await;
import scala.concurrent.Await;
//#parentActor
public class ParentActor extends UntypedActor {

View file

@ -9,7 +9,7 @@ import org.junit.Test;
//#imports
import akka.actor.*;
import akka.dispatch.Await;
import scala.concurrent.Await;
import static akka.pattern.Patterns.ask;
import akka.transactor.Coordinated;
import scala.concurrent.util.Duration;

View file

@ -20,7 +20,7 @@ import akka.testkit._
import akka.util._
import scala.concurrent.util.duration._
import akka.actor.Actor.Receive
import akka.dispatch.Await
import scala.concurrent.Await
//#my-actor
class MyActor extends Actor {
@ -327,7 +327,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
val actorRef = system.actorOf(Props[MyActor])
//#gracefulStop
import akka.pattern.gracefulStop
import akka.dispatch.Await
import scala.concurrent.Await
try {
val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds)(system)

View file

@ -113,7 +113,7 @@ class AgentDocSpec extends AkkaSpec {
val agent = Agent(0)
//#read-future
import akka.dispatch.Await
import scala.concurrent.Await
implicit val timeout = Timeout(5 seconds)
val future = agent.future

View file

@ -61,7 +61,7 @@ class FutureDocSpec extends AkkaSpec {
val actor = system.actorOf(Props[MyActor])
val msg = "hello"
//#ask-blocking
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.util.duration._
@ -88,7 +88,7 @@ class FutureDocSpec extends AkkaSpec {
"demonstrate usage of simple future eval" in {
//#future-eval
import akka.dispatch.Await
import scala.concurrent.Await
import akka.dispatch.Future
import scala.concurrent.util.duration._
@ -184,7 +184,7 @@ class FutureDocSpec extends AkkaSpec {
val msg1 = 1
val msg2 = 2
implicit val timeout = Timeout(5 seconds)
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
//#composing-wrong
@ -208,7 +208,7 @@ class FutureDocSpec extends AkkaSpec {
val msg1 = 1
val msg2 = 2
implicit val timeout = Timeout(5 seconds)
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
//#composing

View file

@ -10,7 +10,7 @@ import annotation.tailrec
import akka.actor.{ Props, Actor }
import scala.concurrent.util.duration._
import akka.util.Timeout
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
import akka.routing.SmallestMailboxRouter

View file

@ -122,7 +122,7 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
//#test-behavior
import akka.testkit.TestActorRef
import scala.concurrent.util.duration._
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
val actorRef = TestActorRef(new MyActor)

View file

@ -140,7 +140,7 @@ class TransactorDocSpec extends AkkaSpec {
import CoordinatedExample._
//#run-coordinated-example
import akka.dispatch.Await
import scala.concurrent.Await
import scala.concurrent.util.duration._
import akka.util.Timeout
import akka.pattern.ask

View file

@ -12,7 +12,7 @@ import org.jboss.netty.channel.{ Channel, SimpleChannelUpstreamHandler, ChannelH
import com.typesafe.config.ConfigFactory
import scala.concurrent.util.duration._
import akka.pattern.ask
import akka.dispatch.Await
import scala.concurrent.Await
import akka.event.{ LoggingAdapter, Logging }
import scala.util.control.NoStackTrace
import akka.event.LoggingReceive

View file

@ -8,7 +8,7 @@ import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.PoisonPill
import akka.actor.Address
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec

View file

@ -8,7 +8,7 @@ import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.PoisonPill
import akka.actor.Address
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec

View file

@ -6,7 +6,7 @@ package akka.remote.router
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.dispatch.Await
import scala.concurrent.Await
import akka.pattern.ask
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec

View file

@ -6,8 +6,8 @@ package akka.remote.testconductor
import com.typesafe.config.ConfigFactory
import akka.actor.Props
import akka.actor.Actor
import akka.dispatch.Await
import akka.dispatch.Await.Awaitable
import scala.concurrent.Await
import scala.concurrent.Await.Awaitable
import scala.concurrent.util.Duration
import scala.concurrent.util.duration._
import akka.testkit.ImplicitSender

View file

@ -10,8 +10,8 @@ import java.net.InetSocketAddress
import com.typesafe.config.{ ConfigObject, ConfigFactory, Config }
import akka.actor.{ RootActorPath, ActorPath, ActorSystem, ExtendedActorSystem }
import akka.dispatch.Await
import akka.dispatch.Await.Awaitable
import scala.concurrent.Await
import scala.concurrent.Await.Awaitable
import akka.util.{ Timeout, NonFatal }
import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName }
import akka.testkit.AkkaSpec

View file

@ -15,6 +15,7 @@ import akka.dispatch.{ MailboxType, TaskInvocation, SystemMessage, Suspend, Resu
import scala.concurrent.util.duration.intToDurationInt
import akka.util.{ Switch, NonFatal }
import scala.concurrent.util.Duration
import scala.concurrent.Awaitable
import akka.actor.ActorContext
import akka.dispatch.MessageQueue
@ -207,6 +208,8 @@ class CallingThreadDispatcher(
protected[akka] override def executeTask(invocation: TaskInvocation) { invocation.run }
override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost)(scala.concurrent.impl.InternalFutureUtil.canAwaitEvidence)
/*
* This method must be called with this thread's queue, which must already
* have been entered (active). When this method returns, the queue will be

View file

@ -7,6 +7,7 @@ package akka.testkit
import akka.actor._
import java.util.concurrent.atomic.AtomicLong
import akka.dispatch._
import scala.concurrent.Await
import akka.pattern.ask
/**

View file

@ -6,7 +6,7 @@ package akka.testkit
import scala.concurrent.util.Duration
import akka.actor.ActorSystem
import akka.dispatch.Await.{ CanAwait, Awaitable }
import scala.concurrent.{ Await, CanAwait, Awaitable }
import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit }
/**

View file

@ -7,17 +7,13 @@ import language.{ postfixOps, reflectiveCalls }
import org.scalatest.{ WordSpec, BeforeAndAfterAll, Tag }
import org.scalatest.matchers.MustMatchers
import akka.actor.ActorSystem
import akka.actor.{ Actor, ActorRef, Props }
import akka.actor.{ Actor, ActorRef, Props, ActorSystem, PoisonPill, DeadLetter }
import akka.event.{ Logging, LoggingAdapter }
import scala.concurrent.util.duration._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.actor.PoisonPill
import akka.actor.DeadLetter
import scala.concurrent.Await
import com.typesafe.config.{ Config, ConfigFactory }
import java.util.concurrent.TimeoutException
import akka.dispatch.{ Await, MessageDispatcher }
import akka.dispatch.Dispatchers
import akka.dispatch.{ MessageDispatcher, Dispatchers }
import akka.pattern.ask
object TimingTest extends Tag("timing")

View file

@ -9,7 +9,8 @@ import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterEach, WordSpec }
import akka.actor._
import akka.event.Logging.Warning
import akka.dispatch.{ Future, Promise, Await }
import akka.dispatch.{ Future, Promise }
import scala.concurrent.Await
import scala.concurrent.util.duration._
import akka.actor.ActorSystem
import akka.pattern.ask

View file

@ -6,8 +6,9 @@ import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterEach, WordSpec }
import akka.actor._
import scala.concurrent.Await
import scala.concurrent.util.duration._
import akka.dispatch.{ Await, Future }
import akka.dispatch.Future
import akka.pattern.ask
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])

View file

@ -16,7 +16,7 @@ import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.dispatch.Await;
import scala.concurrent.Await;
import akka.dispatch.Future;
import static akka.pattern.Patterns.ask;
import akka.testkit.AkkaSpec;

View file

@ -16,7 +16,7 @@ import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.dispatch.Await;
import scala.concurrent.Await;
import akka.dispatch.Future;
import static akka.pattern.Patterns.ask;
import akka.testkit.AkkaSpec;

View file

@ -7,7 +7,7 @@ package akka.transactor
import org.scalatest.BeforeAndAfterAll
import akka.actor._
import akka.dispatch.Await
import scala.concurrent.Await
import scala.concurrent.util.duration._
import akka.util.Timeout
import akka.testkit._

View file

@ -9,7 +9,7 @@ import language.postfixOps
import org.scalatest.BeforeAndAfterAll
import akka.actor._
import akka.dispatch.Await
import scala.concurrent.Await
import scala.concurrent.util.duration._
import akka.testkit._
import akka.testkit.TestEvent.Mute

View file

@ -7,7 +7,7 @@ package akka.transactor
import language.postfixOps
import akka.actor._
import akka.dispatch.Await
import scala.concurrent.Await
import scala.concurrent.util.duration._
import akka.util.Timeout
import akka.testkit._