Added "def !!!: Future" to Actor + Futures.* with util methods

This commit is contained in:
Jonas Bonér 2010-02-23 10:05:47 +01:00
parent 4e8611f4ed
commit 3edbc165f6
3 changed files with 160 additions and 13 deletions

View file

@ -90,7 +90,7 @@ object Actor extends Logging {
*/
def actor(body: PartialFunction[Any, Unit]): Actor = new Actor() {
start
def receive = body
def receive: PartialFunction[Any, Unit] = body
}
/**
@ -108,8 +108,8 @@ object Actor extends Logging {
* </pre>
*
*/
def actor[A](body: => Unit) = {
def handler[A](body: => Unit) = new {
def actor(body: => Unit) = {
def handler(body: => Unit) = new {
def receive(handler: PartialFunction[Any, Unit]) = new Actor() {
start
body
@ -536,19 +536,13 @@ trait Actor extends TransactionManagement {
*/
def !![T](message: Any): Option[T] = !![T](message, timeout)
/*
//FIXME 2.8 def !!!(message: Any)(implicit sender: AnyRef = None): FutureResult = {
def !!!(message: Any)(implicit sender: AnyRef): FutureResult = {
def !!!(message: Any): FutureResult = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) {
val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor])
else None
postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, from)
postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None)
} else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
*/
/**
* This method is evil and has been removed. Use '!!' with a timeout instead.

View file

@ -8,10 +8,52 @@
package se.scalablesolutions.akka.dispatch
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import java.util.concurrent.{SynchronousQueue, TimeUnit}
class FutureTimeoutException(message: String) extends RuntimeException(message)
object Futures {
def awaitAll(futures: List[FutureResult]): Unit = futures.foreach(_.await)
def awaitOne(futures: List[FutureResult]): FutureResult = {
var future: Option[FutureResult] = None
do {
future = futures.find(_.isCompleted)
} while (future.isEmpty)
future.get
}
/*
def awaitEither(f1: FutureResult, f2: FutureResult): Option[Any] = {
import Actor.Sender.Self
import Actor.{spawn, actor}
case class Result(res: Option[Any])
val handOff = new SynchronousQueue[Option[Any]]
spawn {
try {
println("f1 await")
f1.await
println("f1 offer")
handOff.offer(f1.result)
} catch {case _ => {}}
}
spawn {
try {
println("f2 await")
f2.await
println("f2 offer")
println("f2 offer: " + f2.result)
handOff.offer(f2.result)
} catch {case _ => {}}
}
Thread.sleep(100)
handOff.take
}
*/
}
sealed trait FutureResult {
def await
def awaitBlocking
@ -46,7 +88,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
var start = currentTimeInNanos
try {
wait = _signal.awaitNanos(wait)
if (wait <= 0) throw new FutureTimeoutException("Future timed out after [" + timeout + "] milliseconds")
if (wait <= 0) throw new FutureTimeoutException("Futures timed out after [" + timeout + "] milliseconds")
} catch {
case e: InterruptedException =>
wait = wait - (currentTimeInNanos - start)

View file

@ -0,0 +1,111 @@
package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Futures
class FutureTest extends JUnitSuite {
class TestActor extends Actor {
def receive = {
case "Hello" =>
reply("World")
case "NoReply" => {}
case "Failure" =>
throw new RuntimeException("expected")
}
}
@Test def shouldActorReplyResultThroughExplicitFuture = {
val actor = new TestActor
actor.start
val future = actor !!! "Hello"
future.await
assert(future.result.isDefined)
assert("World" === future.result.get)
actor.stop
}
@Test def shouldActorReplyExceptionThroughExplicitFuture = {
val actor = new TestActor
actor.start
val future = actor !!! "Failure"
future.await
assert(future.exception.isDefined)
assert("expected" === future.exception.get._2.getMessage)
actor.stop
}
/*
@Test def shouldFutureAwaitEitherLeft = {
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val future1 = actor1 !!! "Hello"
val future2 = actor2 !!! "NoReply"
val result = Futures.awaitEither(future1, future2)
assert(result.isDefined)
assert("World" === result.get)
actor1.stop
actor2.stop
}
@Test def shouldFutureAwaitEitherRight = {
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val future1 = actor1 !!! "NoReply"
val future2 = actor2 !!! "Hello"
val result = Futures.awaitEither(future1, future2)
assert(result.isDefined)
assert("World" === result.get)
actor1.stop
actor2.stop
}
*/
@Test def shouldFutureAwaitOneLeft = {
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val future1 = actor1 !!! "NoReply"
val future2 = actor2 !!! "Hello"
val result = Futures.awaitOne(List(future1, future2))
assert(result.result.isDefined)
assert("World" === result.result.get)
actor1.stop
actor2.stop
}
@Test def shouldFutureAwaitOneRight = {
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val future1 = actor1 !!! "Hello"
val future2 = actor2 !!! "NoReply"
val result = Futures.awaitOne(List(future1, future2))
assert(result.result.isDefined)
assert("World" === result.result.get)
actor1.stop
actor2.stop
}
@Test def shouldFutureAwaitAll = {
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val future1 = actor1 !!! "Hello"
val future2 = actor2 !!! "Hello"
Futures.awaitAll(List(future1, future2))
assert(future1.result.isDefined)
assert("World" === future1.result.get)
assert(future2.result.isDefined)
assert("World" === future2.result.get)
actor1.stop
actor2.stop
}
}