From db79e2bd6244308d58529e33e9850f36e437e07e Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Fri, 11 Feb 2011 14:46:39 -0700 Subject: [PATCH 1/9] Use an Option[Either[Throwable, T]] to hold the value of a Future --- .../src/main/scala/akka/dispatch/Future.scala | 117 +++++------------- .../test/scala/akka/dispatch/FutureSpec.scala | 18 +-- 2 files changed, 30 insertions(+), 105 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index f4f304606c..60a16218c7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -136,68 +136,48 @@ sealed trait Future[T] { def awaitBlocking : Future[T] - def isCompleted: Boolean + def isCompleted: Boolean = value.isDefined def isExpired: Boolean def timeoutInNanos: Long - def result: Option[T] + def value: Option[Either[Throwable, T]] + + def result: Option[T] = value flatMap (_.right.toOption) /** * Returns the result of the Future if one is available within the specified time, * if the time left on the future is less than the specified time, the time left on the future will be used instead * of the specified time. - * returns None if no result, Some(Left(t)) if a result, and Some(Right(error)) if there was an exception + * returns None if no result, Some(Right(t)) if a result, and Some(Left(error)) if there was an exception */ - def resultWithin(time: Long, unit: TimeUnit): Option[Either[T,Throwable]] + def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] - def exception: Option[Throwable] + def exception: Option[Throwable] = value flatMap (_.left.toOption) def onComplete(func: Future[T] => Unit): Future[T] /** * Returns the current result, throws the exception is one has been raised, else returns None */ - def resultOrException: Option[T] = resultWithin(0, TimeUnit.MILLISECONDS) match { - case None => None - case Some(Left(t)) => Some(t) - case Some(Right(t)) => throw t - } + def resultOrException: Option[T] = value map (_.fold(t => throw t, identity)) /* Java API */ def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(proc(_)) - def map[O](f: (T) => O): Future[O] = { - val wrapped = this - new Future[O] { - def await = { wrapped.await; this } - def awaitBlocking = { wrapped.awaitBlocking; this } - def isCompleted = wrapped.isCompleted - def isExpired = wrapped.isExpired - def timeoutInNanos = wrapped.timeoutInNanos - def result: Option[O] = { wrapped.result map f } - def exception: Option[Throwable] = wrapped.exception - def resultWithin(time: Long, unit: TimeUnit): Option[Either[O,Throwable]] = wrapped.resultWithin(time, unit) match { - case None => None - case Some(Left(t)) => Some(Left(f(t))) - case Some(Right(t)) => Some(Right(t)) - } - def onComplete(func: Future[O] => Unit): Future[O] = { wrapped.onComplete(_ => func(this)); this } - } - } } trait CompletableFuture[T] extends Future[T] { - def completeWithResult(result: T): CompletableFuture[T] - def completeWithException(exception: Throwable): CompletableFuture[T] + def completeWithValue(value: Either[Throwable, T]): CompletableFuture[T] + def completeWithResult(result: T): CompletableFuture[T] = completeWithValue(Right(result)) + def completeWithException(exception: Throwable): CompletableFuture[T] = completeWithValue(Left(exception)) def completeWith(other: Future[T]): CompletableFuture[T] = { - val result = other.result - val exception = other.exception - if (result.isDefined) completeWithResult(result.get) - else if (exception.isDefined) completeWithException(exception.get) - //else TODO how to handle this case? - this + val value = other.value + if (value.isDefined) + completeWithValue(value.get) + else + this } } @@ -211,15 +191,13 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { private val _startTimeInNanos = currentTimeInNanos private val _lock = new ReentrantLock private val _signal = _lock.newCondition - private var _completed: Boolean = _ - private var _result: Option[T] = None - private var _exception: Option[Throwable] = None + private var _value: Option[Either[Throwable, T]] = None private var _listeners: List[Future[T] => Unit] = Nil - def resultWithin(time: Long, unit: TimeUnit): Option[Either[T,Throwable]] = try { + def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = try { _lock.lock var wait = unit.toNanos(time).min(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)) - while (!_completed && wait > 0) { + while (!_value.isDefined && wait > 0) { val start = currentTimeInNanos try { wait = _signal.awaitNanos(wait) @@ -228,10 +206,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { wait = wait - (currentTimeInNanos - start) } } - if(_completed) { - if (_result.isDefined) Some(Left(_result.get)) - else Some(Right(_exception.get)) - } else None + _value } finally { _lock.unlock } @@ -239,7 +214,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { def await = try { _lock.lock var wait = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) - while (!_completed && wait > 0) { + while (!_value.isDefined && wait > 0) { val start = currentTimeInNanos try { wait = _signal.awaitNanos(wait) @@ -256,7 +231,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { def awaitBlocking = try { _lock.lock - while (!_completed) { + while (!_value.isDefined) { _signal.await } this @@ -264,13 +239,6 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { _lock.unlock } - def isCompleted: Boolean = try { - _lock.lock - _completed - } finally { - _lock.unlock - } - def isExpired: Boolean = try { _lock.lock timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) <= 0 @@ -278,47 +246,18 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { _lock.unlock } - def result: Option[T] = try { + def value: Option[Either[Throwable, T]] = try { _lock.lock - _result + _value } finally { _lock.unlock } - def exception: Option[Throwable] = try { - _lock.lock - _exception - } finally { - _lock.unlock - } - - def completeWithResult(result: T): DefaultCompletableFuture[T] = { + def completeWithValue(value: Either[Throwable, T]): DefaultCompletableFuture[T] = { val notifyTheseListeners = try { _lock.lock - if (!_completed) { - _completed = true - _result = Some(result) - val all = _listeners - _listeners = Nil - all - } else Nil - } finally { - _signal.signalAll - _lock.unlock - } - - if (notifyTheseListeners.nonEmpty) - notifyTheseListeners foreach notify - - this - } - - def completeWithException(exception: Throwable): DefaultCompletableFuture[T] = { - val notifyTheseListeners = try { - _lock.lock - if (!_completed) { - _completed = true - _exception = Some(exception) + if (!_value.isDefined) { + _value = Some(value) val all = _listeners _listeners = Nil all @@ -337,7 +276,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { def onComplete(func: Future[T] => Unit): CompletableFuture[T] = { val notifyNow = try { _lock.lock - if (!_completed) { + if (!_value.isDefined) { _listeners ::= func false } diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala index f7906106af..4aa25d3f05 100644 --- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala @@ -117,20 +117,6 @@ class FutureSpec extends JUnitSuite { actor2.stop } - @Test def shouldFutureMapBeDeferred { - val latch = new StandardLatch - val actor1 = actorOf(new TestDelayActor(latch)).start - - val mappedFuture = (actor1.!!![String]("Hello")).map(x => 5) - assert(mappedFuture.isCompleted === false) - assert(mappedFuture.isExpired === false) - latch.open - mappedFuture.await - assert(mappedFuture.isCompleted === true) - assert(mappedFuture.isExpired === false) - assert(mappedFuture.result === Some(5)) - } - @Test def shouldFuturesAwaitMapHandleEmptySequence { assert(Futures.awaitMap[Nothing,Unit](Nil)(x => ()) === Nil) } @@ -211,9 +197,9 @@ class FutureSpec extends JUnitSuite { def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, if(idx >= 5) 5000 else 0 )) } val result = for(f <- futures) yield f.resultWithin(2, TimeUnit.SECONDS) - val done = result collect { case Some(Left(x)) => x } + val done = result collect { case Some(Right(x)) => x } val undone = result collect { case None => None } - val errors = result collect { case Some(Right(t)) => t } + val errors = result collect { case Some(Left(t)) => t } assert(done.size === 5) assert(undone.size === 5) assert(errors.size === 0) From b625b56ee3e08e136cfa7c4f938fef8710d1a2b2 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Fri, 11 Feb 2011 15:12:35 -0700 Subject: [PATCH 2/9] Throw an exception if Future.await is called on an expired and uncompleted Future. Ref #659 --- akka-actor/src/main/scala/akka/dispatch/Future.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 60a16218c7..fa3a1964d4 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -224,6 +224,8 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { wait = wait - (currentTimeInNanos - start) } } + if (!_value.isDefined) + throw new FutureTimeoutException("Futures timed out after [" + timeout + "] milliseconds") this } finally { _lock.unlock From c9449a0f48551f382a14b95a372d89d9a2eff3aa Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Sat, 12 Feb 2011 09:01:15 -0700 Subject: [PATCH 3/9] Rename completeWithValue to complete --- akka-actor/src/main/scala/akka/dispatch/Future.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index fa3a1964d4..5046626a6e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -169,13 +169,13 @@ sealed trait Future[T] { } trait CompletableFuture[T] extends Future[T] { - def completeWithValue(value: Either[Throwable, T]): CompletableFuture[T] - def completeWithResult(result: T): CompletableFuture[T] = completeWithValue(Right(result)) - def completeWithException(exception: Throwable): CompletableFuture[T] = completeWithValue(Left(exception)) + def complete(value: Either[Throwable, T]): CompletableFuture[T] + def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result)) + def completeWithException(exception: Throwable): CompletableFuture[T] = complete(Left(exception)) def completeWith(other: Future[T]): CompletableFuture[T] = { val value = other.value if (value.isDefined) - completeWithValue(value.get) + complete(value.get) else this } @@ -255,7 +255,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { _lock.unlock } - def completeWithValue(value: Either[Throwable, T]): DefaultCompletableFuture[T] = { + def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = { val notifyTheseListeners = try { _lock.lock if (!_value.isDefined) { From 6285ad14d6632ea19427bc05393b811b2dcfc070 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Sat, 12 Feb 2011 09:16:29 -0700 Subject: [PATCH 4/9] move repeated code to it's own method, replace loop with tailrec --- .../src/main/scala/akka/dispatch/Future.scala | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 5046626a6e..0d3bfc9a31 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -194,18 +194,24 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { private var _value: Option[Either[Throwable, T]] = None private var _listeners: List[Future[T] => Unit] = Nil - def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = try { - _lock.lock - var wait = unit.toNanos(time).min(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)) - while (!_value.isDefined && wait > 0) { + @scala.annotation.tailrec + private def awaitUnsafe(wait: Long): Boolean = { + if (!_value.isDefined && wait > 0) { val start = currentTimeInNanos - try { - wait = _signal.awaitNanos(wait) + awaitUnsafe(try { + _signal.awaitNanos(wait) } catch { case e: InterruptedException => - wait = wait - (currentTimeInNanos - start) - } + wait - (currentTimeInNanos - start) + }) + } else { + _value.isDefined } + } + + def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = try { + _lock.lock + awaitUnsafe(unit.toNanos(time).min(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos))) _value } finally { _lock.unlock @@ -213,20 +219,10 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { def await = try { _lock.lock - var wait = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) - while (!_value.isDefined && wait > 0) { - val start = currentTimeInNanos - try { - wait = _signal.awaitNanos(wait) - if (wait <= 0) throw new FutureTimeoutException("Futures timed out after [" + timeout + "] milliseconds") - } catch { - case e: InterruptedException => - wait = wait - (currentTimeInNanos - start) - } - } - if (!_value.isDefined) + if (awaitUnsafe(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos))) + this + else throw new FutureTimeoutException("Futures timed out after [" + timeout + "] milliseconds") - this } finally { _lock.unlock } From 58359e00c63fe707b690bac8ffb49b9564e3c1a3 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Sat, 12 Feb 2011 11:01:08 -0700 Subject: [PATCH 5/9] Add method on Future to await and return the result. Works like resultWithin, but does not need an explicit timeout. --- akka-actor/src/main/scala/akka/dispatch/Future.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 0d3bfc9a31..ab0e064c4a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -146,6 +146,8 @@ sealed trait Future[T] { def result: Option[T] = value flatMap (_.right.toOption) + def awaitResult: Option[Either[Throwable, T]] + /** * Returns the result of the Future if one is available within the specified time, * if the time left on the future is less than the specified time, the time left on the future will be used instead @@ -209,6 +211,14 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { } } + def awaitResult: Option[Either[Throwable, T]] = try { + _lock.lock + awaitUnsafe(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)) + _value + } finally { + _lock.unlock + } + def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = try { _lock.lock awaitUnsafe(unit.toNanos(time).min(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos))) From 94c4546d57baef85f6b3d6d659d9e7a0e669c442 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Sat, 12 Feb 2011 12:26:56 -0700 Subject: [PATCH 6/9] Allow specifying the timeunit of a Future's timeout. The compiler should also no longer store the timeout field since it is not referenced in any methods anymore --- .../src/main/scala/akka/dispatch/Future.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index ab0e064c4a..5fb4755331 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -11,6 +11,7 @@ import akka.routing.Dispatcher import java.util.concurrent.locks.ReentrantLock import akka.japi.Procedure import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit} +import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILLIS} import akka.actor.Actor import annotation.tailrec import java.util.concurrent.atomic. {AtomicBoolean, AtomicInteger} @@ -184,19 +185,20 @@ trait CompletableFuture[T] extends Future[T] { } // Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. -class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { - import TimeUnit.{MILLISECONDS => TIME_UNIT} +class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends CompletableFuture[T] { - def this() = this(0) + def this() = this(0, MILLIS) - val timeoutInNanos = TIME_UNIT.toNanos(timeout) + def this(timeout: Long) = this(timeout, MILLIS) + + val timeoutInNanos = timeunit.toNanos(timeout) private val _startTimeInNanos = currentTimeInNanos private val _lock = new ReentrantLock private val _signal = _lock.newCondition private var _value: Option[Either[Throwable, T]] = None private var _listeners: List[Future[T] => Unit] = Nil - @scala.annotation.tailrec + @tailrec private def awaitUnsafe(wait: Long): Boolean = { if (!_value.isDefined && wait > 0) { val start = currentTimeInNanos @@ -232,7 +234,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { if (awaitUnsafe(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos))) this else - throw new FutureTimeoutException("Futures timed out after [" + timeout + "] milliseconds") + throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds") } finally { _lock.unlock } @@ -304,5 +306,5 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { func(this) } - private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis) + private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) } From 1e09ea8d266b695242200325045c5959c72bfd48 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Sun, 13 Feb 2011 19:59:54 -0700 Subject: [PATCH 7/9] Refactoring based on Viktor's suggestions --- .../src/main/scala/akka/dispatch/Future.scala | 65 ++++++++++--------- 1 file changed, 34 insertions(+), 31 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 5fb4755331..26375bddf1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -137,7 +137,7 @@ sealed trait Future[T] { def awaitBlocking : Future[T] - def isCompleted: Boolean = value.isDefined + final def isCompleted: Boolean = value.isDefined def isExpired: Boolean @@ -145,7 +145,11 @@ sealed trait Future[T] { def value: Option[Either[Throwable, T]] - def result: Option[T] = value flatMap (_.right.toOption) + final def result: Option[T] = { + val v = value + if (v.isDefined) v.get.right.toOption + else None + } def awaitResult: Option[Either[Throwable, T]] @@ -157,30 +161,39 @@ sealed trait Future[T] { */ def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] - def exception: Option[Throwable] = value flatMap (_.left.toOption) + final def exception: Option[Throwable] = { + val v = value + if (v.isDefined) v.get.left.toOption + else None + } def onComplete(func: Future[T] => Unit): Future[T] /** * Returns the current result, throws the exception is one has been raised, else returns None */ - def resultOrException: Option[T] = value map (_.fold(t => throw t, identity)) + final def resultOrException: Option[T] = { + val v = value + if (v.isDefined) { + val r = v.get + if (r.isLeft) throw r.left.get + else r.right.toOption + } else None + } /* Java API */ - def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(proc(_)) + final def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(proc(_)) } trait CompletableFuture[T] extends Future[T] { def complete(value: Either[Throwable, T]): CompletableFuture[T] - def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result)) - def completeWithException(exception: Throwable): CompletableFuture[T] = complete(Left(exception)) - def completeWith(other: Future[T]): CompletableFuture[T] = { - val value = other.value - if (value.isDefined) - complete(value.get) - else - this + final def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result)) + final def completeWithException(exception: Throwable): CompletableFuture[T] = complete(Left(exception)) + final def completeWith(other: Future[T]): CompletableFuture[T] = { + val v = other.value + if (v.isDefined) complete(v.get) + else this } } @@ -200,7 +213,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com @tailrec private def awaitUnsafe(wait: Long): Boolean = { - if (!_value.isDefined && wait > 0) { + if (_value.isEmpty && wait > 0) { val start = currentTimeInNanos awaitUnsafe(try { _signal.awaitNanos(wait) @@ -241,7 +254,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com def awaitBlocking = try { _lock.lock - while (!_value.isDefined) { + while (_value.isEmpty) { _signal.await } this @@ -249,12 +262,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com _lock.unlock } - def isExpired: Boolean = try { - _lock.lock - timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) <= 0 - } finally { - _lock.unlock - } + def isExpired: Boolean = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) <= 0 def value: Option[Either[Throwable, T]] = try { _lock.lock @@ -266,7 +274,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = { val notifyTheseListeners = try { _lock.lock - if (!_value.isDefined) { + if (_value.isEmpty) { _value = Some(value) val all = _listeners _listeners = Nil @@ -284,21 +292,16 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } def onComplete(func: Future[T] => Unit): CompletableFuture[T] = { - val notifyNow = try { + if (try { _lock.lock - if (!_value.isDefined) { + if (_value.isEmpty) { _listeners ::= func false } - else - true + else true } finally { _lock.unlock - } - - if (notifyNow) - notify(func) - + }) notify(func) this } From e7ad2a9d5d56d16a7eb4c54083d0a9fa3a368a05 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Sun, 13 Feb 2011 21:11:37 -0700 Subject: [PATCH 8/9] Add Future.receive(pf: PartialFunction[Any,Unit]), closes #636 --- akka-actor/src/main/scala/akka/dispatch/Future.scala | 11 +++++++++++ .../src/test/scala/akka/dispatch/FutureSpec.scala | 8 ++++++++ 2 files changed, 19 insertions(+) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 26375bddf1..2a5ff54ff5 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -169,6 +169,17 @@ sealed trait Future[T] { def onComplete(func: Future[T] => Unit): Future[T] + /** + * When the future is compeleted, apply the result to the provided PartialFunction if a match is found + */ + final def receive(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f => + val optr = f.result + if (optr.isDefined) { + val r = optr.get + if (pf.isDefinedAt(r)) pf(r) + } + } + /** * Returns the current result, throws the exception is one has been raised, else returns None */ diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala index 4aa25d3f05..d2b442c656 100644 --- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala @@ -204,4 +204,12 @@ class FutureSpec extends JUnitSuite { assert(undone.size === 5) assert(errors.size === 0) } + + @Test def receiveShouldExecuteOnComplete { + val latch = new StandardLatch + val actor = actorOf[TestActor].start + actor !!! "Hello" receive { case "World" => latch.open } + assert(latch.tryAwait(5, TimeUnit.SECONDS)) + actor.stop + } } From ce5ad5eb9769f18fe15fd1b26362ab4587e47773 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 14 Feb 2011 22:23:38 +0100 Subject: [PATCH 9/9] Adding support for PoisonPill --- .../src/main/scala/akka/actor/Actor.scala | 14 ++++++++++ .../scala/akka/actor/actor/ActorRefSpec.scala | 26 +++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index ffea228a74..607f54204b 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -54,6 +54,8 @@ case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage case object ReceiveTimeout extends LifeCycleMessage +case object PoisonPill extends LifeCycleMessage + case class MaximumNumberOfRestartsWithinTimeRangeReached( @BeanProperty val victim: ActorRef, @BeanProperty val maxNrOfRetries: Option[Int], @@ -443,6 +445,12 @@ trait Actor extends Logging { case Unlink(child) => self.unlink(child) case UnlinkAndStop(child) => self.unlink(child); child.stop case Restart(reason) => throw reason + case PoisonPill => if(self.senderFuture.isDefined) { + self.senderFuture.get.completeWithException( + new ActorKilledException("PoisonPill") + ) + } + self.stop case msg if !self.hotswap.isEmpty && self.hotswap.head.isDefinedAt(msg) => self.hotswap.head.apply(msg) case msg if self.hotswap.isEmpty && @@ -461,6 +469,12 @@ trait Actor extends Logging { case Unlink(child) => self.unlink(child) case UnlinkAndStop(child) => self.unlink(child); child.stop case Restart(reason) => throw reason + case PoisonPill => if(self.senderFuture.isDefined) { + self.senderFuture.get.completeWithException( + new ActorKilledException("PoisonPill") + ) + } + self.stop case msg if !self.hotswap.isEmpty && self.hotswap.head.isDefinedAt(msg) => self.hotswap.head.apply(msg) case msg if self.hotswap.isEmpty && diff --git a/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala index b1f715c65c..345bc0457f 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala @@ -11,6 +11,7 @@ import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith import akka.actor._ +import akka.dispatch.Future import java.util.concurrent.{CountDownLatch, TimeUnit} object ActorRefSpec { @@ -97,5 +98,30 @@ class ActorRefSpec extends clientRef.stop serverRef.stop } + + it("should stop when sent a poison pill") { + val ref = Actor.actorOf( + new Actor { + def receive = { + case 5 => self reply_? "five" + case null => self reply_? "null" + } + } + ).start + + val ffive: Future[String] = ref !!! 5 + val fnull: Future[String] = ref !!! null + + intercept[ActorKilledException] { + ref !! PoisonPill + fail("shouldn't get here") + } + + assert(ffive.resultOrException.get == "five") + assert(fnull.resultOrException.get == "null") + + assert(ref.isRunning == false) + assert(ref.isShutdown == true) + } } }