Merge branch 'master' into wip-remote-connection-failover
This commit is contained in:
commit
2dea305136
53 changed files with 238 additions and 266 deletions
|
|
@ -64,45 +64,13 @@ object Futures {
|
|||
def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] =
|
||||
Future(body.call)(dispatcher, timeout)
|
||||
|
||||
/**
|
||||
* Returns a Future to the result of the first future in the list that is completed
|
||||
*/
|
||||
def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.never): Future[T] = {
|
||||
val futureResult = new DefaultPromise[T](timeout)
|
||||
|
||||
val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _)
|
||||
futures.foreach(_ onComplete completeFirst)
|
||||
|
||||
futureResult
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
|
||||
*/
|
||||
def find[T](predicate: T ⇒ Boolean, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]]): Future[Option[T]] = {
|
||||
if (futures.isEmpty) new KeptPromise[Option[T]](Right(None))
|
||||
else {
|
||||
val result = new DefaultPromise[Option[T]](timeout)
|
||||
val ref = new AtomicInteger(futures.size)
|
||||
val search: Future[T] ⇒ Unit = f ⇒ try {
|
||||
f.result.filter(predicate).foreach(r ⇒ result completeWithResult Some(r))
|
||||
} finally {
|
||||
if (ref.decrementAndGet == 0)
|
||||
result completeWithResult None
|
||||
}
|
||||
futures.foreach(_ onComplete search)
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
|
||||
*/
|
||||
def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], timeout: Timeout): Future[JOption[T]] = {
|
||||
val pred: T ⇒ Boolean = predicate.apply(_)
|
||||
find[T](pred, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures)).map(JOption.fromScalaOption(_))
|
||||
Future.find[T](pred, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures)).map(JOption.fromScalaOption(_))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -110,59 +78,7 @@ object Futures {
|
|||
* Returns a Future to the result of the first future in the list that is completed
|
||||
*/
|
||||
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], timeout: Timeout): Future[T] =
|
||||
firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)
|
||||
|
||||
/**
|
||||
* A non-blocking fold over the specified futures.
|
||||
* The fold is performed on the thread where the last future is completed,
|
||||
* the result will be the first failure of any of the futures, or any failure in the actual fold,
|
||||
* or the result of the fold.
|
||||
* Example:
|
||||
* <pre>
|
||||
* val result = Futures.fold(0)(futures)(_ + _).await.result
|
||||
* </pre>
|
||||
*/
|
||||
def fold[T, R](zero: R, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]])(foldFun: (R, T) ⇒ R): Future[R] = {
|
||||
if (futures.isEmpty) {
|
||||
new KeptPromise[R](Right(zero))
|
||||
} else {
|
||||
val result = new DefaultPromise[R](timeout)
|
||||
val results = new ConcurrentLinkedQueue[T]()
|
||||
val done = new Switch(false)
|
||||
val allDone = futures.size
|
||||
|
||||
val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature?
|
||||
f.value.get match {
|
||||
case Right(value) ⇒
|
||||
val added = results add value
|
||||
if (added && results.size == allDone) { //Only one thread can get here
|
||||
if (done.switchOn) {
|
||||
try {
|
||||
val i = results.iterator
|
||||
var currentValue = zero
|
||||
while (i.hasNext) { currentValue = foldFun(currentValue, i.next) }
|
||||
result completeWithResult currentValue
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
EventHandler.error(e, this, e.getMessage)
|
||||
result completeWithException e
|
||||
} finally {
|
||||
results.clear
|
||||
}
|
||||
}
|
||||
}
|
||||
case Left(exception) ⇒
|
||||
if (done.switchOn) {
|
||||
result completeWithException exception
|
||||
results.clear
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
futures foreach { _ onComplete aggregate }
|
||||
result
|
||||
}
|
||||
}
|
||||
Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)
|
||||
|
||||
/**
|
||||
* Java API
|
||||
|
|
@ -172,50 +88,24 @@ object Futures {
|
|||
* or the result of the fold.
|
||||
*/
|
||||
def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Timeout, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] =
|
||||
fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)
|
||||
Future.fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)
|
||||
|
||||
def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout: Timeout, futures, fun)
|
||||
|
||||
def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, Timeout.default, futures, fun)
|
||||
|
||||
/**
|
||||
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
|
||||
* Example:
|
||||
* <pre>
|
||||
* val result = Futures.reduce(futures)(_ + _).await.result
|
||||
* </pre>
|
||||
*/
|
||||
def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.default)(op: (R, T) ⇒ T): Future[R] = {
|
||||
if (futures.isEmpty)
|
||||
new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left")))
|
||||
else {
|
||||
val result = new DefaultPromise[R](timeout)
|
||||
val seedFound = new AtomicBoolean(false)
|
||||
val seedFold: Future[T] ⇒ Unit = f ⇒ {
|
||||
if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
|
||||
f.value.get match {
|
||||
case Right(value) ⇒ result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op))
|
||||
case Left(exception) ⇒ result.completeWithException(exception)
|
||||
}
|
||||
}
|
||||
}
|
||||
for (f ← futures) f onComplete seedFold //Attach the listener to the Futures
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
|
||||
*/
|
||||
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Timeout, fun: akka.japi.Function2[R, T, T]): Future[R] =
|
||||
reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _)
|
||||
Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _)
|
||||
|
||||
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout: Timeout, fun)
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
* Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]].
|
||||
* Simple version of Future.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]].
|
||||
* Useful for reducing many Futures into a single Future.
|
||||
*/
|
||||
def sequence[A](in: JIterable[Future[A]], timeout: Timeout): Future[JIterable[A]] =
|
||||
|
|
@ -298,6 +188,116 @@ object Future {
|
|||
def sequence[A, M[_] <: Traversable[_]](timeout: Timeout)(in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] =
|
||||
sequence(in)(cbf, timeout)
|
||||
|
||||
/**
|
||||
* Returns a Future to the result of the first future in the list that is completed
|
||||
*/
|
||||
def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.never): Future[T] = {
|
||||
val futureResult = new DefaultPromise[T](timeout)
|
||||
|
||||
val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _)
|
||||
futures.foreach(_ onComplete completeFirst)
|
||||
|
||||
futureResult
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
|
||||
*/
|
||||
def find[T](predicate: T ⇒ Boolean, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]]): Future[Option[T]] = {
|
||||
if (futures.isEmpty) new KeptPromise[Option[T]](Right(None))
|
||||
else {
|
||||
val result = new DefaultPromise[Option[T]](timeout)
|
||||
val ref = new AtomicInteger(futures.size)
|
||||
val search: Future[T] ⇒ Unit = f ⇒ try {
|
||||
f.result.filter(predicate).foreach(r ⇒ result completeWithResult Some(r))
|
||||
} finally {
|
||||
if (ref.decrementAndGet == 0)
|
||||
result completeWithResult None
|
||||
}
|
||||
futures.foreach(_ onComplete search)
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A non-blocking fold over the specified futures.
|
||||
* The fold is performed on the thread where the last future is completed,
|
||||
* the result will be the first failure of any of the futures, or any failure in the actual fold,
|
||||
* or the result of the fold.
|
||||
* Example:
|
||||
* <pre>
|
||||
* val result = Futures.fold(0)(futures)(_ + _).await.result
|
||||
* </pre>
|
||||
*/
|
||||
def fold[T, R](zero: R, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]])(foldFun: (R, T) ⇒ R): Future[R] = {
|
||||
if (futures.isEmpty) {
|
||||
new KeptPromise[R](Right(zero))
|
||||
} else {
|
||||
val result = new DefaultPromise[R](timeout)
|
||||
val results = new ConcurrentLinkedQueue[T]()
|
||||
val done = new Switch(false)
|
||||
val allDone = futures.size
|
||||
|
||||
val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature?
|
||||
f.value.get match {
|
||||
case Right(value) ⇒
|
||||
val added = results add value
|
||||
if (added && results.size == allDone) { //Only one thread can get here
|
||||
if (done.switchOn) {
|
||||
try {
|
||||
val i = results.iterator
|
||||
var currentValue = zero
|
||||
while (i.hasNext) { currentValue = foldFun(currentValue, i.next) }
|
||||
result completeWithResult currentValue
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
EventHandler.error(e, this, e.getMessage)
|
||||
result completeWithException e
|
||||
} finally {
|
||||
results.clear
|
||||
}
|
||||
}
|
||||
}
|
||||
case Left(exception) ⇒
|
||||
if (done.switchOn) {
|
||||
result completeWithException exception
|
||||
results.clear
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
futures foreach { _ onComplete aggregate }
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
|
||||
* Example:
|
||||
* <pre>
|
||||
* val result = Futures.reduce(futures)(_ + _).await.result
|
||||
* </pre>
|
||||
*/
|
||||
def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.default)(op: (R, T) ⇒ T): Future[R] = {
|
||||
if (futures.isEmpty)
|
||||
new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left")))
|
||||
else {
|
||||
val result = new DefaultPromise[R](timeout)
|
||||
val seedFound = new AtomicBoolean(false)
|
||||
val seedFold: Future[T] ⇒ Unit = f ⇒ {
|
||||
if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
|
||||
f.value.get match {
|
||||
case Right(value) ⇒ result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op))
|
||||
case Left(exception) ⇒ result.completeWithException(exception)
|
||||
}
|
||||
}
|
||||
}
|
||||
for (f ← futures) f onComplete seedFold //Attach the listener to the Futures
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A ⇒ Future[B].
|
||||
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
|
||||
|
|
@ -397,9 +397,12 @@ sealed trait Future[+T] extends japi.Future[T] {
|
|||
|
||||
/**
|
||||
* Blocks the current thread until the Future has been completed or the
|
||||
* timeout has expired. The timeout will be the least value of 'atMost' and the timeout
|
||||
* supplied at the constructuion of this Future.
|
||||
* In the case of the timeout expiring a FutureTimeoutException will be thrown.
|
||||
* timeout has expired, additionally bounding the waiting period according to
|
||||
* the <code>atMost</code> parameter. The timeout will be the lesser value of
|
||||
* 'atMost' and the timeout supplied at the constructuion of this Future. In
|
||||
* the case of the timeout expiring a FutureTimeoutException will be thrown.
|
||||
* Other callers of this method are not affected by the additional bound
|
||||
* imposed by <code>atMost</code>.
|
||||
*/
|
||||
def await(atMost: Duration): Future[T]
|
||||
|
||||
|
|
@ -878,12 +881,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
val runnable = new Runnable {
|
||||
def run() {
|
||||
if (!isCompleted) {
|
||||
if (!isExpired) Scheduler.scheduleOnce(this, timeLeft, NANOS)
|
||||
if (!isExpired) Scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS)
|
||||
else func(DefaultPromise.this)
|
||||
}
|
||||
}
|
||||
}
|
||||
Scheduler.scheduleOnce(runnable, timeLeft, NANOS)
|
||||
Scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
|
||||
false
|
||||
} else true
|
||||
} else false
|
||||
|
|
@ -904,12 +907,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
val runnable = new Runnable {
|
||||
def run() {
|
||||
if (!isCompleted) {
|
||||
if (!isExpired) Scheduler.scheduleOnce(this, timeLeft, NANOS)
|
||||
if (!isExpired) Scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS)
|
||||
else promise complete (try { Right(fallback) } catch { case e: Exception ⇒ Left(e) })
|
||||
}
|
||||
}
|
||||
}
|
||||
Scheduler.scheduleOnce(runnable, timeLeft, NANOS)
|
||||
Scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
|
||||
promise
|
||||
}
|
||||
} else this
|
||||
|
|
@ -923,6 +926,8 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
//TODO: the danger of Math.abs is that it could break the ordering of time. So I would not recommend an abs.
|
||||
@inline
|
||||
private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
|
||||
|
||||
private def timeLeftNoinline(): Long = timeLeft()
|
||||
}
|
||||
|
||||
class ActorPromise(timeout: Timeout)(implicit dispatcher: MessageDispatcher) extends DefaultPromise[Any](timeout)(dispatcher) with ForwardableChannel with ExceptionChannel[Any] {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue