Merge branch 'master' of github.com:jboner/akka
This commit is contained in:
commit
06fe2d590e
2 changed files with 148 additions and 221 deletions
|
|
@ -21,7 +21,9 @@ import java.util.{ LinkedList ⇒ JLinkedList }
|
|||
import scala.annotation.tailrec
|
||||
import scala.collection.mutable.Stack
|
||||
import akka.util.{ Switch, Duration, BoxedType }
|
||||
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
|
||||
|
||||
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
|
||||
import scala.Math
|
||||
|
||||
class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
|
||||
|
||||
|
|
@ -204,7 +206,7 @@ object Futures {
|
|||
|
||||
/**
|
||||
* Java API.
|
||||
* Transforms a java.lang.Iterable[A] into a Future[java.lang.Iterable[B]] using the provided Function A => Future[B].
|
||||
* Transforms a java.lang.Iterable[A] into a Future[java.lang.Iterable[B]] using the provided Function A ⇒ Future[B].
|
||||
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
|
||||
* in parallel.
|
||||
*/
|
||||
|
|
@ -219,15 +221,9 @@ object Futures {
|
|||
|
||||
/**
|
||||
* Java API.
|
||||
* Transforms a java.lang.Iterable[A] into a Future[java.lang.Iterable[B]] using the provided Function A => Future[B].
|
||||
* Transforms a java.lang.Iterable[A] into a Future[java.lang.Iterable[B]] using the provided Function A ⇒ Future[B].
|
||||
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
|
||||
* in parallel.
|
||||
*
|
||||
* def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] =
|
||||
* in.foldLeft(new DefaultPromise[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) =>
|
||||
* val fb = fn(a.asInstanceOf[A])
|
||||
* for (r <- fr; b <-fb) yield (r += b)
|
||||
* }.map(_.result)
|
||||
*/
|
||||
def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JIterable[B]] = traverse(in, Timeout.default, fn)
|
||||
}
|
||||
|
|
@ -275,11 +271,11 @@ object Future {
|
|||
sequence(in)(cbf, timeout)
|
||||
|
||||
/**
|
||||
* Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A => Future[B].
|
||||
* Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A ⇒ Future[B].
|
||||
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
|
||||
* in parallel:
|
||||
* <pre>
|
||||
* val myFutureList = Futures.traverse(myList)(x => Future(myFunc(x)))
|
||||
* val myFutureList = Futures.traverse(myList)(x ⇒ Future(myFunc(x)))
|
||||
* </pre>
|
||||
*/
|
||||
def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], timeout: Timeout): Future[M[B]] =
|
||||
|
|
@ -379,23 +375,6 @@ sealed trait Future[+T] extends japi.Future[T] {
|
|||
*/
|
||||
def await(atMost: Duration): Future[T]
|
||||
|
||||
/**
|
||||
* Await completion of this Future (as `await`) and return its value if it
|
||||
* conforms to A's erased type.
|
||||
*
|
||||
* def as[A](implicit m: Manifest[A]): Option[A] =
|
||||
* try {
|
||||
* await
|
||||
* value match {
|
||||
* case None ⇒ None
|
||||
* case Some(_: Left[_, _]) ⇒ None
|
||||
* case Some(Right(v)) ⇒ Some(BoxedType(m.erasure).cast(v).asInstanceOf[A])
|
||||
* }
|
||||
* } catch {
|
||||
* case _: Exception ⇒ None
|
||||
* }
|
||||
*/
|
||||
|
||||
/**
|
||||
* Tests whether this Future has been completed.
|
||||
*/
|
||||
|
|
@ -427,19 +406,17 @@ sealed trait Future[+T] extends japi.Future[T] {
|
|||
/**
|
||||
* Returns the successful result of this Future if it exists.
|
||||
*/
|
||||
final def result: Option[T] = {
|
||||
val v = value
|
||||
if (v.isDefined) v.get.right.toOption
|
||||
else None
|
||||
final def result: Option[T] = value match {
|
||||
case Some(Right(r)) ⇒ Some(r)
|
||||
case _ ⇒ None
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the contained exception of this Future if it exists.
|
||||
*/
|
||||
final def exception: Option[Throwable] = {
|
||||
val v = value
|
||||
if (v.isDefined) v.get.left.toOption
|
||||
else None
|
||||
final def exception: Option[Throwable] = value match {
|
||||
case Some(Left(e)) ⇒ Some(e)
|
||||
case _ ⇒ None
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -453,17 +430,16 @@ sealed trait Future[+T] extends japi.Future[T] {
|
|||
* When the future is completed with a valid result, apply the provided
|
||||
* PartialFunction to the result.
|
||||
* <pre>
|
||||
* val result = future onResult {
|
||||
* case Foo => "foo"
|
||||
* case Bar => "bar"
|
||||
* future onResult {
|
||||
* case Foo ⇒ target ! "foo"
|
||||
* case Bar ⇒ target ! "bar"
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
final def onResult(pf: PartialFunction[T, Unit]): this.type = onComplete { f ⇒
|
||||
val optr = f.result
|
||||
if (optr.isDefined) {
|
||||
val r = optr.get
|
||||
if (pf isDefinedAt r) pf(r)
|
||||
final def onResult(pf: PartialFunction[T, Unit]): this.type = onComplete {
|
||||
_.value match {
|
||||
case Some(Right(r)) if pf isDefinedAt r ⇒ pf(r)
|
||||
case _ ⇒
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -471,17 +447,15 @@ sealed trait Future[+T] extends japi.Future[T] {
|
|||
* When the future is completed with an exception, apply the provided
|
||||
* PartialFunction to the exception.
|
||||
* <pre>
|
||||
* val result = future onException {
|
||||
* case Foo => "foo"
|
||||
* case Bar => "bar"
|
||||
* future onException {
|
||||
* case NumberFormatException ⇒ target ! "wrong format"
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
final def onException(pf: PartialFunction[Throwable, Unit]): Future[T] = onComplete { f ⇒
|
||||
val opte = f.exception
|
||||
if (opte.isDefined) {
|
||||
val e = opte.get
|
||||
if (pf isDefinedAt e) pf(e)
|
||||
final def onException(pf: PartialFunction[Throwable, Unit]): this.type = onComplete {
|
||||
_.value match {
|
||||
case Some(Left(ex)) if pf isDefinedAt ex ⇒ pf(ex)
|
||||
case _ ⇒
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -489,49 +463,23 @@ sealed trait Future[+T] extends japi.Future[T] {
|
|||
|
||||
def orElse[A >: T](fallback: ⇒ A): Future[A]
|
||||
|
||||
/**
|
||||
* Creates a new Future by applying a PartialFunction to the successful
|
||||
* result of this Future if a match is found, or else return a MatchError.
|
||||
* If this Future is completed with an exception then the new Future will
|
||||
* also contain this exception.
|
||||
* Example:
|
||||
* <pre>
|
||||
* val future1 = for {
|
||||
* a <- actor ? Req("Hello") collect { case Res(x: Int) => x }
|
||||
* b <- actor ? Req(a) collect { case Res(x: String) => x }
|
||||
* c <- actor ? Req(7) collect { case Res(x: String) => x }
|
||||
* } yield b + "-" + c
|
||||
* </pre>
|
||||
*/
|
||||
@deprecated("No longer needed, use 'map' instead. Removed in 2.0", "2.0")
|
||||
final def collect[A](pf: PartialFunction[T, A])(implicit timeout: Timeout): Future[A] = this map pf
|
||||
|
||||
/**
|
||||
* Creates a new Future that will handle any matching Throwable that this
|
||||
* Future might contain. If there is no match, or if this Future contains
|
||||
* a valid result then the new Future will contain the same.
|
||||
* Example:
|
||||
* <pre>
|
||||
* Future(6 / 0) recover { case e: ArithmeticException => 0 } // result: 0
|
||||
* Future(6 / 0) recover { case e: NotFoundException => 0 } // result: exception
|
||||
* Future(6 / 2) recover { case e: ArithmeticException => 0 } // result: 3
|
||||
* Future(6 / 0) failure { case e: ArithmeticException ⇒ 0 } // result: 0
|
||||
* Future(6 / 0) failure { case e: NotFoundException ⇒ 0 } // result: exception
|
||||
* Future(6 / 2) failure { case e: ArithmeticException ⇒ 0 } // result: 3
|
||||
* </pre>
|
||||
*/
|
||||
final def recover[A >: T](pf: PartialFunction[Throwable, A])(implicit timeout: Timeout): Future[A] = {
|
||||
val future = new DefaultPromise[A](timeout)
|
||||
onComplete { self ⇒
|
||||
future complete {
|
||||
self.value.get match {
|
||||
case Left(e) ⇒
|
||||
try {
|
||||
if (pf isDefinedAt e) Right(pf(e))
|
||||
else Left(e)
|
||||
} catch {
|
||||
case x: Exception ⇒
|
||||
Left(x)
|
||||
}
|
||||
case v ⇒ v
|
||||
}
|
||||
onComplete {
|
||||
_.value.get match {
|
||||
case Left(e) if pf isDefinedAt e ⇒ future.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) })
|
||||
case otherwise ⇒ future complete otherwise
|
||||
}
|
||||
}
|
||||
future
|
||||
|
|
@ -552,19 +500,17 @@ sealed trait Future[+T] extends japi.Future[T] {
|
|||
*/
|
||||
final def map[A](f: T ⇒ A)(implicit timeout: Timeout): Future[A] = {
|
||||
val future = new DefaultPromise[A](timeout)
|
||||
onComplete { self ⇒
|
||||
future complete {
|
||||
self.value.get match {
|
||||
case Right(r) ⇒
|
||||
try {
|
||||
Right(f(r))
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
EventHandler.error(e, this, e.getMessage)
|
||||
Left(e)
|
||||
}
|
||||
case v ⇒ v.asInstanceOf[Either[Throwable, A]]
|
||||
}
|
||||
onComplete {
|
||||
_.value.get match {
|
||||
case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]]
|
||||
case Right(res) ⇒
|
||||
future complete (try {
|
||||
Right(f(res))
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
EventHandler.error(e, this, e.getMessage)
|
||||
Left(e)
|
||||
})
|
||||
}
|
||||
}
|
||||
future
|
||||
|
|
@ -606,26 +552,26 @@ sealed trait Future[+T] extends japi.Future[T] {
|
|||
*/
|
||||
final def flatMap[A](f: T ⇒ Future[A])(implicit timeout: Timeout): Future[A] = {
|
||||
val future = new DefaultPromise[A](timeout)
|
||||
|
||||
onComplete {
|
||||
_.value.get match {
|
||||
case Right(r) ⇒
|
||||
try {
|
||||
future completeWith f(r)
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
EventHandler.error(e, this, e.getMessage)
|
||||
future complete Left(e)
|
||||
}
|
||||
case v ⇒ future complete v.asInstanceOf[Either[Throwable, A]]
|
||||
case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]]
|
||||
case Right(r) ⇒ try {
|
||||
future.completeWith(f(r))
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
EventHandler.error(e, this, e.getMessage)
|
||||
future complete Left(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
future
|
||||
}
|
||||
|
||||
final def foreach(f: T ⇒ Unit): Unit = onComplete {
|
||||
_.result match {
|
||||
case Some(v) ⇒ f(v)
|
||||
case None ⇒
|
||||
_.value.get match {
|
||||
case Right(r) ⇒ f(r)
|
||||
case _ ⇒
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -640,22 +586,16 @@ sealed trait Future[+T] extends japi.Future[T] {
|
|||
|
||||
final def filter(p: T ⇒ Boolean)(implicit timeout: Timeout): Future[T] = {
|
||||
val future = new DefaultPromise[T](timeout)
|
||||
onComplete { self ⇒
|
||||
future complete {
|
||||
self.value.get match {
|
||||
case Right(r) ⇒
|
||||
try {
|
||||
if (p(r))
|
||||
Right(r)
|
||||
else
|
||||
Left(new MatchError(r))
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
EventHandler.error(e, this, e.getMessage)
|
||||
Left(e)
|
||||
}
|
||||
case v ⇒ v
|
||||
}
|
||||
onComplete {
|
||||
_.value.get match {
|
||||
case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, T]]
|
||||
case r @ Right(res) ⇒ future complete (try {
|
||||
if (p(res)) r else Left(new MatchError(res))
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
EventHandler.error(e, this, e.getMessage)
|
||||
Left(e)
|
||||
})
|
||||
}
|
||||
}
|
||||
future
|
||||
|
|
@ -664,13 +604,10 @@ sealed trait Future[+T] extends japi.Future[T] {
|
|||
/**
|
||||
* Returns the current result, throws the exception if one has been raised, else returns None
|
||||
*/
|
||||
final def resultOrException: Option[T] = {
|
||||
val v = value
|
||||
if (v.isDefined) {
|
||||
val r = v.get
|
||||
if (r.isLeft) throw r.left.get
|
||||
else r.right.toOption
|
||||
} else None
|
||||
final def resultOrException: Option[T] = value match {
|
||||
case Some(Left(e)) ⇒ throw e
|
||||
case Some(Right(r)) ⇒ Some(r)
|
||||
case _ ⇒ None
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -771,6 +708,17 @@ trait Promise[T] extends Future[T] {
|
|||
|
||||
}
|
||||
|
||||
//Companion object to FState, just to provide a cheap, immutable default entry
|
||||
private[akka] object FState {
|
||||
val empty = new FState[Nothing]()
|
||||
def apply[T](): FState[T] = empty.asInstanceOf[FState[T]]
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents the internal state of the DefaultCompletableFuture
|
||||
*/
|
||||
private[akka] case class FState[T](value: Option[Either[Throwable, T]] = None, listeners: List[Future[T] ⇒ Unit] = Nil)
|
||||
|
||||
/**
|
||||
* The default concrete Future implementation.
|
||||
*/
|
||||
|
|
@ -784,78 +732,62 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
def this(timeout: Long, timeunit: TimeUnit)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout, timeunit))
|
||||
|
||||
private val _startTimeInNanos = currentTimeInNanos
|
||||
private val _lock = new ReentrantLock
|
||||
private val _signal = _lock.newCondition
|
||||
private var _value: Option[Either[Throwable, T]] = None
|
||||
private var _listeners: List[Future[T] ⇒ Unit] = Nil
|
||||
private val ref = new AtomicReference[FState[T]](FState())
|
||||
|
||||
/**
|
||||
* Must be called inside _lock.lock<->_lock.unlock
|
||||
* Returns true if completed within the timeout
|
||||
*/
|
||||
@tailrec
|
||||
private def awaitUnsafe(waitTimeNanos: Long): Boolean = {
|
||||
if (_value.isEmpty && waitTimeNanos > 0) {
|
||||
if (value.isEmpty && waitTimeNanos > 0) {
|
||||
val ms = NANOS.toMillis(waitTimeNanos)
|
||||
val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec
|
||||
val start = currentTimeInNanos
|
||||
val remainingNanos = try {
|
||||
_signal.awaitNanos(waitTimeNanos)
|
||||
} catch {
|
||||
case e: InterruptedException ⇒
|
||||
waitTimeNanos - (currentTimeInNanos - start)
|
||||
}
|
||||
awaitUnsafe(remainingNanos)
|
||||
try { ref.synchronized { if (value.isEmpty) ref.wait(ms, ns) } } catch { case e: InterruptedException ⇒ }
|
||||
|
||||
awaitUnsafe(waitTimeNanos - (currentTimeInNanos - start))
|
||||
} else {
|
||||
_value.isDefined
|
||||
value.isDefined
|
||||
}
|
||||
}
|
||||
|
||||
def await(atMost: Duration) = {
|
||||
_lock.lock()
|
||||
try {
|
||||
if (!atMost.isFinite && !timeout.duration.isFinite) { //If wait until infinity
|
||||
while (_value.isEmpty) { _signal.await }
|
||||
this
|
||||
} else { //Limited wait
|
||||
val time = if (!atMost.isFinite) timeLeft() //If atMost is infinity, use preset timeout
|
||||
else if (!timeout.duration.isFinite) atMost.toNanos //If preset timeout is infinite, use atMost
|
||||
else atMost.toNanos min timeLeft() //Otherwise use the smallest of them
|
||||
if (awaitUnsafe(time)) this
|
||||
else throw new FutureTimeoutException("Future timed out after [" + NANOS.toMillis(time) + "] ms")
|
||||
}
|
||||
} finally { _lock.unlock }
|
||||
def await(atMost: Duration): this.type = {
|
||||
val waitNanos =
|
||||
if (timeout.duration.isFinite && atMost.isFinite)
|
||||
atMost.toNanos min timeLeft()
|
||||
else if (atMost.isFinite)
|
||||
atMost.toNanos
|
||||
else if (timeout.duration.isFinite)
|
||||
timeLeft()
|
||||
else Long.MaxValue //If both are infinite, use Long.MaxValue
|
||||
|
||||
if (awaitUnsafe(waitNanos)) this
|
||||
else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(waitNanos) + "] milliseconds")
|
||||
}
|
||||
|
||||
def await = await(timeout.duration)
|
||||
|
||||
def isExpired: Boolean = if (timeout.duration.isFinite) timeLeft() <= 0 else false
|
||||
|
||||
def value: Option[Either[Throwable, T]] = {
|
||||
_lock.lock
|
||||
try {
|
||||
_value
|
||||
} finally {
|
||||
_lock.unlock
|
||||
}
|
||||
}
|
||||
def value: Option[Either[Throwable, T]] = ref.get.value
|
||||
|
||||
def complete(value: Either[Throwable, T]): this.type = {
|
||||
val callbacks = {
|
||||
_lock.lock
|
||||
try {
|
||||
if (_value.isEmpty) { //Only complete if we aren't expired
|
||||
if (!isExpired) {
|
||||
_value = Some(value)
|
||||
val existingListeners = _listeners
|
||||
_listeners = Nil
|
||||
existingListeners
|
||||
} else {
|
||||
_listeners = Nil
|
||||
@tailrec
|
||||
def tryComplete: List[Future[T] ⇒ Unit] = {
|
||||
val cur = ref.get
|
||||
if (cur.value.isDefined) Nil
|
||||
else if ( /*cur.value.isEmpty && */ isExpired) {
|
||||
//Empty and expired, so remove listeners
|
||||
//TODO Perhaps cancel existing onTimeout listeners in the future here?
|
||||
ref.compareAndSet(cur, FState()) //Try to reset the state to the default, doesn't matter if it fails
|
||||
Nil
|
||||
} else {
|
||||
if (ref.compareAndSet(cur, FState(Option(value), Nil))) cur.listeners
|
||||
else tryComplete
|
||||
}
|
||||
} else Nil
|
||||
}
|
||||
tryComplete
|
||||
} finally {
|
||||
_signal.signalAll
|
||||
_lock.unlock
|
||||
ref.synchronized { ref.notifyAll() } //Notify any evil blockers
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -865,44 +797,39 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
}
|
||||
|
||||
def onComplete(func: Future[T] ⇒ Unit): this.type = {
|
||||
_lock.lock
|
||||
val notifyNow = try {
|
||||
if (_value.isEmpty) {
|
||||
if (!isExpired) { //Only add the listener if the future isn't expired
|
||||
_listeners ::= func
|
||||
false
|
||||
} else false //Will never run the callback since the future is expired
|
||||
} else true
|
||||
} finally {
|
||||
_lock.unlock
|
||||
@tailrec //Returns whether the future has already been completed or not
|
||||
def tryAddCallback(): Boolean = {
|
||||
val cur = ref.get
|
||||
if (cur.value.isDefined) true
|
||||
else if (isExpired) false
|
||||
else if (ref.compareAndSet(cur, cur.copy(listeners = func :: cur.listeners))) false
|
||||
else tryAddCallback()
|
||||
}
|
||||
|
||||
if (notifyNow) Future.dispatchTask(() ⇒ notifyCompleted(func))
|
||||
if (tryAddCallback()) Future.dispatchTask(() ⇒ notifyCompleted(func))
|
||||
|
||||
this
|
||||
}
|
||||
|
||||
def onTimeout(func: Future[T] ⇒ Unit): this.type = {
|
||||
if (timeout.duration.isFinite) {
|
||||
_lock.lock
|
||||
val runNow = try {
|
||||
if (_value.isEmpty) {
|
||||
if (!isExpired) {
|
||||
val runnable = new Runnable {
|
||||
def run() {
|
||||
if (!isCompleted) func(self)
|
||||
val runNow =
|
||||
if (!timeout.duration.isFinite) false //Not possible
|
||||
else if (value.isEmpty) {
|
||||
if (!isExpired) {
|
||||
val runnable = new Runnable {
|
||||
def run() {
|
||||
if (!isCompleted) {
|
||||
if (!isExpired) Scheduler.scheduleOnce(this, timeLeft, NANOS)
|
||||
else func(DefaultPromise.this)
|
||||
}
|
||||
}
|
||||
Scheduler.scheduleOnce(runnable, timeLeft, NANOS)
|
||||
false
|
||||
} else true
|
||||
} else false
|
||||
} finally {
|
||||
_lock.unlock
|
||||
}
|
||||
}
|
||||
Scheduler.scheduleOnce(runnable, timeLeft, NANOS)
|
||||
false
|
||||
} else true
|
||||
} else false
|
||||
|
||||
if (runNow) Future.dispatchTask(() ⇒ notifyCompleted(func))
|
||||
}
|
||||
if (runNow) Future.dispatchTask(() ⇒ notifyCompleted(func))
|
||||
|
||||
this
|
||||
}
|
||||
|
|
@ -913,11 +840,14 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
case Some(_) ⇒ this
|
||||
case _ if isExpired ⇒ Future[A](fallback)
|
||||
case _ ⇒
|
||||
val promise = new DefaultPromise[A](Timeout.never)
|
||||
val promise = new DefaultPromise[A](Timeout.never) //TODO FIXME We can't have infinite timeout here, doesn't make sense.
|
||||
promise completeWith this
|
||||
val runnable = new Runnable {
|
||||
def run() {
|
||||
if (!isCompleted) promise complete (try { Right(fallback) } catch { case e: Exception ⇒ Left(e) })
|
||||
if (!isCompleted) {
|
||||
if (!isExpired) Scheduler.scheduleOnce(this, timeLeft, NANOS)
|
||||
else promise complete (try { Right(fallback) } catch { case e: Exception ⇒ Left(e) })
|
||||
}
|
||||
}
|
||||
}
|
||||
Scheduler.scheduleOnce(runnable, timeLeft, NANOS)
|
||||
|
|
@ -926,15 +856,11 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
} else this
|
||||
|
||||
private def notifyCompleted(func: Future[T] ⇒ Unit) {
|
||||
try {
|
||||
func(this)
|
||||
} catch {
|
||||
case e ⇒ EventHandler.error(e, this, "Future onComplete-callback raised an exception")
|
||||
}
|
||||
try { func(this) } catch { case e ⇒ EventHandler notify EventHandler.Error(e, this, "Future onComplete-callback raised an exception") } //TODO catch, everything? Really?
|
||||
}
|
||||
|
||||
@inline
|
||||
private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis)
|
||||
private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) //TODO Switch to math.abs(System.nanoTime)?
|
||||
@inline
|
||||
private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -298,7 +298,8 @@ object AkkaBuild extends Build {
|
|||
resolvers += "Twitter Public Repo" at "http://maven.twttr.com", // This will be going away with com.mongodb.async's next release
|
||||
|
||||
// compile options
|
||||
scalacOptions ++= Seq("-encoding", "UTF-8", "-optimise", "-deprecation", "-unchecked"),
|
||||
scalacOptions ++= Seq("-encoding", "UTF-8", "-deprecation", "-unchecked") ++ (
|
||||
if (System getProperty "java.runtime.version" startsWith "1.7") Seq() else Seq("-optimize")), // -optimize fails with jdk7
|
||||
javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"),
|
||||
|
||||
// add config dir to classpaths
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue