Merge branch 'master' of github.com:jboner/akka into wip-ebedd-tune
This commit is contained in:
commit
86e6ffc321
4 changed files with 140 additions and 143 deletions
|
|
@ -54,6 +54,8 @@ case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage
|
|||
|
||||
case object ReceiveTimeout extends LifeCycleMessage
|
||||
|
||||
case object PoisonPill extends LifeCycleMessage
|
||||
|
||||
case class MaximumNumberOfRestartsWithinTimeRangeReached(
|
||||
@BeanProperty val victim: ActorRef,
|
||||
@BeanProperty val maxNrOfRetries: Option[Int],
|
||||
|
|
@ -443,6 +445,12 @@ trait Actor extends Logging {
|
|||
case Unlink(child) => self.unlink(child)
|
||||
case UnlinkAndStop(child) => self.unlink(child); child.stop
|
||||
case Restart(reason) => throw reason
|
||||
case PoisonPill => if(self.senderFuture.isDefined) {
|
||||
self.senderFuture.get.completeWithException(
|
||||
new ActorKilledException("PoisonPill")
|
||||
)
|
||||
}
|
||||
self.stop
|
||||
case msg if !self.hotswap.isEmpty &&
|
||||
self.hotswap.head.isDefinedAt(msg) => self.hotswap.head.apply(msg)
|
||||
case msg if self.hotswap.isEmpty &&
|
||||
|
|
@ -461,6 +469,12 @@ trait Actor extends Logging {
|
|||
case Unlink(child) => self.unlink(child)
|
||||
case UnlinkAndStop(child) => self.unlink(child); child.stop
|
||||
case Restart(reason) => throw reason
|
||||
case PoisonPill => if(self.senderFuture.isDefined) {
|
||||
self.senderFuture.get.completeWithException(
|
||||
new ActorKilledException("PoisonPill")
|
||||
)
|
||||
}
|
||||
self.stop
|
||||
case msg if !self.hotswap.isEmpty &&
|
||||
self.hotswap.head.isDefinedAt(msg) => self.hotswap.head.apply(msg)
|
||||
case msg if self.hotswap.isEmpty &&
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import akka.routing.Dispatcher
|
|||
import java.util.concurrent.locks.ReentrantLock
|
||||
import akka.japi.Procedure
|
||||
import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit}
|
||||
import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILLIS}
|
||||
import akka.actor.Actor
|
||||
import annotation.tailrec
|
||||
import java.util.concurrent.atomic. {AtomicBoolean, AtomicInteger}
|
||||
|
|
@ -136,127 +137,135 @@ sealed trait Future[T] {
|
|||
|
||||
def awaitBlocking : Future[T]
|
||||
|
||||
def isCompleted: Boolean
|
||||
final def isCompleted: Boolean = value.isDefined
|
||||
|
||||
def isExpired: Boolean
|
||||
|
||||
def timeoutInNanos: Long
|
||||
|
||||
def result: Option[T]
|
||||
def value: Option[Either[Throwable, T]]
|
||||
|
||||
final def result: Option[T] = {
|
||||
val v = value
|
||||
if (v.isDefined) v.get.right.toOption
|
||||
else None
|
||||
}
|
||||
|
||||
def awaitResult: Option[Either[Throwable, T]]
|
||||
|
||||
/**
|
||||
* Returns the result of the Future if one is available within the specified time,
|
||||
* if the time left on the future is less than the specified time, the time left on the future will be used instead
|
||||
* of the specified time.
|
||||
* returns None if no result, Some(Left(t)) if a result, and Some(Right(error)) if there was an exception
|
||||
* returns None if no result, Some(Right(t)) if a result, and Some(Left(error)) if there was an exception
|
||||
*/
|
||||
def resultWithin(time: Long, unit: TimeUnit): Option[Either[T,Throwable]]
|
||||
def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]]
|
||||
|
||||
def exception: Option[Throwable]
|
||||
final def exception: Option[Throwable] = {
|
||||
val v = value
|
||||
if (v.isDefined) v.get.left.toOption
|
||||
else None
|
||||
}
|
||||
|
||||
def onComplete(func: Future[T] => Unit): Future[T]
|
||||
|
||||
/**
|
||||
* When the future is compeleted, apply the result to the provided PartialFunction if a match is found
|
||||
*/
|
||||
final def receive(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f =>
|
||||
val optr = f.result
|
||||
if (optr.isDefined) {
|
||||
val r = optr.get
|
||||
if (pf.isDefinedAt(r)) pf(r)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current result, throws the exception is one has been raised, else returns None
|
||||
*/
|
||||
def resultOrException: Option[T] = resultWithin(0, TimeUnit.MILLISECONDS) match {
|
||||
case None => None
|
||||
case Some(Left(t)) => Some(t)
|
||||
case Some(Right(t)) => throw t
|
||||
final def resultOrException: Option[T] = {
|
||||
val v = value
|
||||
if (v.isDefined) {
|
||||
val r = v.get
|
||||
if (r.isLeft) throw r.left.get
|
||||
else r.right.toOption
|
||||
} else None
|
||||
}
|
||||
|
||||
/* Java API */
|
||||
def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(proc(_))
|
||||
final def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(proc(_))
|
||||
|
||||
def map[O](f: (T) => O): Future[O] = {
|
||||
val wrapped = this
|
||||
new Future[O] {
|
||||
def await = { wrapped.await; this }
|
||||
def awaitBlocking = { wrapped.awaitBlocking; this }
|
||||
def isCompleted = wrapped.isCompleted
|
||||
def isExpired = wrapped.isExpired
|
||||
def timeoutInNanos = wrapped.timeoutInNanos
|
||||
def result: Option[O] = { wrapped.result map f }
|
||||
def exception: Option[Throwable] = wrapped.exception
|
||||
def resultWithin(time: Long, unit: TimeUnit): Option[Either[O,Throwable]] = wrapped.resultWithin(time, unit) match {
|
||||
case None => None
|
||||
case Some(Left(t)) => Some(Left(f(t)))
|
||||
case Some(Right(t)) => Some(Right(t))
|
||||
}
|
||||
def onComplete(func: Future[O] => Unit): Future[O] = { wrapped.onComplete(_ => func(this)); this }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trait CompletableFuture[T] extends Future[T] {
|
||||
def completeWithResult(result: T): CompletableFuture[T]
|
||||
def completeWithException(exception: Throwable): CompletableFuture[T]
|
||||
def completeWith(other: Future[T]): CompletableFuture[T] = {
|
||||
val result = other.result
|
||||
val exception = other.exception
|
||||
if (result.isDefined) completeWithResult(result.get)
|
||||
else if (exception.isDefined) completeWithException(exception.get)
|
||||
//else TODO how to handle this case?
|
||||
this
|
||||
def complete(value: Either[Throwable, T]): CompletableFuture[T]
|
||||
final def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result))
|
||||
final def completeWithException(exception: Throwable): CompletableFuture[T] = complete(Left(exception))
|
||||
final def completeWith(other: Future[T]): CompletableFuture[T] = {
|
||||
val v = other.value
|
||||
if (v.isDefined) complete(v.get)
|
||||
else this
|
||||
}
|
||||
}
|
||||
|
||||
// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
|
||||
class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
|
||||
import TimeUnit.{MILLISECONDS => TIME_UNIT}
|
||||
class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends CompletableFuture[T] {
|
||||
|
||||
def this() = this(0)
|
||||
def this() = this(0, MILLIS)
|
||||
|
||||
val timeoutInNanos = TIME_UNIT.toNanos(timeout)
|
||||
def this(timeout: Long) = this(timeout, MILLIS)
|
||||
|
||||
val timeoutInNanos = timeunit.toNanos(timeout)
|
||||
private val _startTimeInNanos = currentTimeInNanos
|
||||
private val _lock = new ReentrantLock
|
||||
private val _signal = _lock.newCondition
|
||||
private var _completed: Boolean = _
|
||||
private var _result: Option[T] = None
|
||||
private var _exception: Option[Throwable] = None
|
||||
private var _value: Option[Either[Throwable, T]] = None
|
||||
private var _listeners: List[Future[T] => Unit] = Nil
|
||||
|
||||
def resultWithin(time: Long, unit: TimeUnit): Option[Either[T,Throwable]] = try {
|
||||
_lock.lock
|
||||
var wait = unit.toNanos(time).min(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos))
|
||||
while (!_completed && wait > 0) {
|
||||
@tailrec
|
||||
private def awaitUnsafe(wait: Long): Boolean = {
|
||||
if (_value.isEmpty && wait > 0) {
|
||||
val start = currentTimeInNanos
|
||||
try {
|
||||
wait = _signal.awaitNanos(wait)
|
||||
awaitUnsafe(try {
|
||||
_signal.awaitNanos(wait)
|
||||
} catch {
|
||||
case e: InterruptedException =>
|
||||
wait = wait - (currentTimeInNanos - start)
|
||||
}
|
||||
wait - (currentTimeInNanos - start)
|
||||
})
|
||||
} else {
|
||||
_value.isDefined
|
||||
}
|
||||
if(_completed) {
|
||||
if (_result.isDefined) Some(Left(_result.get))
|
||||
else Some(Right(_exception.get))
|
||||
} else None
|
||||
}
|
||||
|
||||
def awaitResult: Option[Either[Throwable, T]] = try {
|
||||
_lock.lock
|
||||
awaitUnsafe(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos))
|
||||
_value
|
||||
} finally {
|
||||
_lock.unlock
|
||||
}
|
||||
|
||||
def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = try {
|
||||
_lock.lock
|
||||
awaitUnsafe(unit.toNanos(time).min(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)))
|
||||
_value
|
||||
} finally {
|
||||
_lock.unlock
|
||||
}
|
||||
|
||||
def await = try {
|
||||
_lock.lock
|
||||
var wait = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
|
||||
while (!_completed && wait > 0) {
|
||||
val start = currentTimeInNanos
|
||||
try {
|
||||
wait = _signal.awaitNanos(wait)
|
||||
if (wait <= 0) throw new FutureTimeoutException("Futures timed out after [" + timeout + "] milliseconds")
|
||||
} catch {
|
||||
case e: InterruptedException =>
|
||||
wait = wait - (currentTimeInNanos - start)
|
||||
}
|
||||
}
|
||||
this
|
||||
if (awaitUnsafe(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)))
|
||||
this
|
||||
else
|
||||
throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds")
|
||||
} finally {
|
||||
_lock.unlock
|
||||
}
|
||||
|
||||
def awaitBlocking = try {
|
||||
_lock.lock
|
||||
while (!_completed) {
|
||||
while (_value.isEmpty) {
|
||||
_signal.await
|
||||
}
|
||||
this
|
||||
|
|
@ -264,61 +273,20 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
|
|||
_lock.unlock
|
||||
}
|
||||
|
||||
def isCompleted: Boolean = try {
|
||||
def isExpired: Boolean = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) <= 0
|
||||
|
||||
def value: Option[Either[Throwable, T]] = try {
|
||||
_lock.lock
|
||||
_completed
|
||||
_value
|
||||
} finally {
|
||||
_lock.unlock
|
||||
}
|
||||
|
||||
def isExpired: Boolean = try {
|
||||
_lock.lock
|
||||
timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) <= 0
|
||||
} finally {
|
||||
_lock.unlock
|
||||
}
|
||||
|
||||
def result: Option[T] = try {
|
||||
_lock.lock
|
||||
_result
|
||||
} finally {
|
||||
_lock.unlock
|
||||
}
|
||||
|
||||
def exception: Option[Throwable] = try {
|
||||
_lock.lock
|
||||
_exception
|
||||
} finally {
|
||||
_lock.unlock
|
||||
}
|
||||
|
||||
def completeWithResult(result: T): DefaultCompletableFuture[T] = {
|
||||
def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = {
|
||||
val notifyTheseListeners = try {
|
||||
_lock.lock
|
||||
if (!_completed) {
|
||||
_completed = true
|
||||
_result = Some(result)
|
||||
val all = _listeners
|
||||
_listeners = Nil
|
||||
all
|
||||
} else Nil
|
||||
} finally {
|
||||
_signal.signalAll
|
||||
_lock.unlock
|
||||
}
|
||||
|
||||
if (notifyTheseListeners.nonEmpty)
|
||||
notifyTheseListeners foreach notify
|
||||
|
||||
this
|
||||
}
|
||||
|
||||
def completeWithException(exception: Throwable): DefaultCompletableFuture[T] = {
|
||||
val notifyTheseListeners = try {
|
||||
_lock.lock
|
||||
if (!_completed) {
|
||||
_completed = true
|
||||
_exception = Some(exception)
|
||||
if (_value.isEmpty) {
|
||||
_value = Some(value)
|
||||
val all = _listeners
|
||||
_listeners = Nil
|
||||
all
|
||||
|
|
@ -335,21 +303,16 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
|
|||
}
|
||||
|
||||
def onComplete(func: Future[T] => Unit): CompletableFuture[T] = {
|
||||
val notifyNow = try {
|
||||
if (try {
|
||||
_lock.lock
|
||||
if (!_completed) {
|
||||
if (_value.isEmpty) {
|
||||
_listeners ::= func
|
||||
false
|
||||
}
|
||||
else
|
||||
true
|
||||
else true
|
||||
} finally {
|
||||
_lock.unlock
|
||||
}
|
||||
|
||||
if (notifyNow)
|
||||
notify(func)
|
||||
|
||||
}) notify(func)
|
||||
this
|
||||
}
|
||||
|
||||
|
|
@ -357,5 +320,5 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
|
|||
func(this)
|
||||
}
|
||||
|
||||
private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis)
|
||||
private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import org.scalatest.junit.JUnitRunner
|
|||
import org.junit.runner.RunWith
|
||||
|
||||
import akka.actor._
|
||||
import akka.dispatch.Future
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
|
||||
object ActorRefSpec {
|
||||
|
|
@ -97,5 +98,30 @@ class ActorRefSpec extends
|
|||
clientRef.stop
|
||||
serverRef.stop
|
||||
}
|
||||
|
||||
it("should stop when sent a poison pill") {
|
||||
val ref = Actor.actorOf(
|
||||
new Actor {
|
||||
def receive = {
|
||||
case 5 => self reply_? "five"
|
||||
case null => self reply_? "null"
|
||||
}
|
||||
}
|
||||
).start
|
||||
|
||||
val ffive: Future[String] = ref !!! 5
|
||||
val fnull: Future[String] = ref !!! null
|
||||
|
||||
intercept[ActorKilledException] {
|
||||
ref !! PoisonPill
|
||||
fail("shouldn't get here")
|
||||
}
|
||||
|
||||
assert(ffive.resultOrException.get == "five")
|
||||
assert(fnull.resultOrException.get == "null")
|
||||
|
||||
assert(ref.isRunning == false)
|
||||
assert(ref.isShutdown == true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -117,20 +117,6 @@ class FutureSpec extends JUnitSuite {
|
|||
actor2.stop
|
||||
}
|
||||
|
||||
@Test def shouldFutureMapBeDeferred {
|
||||
val latch = new StandardLatch
|
||||
val actor1 = actorOf(new TestDelayActor(latch)).start
|
||||
|
||||
val mappedFuture = (actor1.!!).map(x => 5)
|
||||
assert(mappedFuture.isCompleted === false)
|
||||
assert(mappedFuture.isExpired === false)
|
||||
latch.open
|
||||
mappedFuture.await
|
||||
assert(mappedFuture.isCompleted === true)
|
||||
assert(mappedFuture.isExpired === false)
|
||||
assert(mappedFuture.result === Some(5))
|
||||
}
|
||||
|
||||
@Test def shouldFuturesAwaitMapHandleEmptySequence {
|
||||
assert(Futures.awaitMap[Nothing,Unit](Nil)(x => ()) === Nil)
|
||||
}
|
||||
|
|
@ -211,11 +197,19 @@ class FutureSpec extends JUnitSuite {
|
|||
|
||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!! 5000 else 0 )) }
|
||||
val result = for(f <- futures) yield f.resultWithin(2, TimeUnit.SECONDS)
|
||||
val done = result collect { case Some(Left(x)) => x }
|
||||
val done = result collect { case Some(Right(x)) => x }
|
||||
val undone = result collect { case None => None }
|
||||
val errors = result collect { case Some(Right(t)) => t }
|
||||
val errors = result collect { case Some(Left(t)) => t }
|
||||
assert(done.size === 5)
|
||||
assert(undone.size === 5)
|
||||
assert(errors.size === 0)
|
||||
}
|
||||
|
||||
@Test def receiveShouldExecuteOnComplete {
|
||||
val latch = new StandardLatch
|
||||
val actor = actorOf[TestActor].start
|
||||
actor !!! "Hello" receive { case "World" => latch.open }
|
||||
assert(latch.tryAwait(5, TimeUnit.SECONDS))
|
||||
actor.stop
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue