Replacing !!! with ?
This commit is contained in:
parent
fd5afde4ff
commit
fa0478bc32
26 changed files with 102 additions and 100 deletions
|
|
@ -310,8 +310,8 @@ class ActorRefSpec extends WordSpec with MustMatchers {
|
||||||
}
|
}
|
||||||
}).start()
|
}).start()
|
||||||
|
|
||||||
val ffive: Future[String] = ref !!! 5
|
val ffive: Future[String] = ref ? 5
|
||||||
val fnull: Future[String] = ref !!! null
|
val fnull: Future[String] = ref ? null
|
||||||
|
|
||||||
intercept[ActorKilledException] {
|
intercept[ActorKilledException] {
|
||||||
ref !! PoisonPill
|
ref !! PoisonPill
|
||||||
|
|
|
||||||
|
|
@ -348,9 +348,9 @@ abstract class ActorModelSpec extends JUnitSuite {
|
||||||
implicit val dispatcher = newInterceptedDispatcher
|
implicit val dispatcher = newInterceptedDispatcher
|
||||||
val a = newTestActor.start()
|
val a = newTestActor.start()
|
||||||
dispatcher.suspend(a)
|
dispatcher.suspend(a)
|
||||||
val f1: Future[String] = a !!! Reply("foo")
|
val f1: Future[String] = a ? Reply("foo")
|
||||||
val stopped = a !!! PoisonPill
|
val stopped = a ? PoisonPill
|
||||||
val shouldBeCompleted = for (i ← 1 to 10) yield a !!! Reply(i)
|
val shouldBeCompleted = for (i ← 1 to 10) yield a ? Reply(i)
|
||||||
dispatcher.resume(a)
|
dispatcher.resume(a)
|
||||||
assert(f1.get === "foo")
|
assert(f1.get === "foo")
|
||||||
stopped.await
|
stopped.await
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,7 @@ class FutureSpec extends JUnitSuite {
|
||||||
def shouldActorReplyResultThroughExplicitFuture {
|
def shouldActorReplyResultThroughExplicitFuture {
|
||||||
val actor = actorOf[TestActor]
|
val actor = actorOf[TestActor]
|
||||||
actor.start()
|
actor.start()
|
||||||
val future = actor !!! "Hello"
|
val future = actor ? "Hello"
|
||||||
future.await
|
future.await
|
||||||
assert(future.result.isDefined)
|
assert(future.result.isDefined)
|
||||||
assert("World" === future.result.get)
|
assert("World" === future.result.get)
|
||||||
|
|
@ -51,7 +51,7 @@ class FutureSpec extends JUnitSuite {
|
||||||
def shouldActorReplyExceptionThroughExplicitFuture {
|
def shouldActorReplyExceptionThroughExplicitFuture {
|
||||||
val actor = actorOf[TestActor]
|
val actor = actorOf[TestActor]
|
||||||
actor.start()
|
actor.start()
|
||||||
val future = actor !!! "Failure"
|
val future = actor ? "Failure"
|
||||||
future.await
|
future.await
|
||||||
assert(future.exception.isDefined)
|
assert(future.exception.isDefined)
|
||||||
assert("Expected exception; to test fault-tolerance" === future.exception.get.getMessage)
|
assert("Expected exception; to test fault-tolerance" === future.exception.get.getMessage)
|
||||||
|
|
@ -62,9 +62,9 @@ class FutureSpec extends JUnitSuite {
|
||||||
def shouldFutureCompose {
|
def shouldFutureCompose {
|
||||||
val actor1 = actorOf[TestActor].start()
|
val actor1 = actorOf[TestActor].start()
|
||||||
val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ self reply s.toUpperCase } }).start()
|
val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ self reply s.toUpperCase } }).start()
|
||||||
val future1 = actor1 !!! "Hello" flatMap ((s: String) ⇒ actor2 !!! s)
|
val future1 = actor1 ? "Hello" flatMap ((s: String) ⇒ actor2 ? s)
|
||||||
val future2 = actor1 !!! "Hello" flatMap (actor2 !!! (_: String))
|
val future2 = actor1 ? "Hello" flatMap (actor2 ? (_: String))
|
||||||
val future3 = actor1 !!! "Hello" flatMap (actor2 !!! (_: Int))
|
val future3 = actor1 ? "Hello" flatMap (actor2 ? (_: Int))
|
||||||
assert((future1.get: Any) === "WORLD")
|
assert((future1.get: Any) === "WORLD")
|
||||||
assert((future2.get: Any) === "WORLD")
|
assert((future2.get: Any) === "WORLD")
|
||||||
intercept[ClassCastException] { future3.get }
|
intercept[ClassCastException] { future3.get }
|
||||||
|
|
@ -76,8 +76,8 @@ class FutureSpec extends JUnitSuite {
|
||||||
def shouldFutureComposePatternMatch {
|
def shouldFutureComposePatternMatch {
|
||||||
val actor1 = actorOf[TestActor].start()
|
val actor1 = actorOf[TestActor].start()
|
||||||
val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ self reply s.toUpperCase } }).start()
|
val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ self reply s.toUpperCase } }).start()
|
||||||
val future1 = actor1 !!! "Hello" collect { case (s: String) ⇒ s } flatMap (actor2 !!! _)
|
val future1 = actor1 ? "Hello" collect { case (s: String) ⇒ s } flatMap (actor2 ? _)
|
||||||
val future2 = actor1 !!! "Hello" collect { case (n: Int) ⇒ n } flatMap (actor2 !!! _)
|
val future2 = actor1 ? "Hello" collect { case (n: Int) ⇒ n } flatMap (actor2 ? _)
|
||||||
assert((future1.get: Any) === "WORLD")
|
assert((future1.get: Any) === "WORLD")
|
||||||
intercept[MatchError] { future2.get }
|
intercept[MatchError] { future2.get }
|
||||||
actor1.stop()
|
actor1.stop()
|
||||||
|
|
@ -93,18 +93,18 @@ class FutureSpec extends JUnitSuite {
|
||||||
}
|
}
|
||||||
}).start()
|
}).start()
|
||||||
|
|
||||||
val future0 = actor !!! "Hello"
|
val future0 = actor ? "Hello"
|
||||||
|
|
||||||
val future1 = for {
|
val future1 = for {
|
||||||
a: Int ← future0 // returns 5
|
a: Int ← future0 // returns 5
|
||||||
b: String ← actor !!! a // returns "10"
|
b: String ← actor ? a // returns "10"
|
||||||
c: String ← actor !!! 7 // returns "14"
|
c: String ← actor ? 7 // returns "14"
|
||||||
} yield b + "-" + c
|
} yield b + "-" + c
|
||||||
|
|
||||||
val future2 = for {
|
val future2 = for {
|
||||||
a: Int ← future0
|
a: Int ← future0
|
||||||
b: Int ← actor !!! a
|
b: Int ← actor ? a
|
||||||
c: String ← actor !!! 7
|
c: String ← actor ? 7
|
||||||
} yield b + "-" + c
|
} yield b + "-" + c
|
||||||
|
|
||||||
assert(future1.get === "10-14")
|
assert(future1.get === "10-14")
|
||||||
|
|
@ -124,15 +124,15 @@ class FutureSpec extends JUnitSuite {
|
||||||
}).start()
|
}).start()
|
||||||
|
|
||||||
val future1 = for {
|
val future1 = for {
|
||||||
Res(a: Int) ← actor.!!![Res[Int]](Req("Hello"))
|
Res(a: Int) ← actor.?[Res[Int]](Req("Hello"))
|
||||||
Res(b: String) ← actor.!!![Res[String]](Req(a))
|
Res(b: String) ← actor.?[Res[String]](Req(a))
|
||||||
Res(c: String) ← actor.!!![Res[String]](Req(7))
|
Res(c: String) ← actor.?[Res[String]](Req(7))
|
||||||
} yield b + "-" + c
|
} yield b + "-" + c
|
||||||
|
|
||||||
val future2 = for {
|
val future2 = for {
|
||||||
Res(a: Int) ← actor.!!)
|
Res(a: Int) ← actor.?[Any](Req("Hello"))
|
||||||
Res(b: Int) ← actor.!!![Res[Int]](Req(a))
|
Res(b: Int) ← actor.?[Res[Int]](Req(a))
|
||||||
Res(c: Int) ← actor.!!![Res[Int]](Req(7))
|
Res(c: Int) ← actor.?[Res[Int]](Req(7))
|
||||||
} yield b + "-" + c
|
} yield b + "-" + c
|
||||||
|
|
||||||
assert(future1.get === "10-14")
|
assert(future1.get === "10-14")
|
||||||
|
|
@ -162,14 +162,14 @@ class FutureSpec extends JUnitSuite {
|
||||||
|
|
||||||
val actor = actorOf[TestActor].start()
|
val actor = actorOf[TestActor].start()
|
||||||
|
|
||||||
val future8 = actor !!! "Failure"
|
val future8 = actor ? "Failure"
|
||||||
val future9 = actor !!! "Failure" recover {
|
val future9 = actor ? "Failure" recover {
|
||||||
case e: RuntimeException ⇒ "FAIL!"
|
case e: RuntimeException ⇒ "FAIL!"
|
||||||
}
|
}
|
||||||
val future10 = actor !!! "Hello" recover {
|
val future10 = actor ? "Hello" recover {
|
||||||
case e: RuntimeException ⇒ "FAIL!"
|
case e: RuntimeException ⇒ "FAIL!"
|
||||||
}
|
}
|
||||||
val future11 = actor !!! "Failure" recover { case _ ⇒ "Oops!" }
|
val future11 = actor ? "Failure" recover { case _ ⇒ "Oops!" }
|
||||||
|
|
||||||
assert(future1.get === 5)
|
assert(future1.get === 5)
|
||||||
intercept[ArithmeticException] { future2.get }
|
intercept[ArithmeticException] { future2.get }
|
||||||
|
|
@ -194,7 +194,7 @@ class FutureSpec extends JUnitSuite {
|
||||||
}).start()
|
}).start()
|
||||||
}
|
}
|
||||||
val timeout = 10000
|
val timeout = 10000
|
||||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.!!, timeout) }
|
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?[Int]((idx, idx * 200), timeout) }
|
||||||
assert(Futures.fold(0, timeout)(futures)(_ + _).await.result.get === 45)
|
assert(Futures.fold(0, timeout)(futures)(_ + _).await.result.get === 45)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -205,7 +205,7 @@ class FutureSpec extends JUnitSuite {
|
||||||
def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); self reply_? add }
|
def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); self reply_? add }
|
||||||
}).start()
|
}).start()
|
||||||
}
|
}
|
||||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.!!, 10000) }
|
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?[Int]((idx, idx * 200), 10000) }
|
||||||
assert(futures.foldLeft(Future(0))((fr, fa) ⇒ for (r ← fr; a ← fa) yield (r + a)).get === 45)
|
assert(futures.foldLeft(Future(0))((fr, fa) ⇒ for (r ← fr; a ← fa) yield (r + a)).get === 45)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -222,7 +222,7 @@ class FutureSpec extends JUnitSuite {
|
||||||
}).start()
|
}).start()
|
||||||
}
|
}
|
||||||
val timeout = 10000
|
val timeout = 10000
|
||||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.!!, timeout) }
|
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?[Int]((idx, idx * 100), timeout) }
|
||||||
assert(Futures.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected")
|
assert(Futures.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -239,7 +239,7 @@ class FutureSpec extends JUnitSuite {
|
||||||
}).start()
|
}).start()
|
||||||
}
|
}
|
||||||
val timeout = 10000
|
val timeout = 10000
|
||||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.!!, timeout) }
|
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?[Int]((idx, idx * 200), timeout) }
|
||||||
assert(Futures.reduce(futures, timeout)(_ + _).get === 45)
|
assert(Futures.reduce(futures, timeout)(_ + _).get === 45)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -256,7 +256,7 @@ class FutureSpec extends JUnitSuite {
|
||||||
}).start()
|
}).start()
|
||||||
}
|
}
|
||||||
val timeout = 10000
|
val timeout = 10000
|
||||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.!!, timeout) }
|
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?[Int]((idx, idx * 100), timeout) }
|
||||||
assert(Futures.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected")
|
assert(Futures.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -269,7 +269,7 @@ class FutureSpec extends JUnitSuite {
|
||||||
def receiveShouldExecuteOnComplete {
|
def receiveShouldExecuteOnComplete {
|
||||||
val latch = new StandardLatch
|
val latch = new StandardLatch
|
||||||
val actor = actorOf[TestActor].start()
|
val actor = actorOf[TestActor].start()
|
||||||
actor !!! "Hello" onResult { case "World" ⇒ latch.open }
|
actor ? "Hello" onResult { case "World" ⇒ latch.open }
|
||||||
assert(latch.tryAwait(5, TimeUnit.SECONDS))
|
assert(latch.tryAwait(5, TimeUnit.SECONDS))
|
||||||
actor.stop()
|
actor.stop()
|
||||||
}
|
}
|
||||||
|
|
@ -285,7 +285,7 @@ class FutureSpec extends JUnitSuite {
|
||||||
}
|
}
|
||||||
}).start()
|
}).start()
|
||||||
|
|
||||||
val oddFutures: List[Future[Int]] = List.fill(100)(oddActor !!! 'GetNext)
|
val oddFutures: List[Future[Int]] = List.fill(100)(oddActor ? 'GetNext)
|
||||||
assert(Future.sequence(oddFutures).get.sum === 10000)
|
assert(Future.sequence(oddFutures).get.sum === 10000)
|
||||||
oddActor.stop()
|
oddActor.stop()
|
||||||
|
|
||||||
|
|
@ -342,7 +342,7 @@ class FutureSpec extends JUnitSuite {
|
||||||
val actor = actorOf[TestActor].start
|
val actor = actorOf[TestActor].start
|
||||||
|
|
||||||
val x = Future("Hello")
|
val x = Future("Hello")
|
||||||
val y = x flatMap (actor !!! _)
|
val y = x flatMap (actor ? _)
|
||||||
|
|
||||||
val r = flow(x() + " " + y[String]() + "!")
|
val r = flow(x() + " " + y[String]() + "!")
|
||||||
|
|
||||||
|
|
@ -370,7 +370,7 @@ class FutureSpec extends JUnitSuite {
|
||||||
val actor = actorOf[TestActor].start
|
val actor = actorOf[TestActor].start
|
||||||
|
|
||||||
val x = Future(3)
|
val x = Future(3)
|
||||||
val y = actor !!! "Hello"
|
val y = actor ? "Hello"
|
||||||
|
|
||||||
val r = flow(x() + y[Int](), 100)
|
val r = flow(x() + y[Int](), 100)
|
||||||
|
|
||||||
|
|
@ -384,7 +384,7 @@ class FutureSpec extends JUnitSuite {
|
||||||
val actor = actorOf[TestActor].start
|
val actor = actorOf[TestActor].start
|
||||||
|
|
||||||
val x = Future("Hello")
|
val x = Future("Hello")
|
||||||
val y = actor !!! "Hello"
|
val y = actor ? "Hello"
|
||||||
|
|
||||||
val r = flow(x() + y())
|
val r = flow(x() + y())
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ class PriorityDispatcherSpec extends WordSpec with MustMatchers {
|
||||||
|
|
||||||
dispatcher.resume(actor) //Signal the actor to start treating it's message backlog
|
dispatcher.resume(actor) //Signal the actor to start treating it's message backlog
|
||||||
|
|
||||||
actor.!!![List[Int]]('Result).await.result.get must be === (msgs.reverse)
|
actor.?[List[Int]]('Result).await.result.get must be === (msgs.reverse)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -89,7 +89,7 @@ class ActorRegistrySpec extends JUnitSuite {
|
||||||
val actor2 = actorOf[TestActor]("test-actor-2").start
|
val actor2 = actorOf[TestActor]("test-actor-2").start
|
||||||
val results = new ConcurrentLinkedQueue[Future[String]]
|
val results = new ConcurrentLinkedQueue[Future[String]]
|
||||||
|
|
||||||
Actor.registry.local.foreach(actor ⇒ results.add(actor.!!))
|
Actor.registry.local.foreach(actor ⇒ results.add(actor.?[String]("ping")))
|
||||||
|
|
||||||
assert(results.size === 2)
|
assert(results.size === 2)
|
||||||
val i = results.iterator
|
val i = results.iterator
|
||||||
|
|
|
||||||
|
|
@ -237,7 +237,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
|
||||||
}).start()
|
}).start()
|
||||||
|
|
||||||
try {
|
try {
|
||||||
(for (count ← 1 to 500) yield pool.!!) foreach {
|
(for (count ← 1 to 500) yield pool.?[String]("Test", 20000)) foreach {
|
||||||
_.await.resultOrException.get must be("Response")
|
_.await.resultOrException.get must be("Response")
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
@ -283,7 +283,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
|
||||||
latch = TestLatch(loops)
|
latch = TestLatch(loops)
|
||||||
count.set(0)
|
count.set(0)
|
||||||
for (m ← 0 until loops) {
|
for (m ← 0 until loops) {
|
||||||
pool !!! t
|
pool ? t
|
||||||
sleepFor(50 millis)
|
sleepFor(50 millis)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ import org.scalatest.matchers.MustMatchers
|
||||||
|
|
||||||
class Ticket703Spec extends WordSpec with MustMatchers {
|
class Ticket703Spec extends WordSpec with MustMatchers {
|
||||||
|
|
||||||
"A !!! call to an actor pool" should {
|
"A ? call to an actor pool" should {
|
||||||
"reuse the proper timeout" in {
|
"reuse the proper timeout" in {
|
||||||
val actorPool = actorOf(
|
val actorPool = actorOf(
|
||||||
new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
|
new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
|
||||||
|
|
@ -28,7 +28,7 @@ class Ticket703Spec extends WordSpec with MustMatchers {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}).start()
|
}).start()
|
||||||
(actorPool.!!).await.result must be === Some("Response")
|
(actorPool.?[String]("Ping", 7000)).await.result must be === Some("Response")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -333,7 +333,7 @@ object Actor extends ListenerManagement {
|
||||||
* This means that the following code is equivalent:
|
* This means that the following code is equivalent:
|
||||||
* (actor !! "foo").as[Int] (Deprecated)
|
* (actor !! "foo").as[Int] (Deprecated)
|
||||||
* and
|
* and
|
||||||
* (actor !!! "foo").as[Int] (Recommended)
|
* (actor ? "foo").as[Int] (Recommended)
|
||||||
*/
|
*/
|
||||||
implicit def futureToAnyOptionAsTypedOption(anyFuture: Future[_]) = new AnyOptionAsTypedOption({
|
implicit def futureToAnyOptionAsTypedOption(anyFuture: Future[_]) = new AnyOptionAsTypedOption({
|
||||||
try { anyFuture.await } catch { case t: FutureTimeoutException ⇒ }
|
try { anyFuture.await } catch { case t: FutureTimeoutException ⇒ }
|
||||||
|
|
@ -475,7 +475,7 @@ object Actor extends ListenerManagement {
|
||||||
*
|
*
|
||||||
* <p/>
|
* <p/>
|
||||||
* Here you find functions like:
|
* Here you find functions like:
|
||||||
* - !, !!, !!! and forward
|
* - !, !!, ? and forward
|
||||||
* - link, unlink, startLink etc
|
* - link, unlink, startLink etc
|
||||||
* - start, stop
|
* - start, stop
|
||||||
* - etc.
|
* - etc.
|
||||||
|
|
@ -543,7 +543,7 @@ trait Actor {
|
||||||
* Option[ActorRef] representation of the 'self' ActorRef reference.
|
* Option[ActorRef] representation of the 'self' ActorRef reference.
|
||||||
* <p/>
|
* <p/>
|
||||||
* Mainly for internal use, functions as the implicit sender references when invoking
|
* Mainly for internal use, functions as the implicit sender references when invoking
|
||||||
* one of the message send functions ('!', '!!' and '!!!').
|
* one of the message send functions ('!', '!!' and '?').
|
||||||
*/
|
*/
|
||||||
implicit def optionSelf: Option[ActorRef] = someSelf
|
implicit def optionSelf: Option[ActorRef] = someSelf
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -101,8 +101,8 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S
|
||||||
/**
|
/**
|
||||||
* User overridable callback/setting.
|
* User overridable callback/setting.
|
||||||
* <p/>
|
* <p/>
|
||||||
* Defines the default timeout for '!!' and '!!!' invocations,
|
* Defines the default timeout for '!!' and '?' invocations,
|
||||||
* e.g. the timeout for the future returned by the call to '!!' and '!!!'.
|
* e.g. the timeout for the future returned by the call to '!!' and '?'.
|
||||||
*/
|
*/
|
||||||
@deprecated("Will be replaced by implicit-scoped timeout on all methods that needs it, will default to timeout specified in config", "1.1")
|
@deprecated("Will be replaced by implicit-scoped timeout on all methods that needs it, will default to timeout specified in config", "1.1")
|
||||||
@BeanProperty
|
@BeanProperty
|
||||||
|
|
@ -210,7 +210,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S
|
||||||
/**
|
/**
|
||||||
* Akka Java API. <p/>
|
* Akka Java API. <p/>
|
||||||
* The reference sender future of the last received message.
|
* The reference sender future of the last received message.
|
||||||
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
|
* Is defined if the message was sent with sent with '!!' or '?', else None.
|
||||||
*/
|
*/
|
||||||
def getSenderFuture: Option[Promise[Any]] = senderFuture
|
def getSenderFuture: Option[Promise[Any]] = senderFuture
|
||||||
|
|
||||||
|
|
@ -297,7 +297,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S
|
||||||
* If you are sending messages using <code>ask</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
|
* If you are sending messages using <code>ask</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
|
||||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||||
*/
|
*/
|
||||||
def ask[T <: AnyRef](message: AnyRef, timeout: Long, sender: ActorRef): Future[T] = !!!(message, timeout)(Option(sender)).asInstanceOf[Future[T]]
|
def ask[T <: AnyRef](message: AnyRef, timeout: Long, sender: ActorRef): Future[T] = ?(message, timeout)(Option(sender)).asInstanceOf[Future[T]]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Akka Java API. <p/>
|
* Akka Java API. <p/>
|
||||||
|
|
@ -1148,7 +1148,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The reference sender future of the last received message.
|
* The reference sender future of the last received message.
|
||||||
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
|
* Is defined if the message was sent with sent with '!!' or '?', else None.
|
||||||
*/
|
*/
|
||||||
def senderFuture(): Option[Promise[Any]] = {
|
def senderFuture(): Option[Promise[Any]] = {
|
||||||
val msg = currentMessage
|
val msg = currentMessage
|
||||||
|
|
@ -1199,14 +1199,16 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends a message asynchronously returns a future holding the eventual reply message.
|
* Sends a message asynchronously returns a future holding the eventual reply message.
|
||||||
|
* Synonymous to: ask
|
||||||
|
* Pronounced: "ask"
|
||||||
* <p/>
|
* <p/>
|
||||||
* <b>NOTE:</b>
|
* <b>NOTE:</b>
|
||||||
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
|
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
|
||||||
* implement request/response message exchanges.
|
* implement request/response message exchanges.
|
||||||
* If you are sending messages using <code>!!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
|
* If you are sending messages using <code>?</code> then you <b>have to</b> use <code>self.reply(..)</code>
|
||||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||||
*/
|
*/
|
||||||
def !!(implicit sender: Option[ActorRef] = None): Future[T] = {
|
def ?[T](message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Future[T] = {
|
||||||
if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None)
|
if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None)
|
||||||
else throw new ActorInitializationException(
|
else throw new ActorInitializationException(
|
||||||
"Actor has not been started, you need to invoke 'actor.start()' before using it")
|
"Actor has not been started, you need to invoke 'actor.start()' before using it")
|
||||||
|
|
@ -1215,7 +1217,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒
|
||||||
/**
|
/**
|
||||||
* Forwards the message and passes the original sender actor as the sender.
|
* Forwards the message and passes the original sender actor as the sender.
|
||||||
* <p/>
|
* <p/>
|
||||||
* Works with '!', '!!' and '!!!'.
|
* Works with '!', '!!' and '?'.
|
||||||
*/
|
*/
|
||||||
def forward(message: Any)(implicit sender: Some[ActorRef]) = {
|
def forward(message: Any)(implicit sender: Some[ActorRef]) = {
|
||||||
if (isRunning) {
|
if (isRunning) {
|
||||||
|
|
|
||||||
|
|
@ -46,14 +46,14 @@ object TypedActor {
|
||||||
actor ! m
|
actor ! m
|
||||||
null
|
null
|
||||||
case m if m.returnsFuture_? ⇒
|
case m if m.returnsFuture_? ⇒
|
||||||
actor !!! m
|
actor ? m
|
||||||
case m if m.returnsJOption_? || m.returnsOption_? ⇒
|
case m if m.returnsJOption_? || m.returnsOption_? ⇒
|
||||||
(actor !!! m).as[AnyRef] match {
|
(actor ? m).as[AnyRef] match {
|
||||||
case Some(null) | None ⇒ if (m.returnsJOption_?) JOption.none[Any] else None
|
case Some(null) | None ⇒ if (m.returnsJOption_?) JOption.none[Any] else None
|
||||||
case Some(joption) ⇒ joption
|
case Some(joption) ⇒ joption
|
||||||
}
|
}
|
||||||
case m ⇒
|
case m ⇒
|
||||||
(actor !!! m).get
|
(actor ? m).get
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -278,7 +278,7 @@ sealed trait Future[+T] {
|
||||||
* continuation until the result is available.
|
* continuation until the result is available.
|
||||||
*
|
*
|
||||||
* If this Future is untyped (a Future[Nothing]), a type parameter must be explicitly provided or
|
* If this Future is untyped (a Future[Nothing]), a type parameter must be explicitly provided or
|
||||||
* execution will fail. The normal result of getting a Future from an ActorRef using !!! will return
|
* execution will fail. The normal result of getting a Future from an ActorRef using ? will return
|
||||||
* an untyped Future.
|
* an untyped Future.
|
||||||
*/
|
*/
|
||||||
def apply[A >: T](): A @cps[Future[Any]] = shift(this flatMap (_: A ⇒ Future[Any]))
|
def apply[A >: T](): A @cps[Future[Any]] = shift(this flatMap (_: A ⇒ Future[Any]))
|
||||||
|
|
@ -403,9 +403,9 @@ sealed trait Future[+T] {
|
||||||
* Example:
|
* Example:
|
||||||
* <pre>
|
* <pre>
|
||||||
* val future1 = for {
|
* val future1 = for {
|
||||||
* a <- actor !!! Req("Hello") collect { case Res(x: Int) => x }
|
* a <- actor ? Req("Hello") collect { case Res(x: Int) => x }
|
||||||
* b <- actor !!! Req(a) collect { case Res(x: String) => x }
|
* b <- actor ? Req(a) collect { case Res(x: String) => x }
|
||||||
* c <- actor !!! Req(7) collect { case Res(x: String) => x }
|
* c <- actor ? Req(7) collect { case Res(x: String) => x }
|
||||||
* } yield b + "-" + c
|
* } yield b + "-" + c
|
||||||
* </pre>
|
* </pre>
|
||||||
*/
|
*/
|
||||||
|
|
@ -468,9 +468,9 @@ sealed trait Future[+T] {
|
||||||
* Example:
|
* Example:
|
||||||
* <pre>
|
* <pre>
|
||||||
* val future1 = for {
|
* val future1 = for {
|
||||||
* a: Int <- actor !!! "Hello" // returns 5
|
* a: Int <- actor ? "Hello" // returns 5
|
||||||
* b: String <- actor !!! a // returns "10"
|
* b: String <- actor ? a // returns "10"
|
||||||
* c: String <- actor !!! 7 // returns "14"
|
* c: String <- actor ? 7 // returns "14"
|
||||||
* } yield b + "-" + c
|
* } yield b + "-" + c
|
||||||
* </pre>
|
* </pre>
|
||||||
*/
|
*/
|
||||||
|
|
@ -504,9 +504,9 @@ sealed trait Future[+T] {
|
||||||
* Example:
|
* Example:
|
||||||
* <pre>
|
* <pre>
|
||||||
* val future1 = for {
|
* val future1 = for {
|
||||||
* a: Int <- actor !!! "Hello" // returns 5
|
* a: Int <- actor ? "Hello" // returns 5
|
||||||
* b: String <- actor !!! a // returns "10"
|
* b: String <- actor ? a // returns "10"
|
||||||
* c: String <- actor !!! 7 // returns "14"
|
* c: String <- actor ? 7 // returns "14"
|
||||||
* } yield b + "-" + c
|
* } yield b + "-" + c
|
||||||
* </pre>
|
* </pre>
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -1056,7 +1056,7 @@ class DefaultClusterNode private[akka] (
|
||||||
.setMessageType(FUNCTION_FUN0_ANY)
|
.setMessageType(FUNCTION_FUN0_ANY)
|
||||||
.setPayload(ByteString.copyFrom(Serializers.Java.toBinary(f)))
|
.setPayload(ByteString.copyFrom(Serializers.Java.toBinary(f)))
|
||||||
.build
|
.build
|
||||||
val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ !!! message)
|
val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ ? message)
|
||||||
results.toList.asInstanceOf[List[Future[Any]]]
|
results.toList.asInstanceOf[List[Future[Any]]]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1082,7 +1082,7 @@ class DefaultClusterNode private[akka] (
|
||||||
.setMessageType(FUNCTION_FUN1_ARG_ANY)
|
.setMessageType(FUNCTION_FUN1_ARG_ANY)
|
||||||
.setPayload(ByteString.copyFrom(Serializers.Java.toBinary((f, arg))))
|
.setPayload(ByteString.copyFrom(Serializers.Java.toBinary((f, arg))))
|
||||||
.build
|
.build
|
||||||
val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ !!! message)
|
val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ ? message)
|
||||||
results.toList.asInstanceOf[List[Future[Any]]]
|
results.toList.asInstanceOf[List[Future[Any]]]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -56,7 +56,7 @@ object Router {
|
||||||
}
|
}
|
||||||
|
|
||||||
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] = next match {
|
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] = next match {
|
||||||
case Some(actor) ⇒ actor.!!!(message, timeout)(sender)
|
case Some(actor) ⇒ actor.?(message, timeout)(sender)
|
||||||
case _ ⇒ throwNoConnectionsError()
|
case _ ⇒ throwNoConnectionsError()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ in its mailbox.
|
||||||
.. sidebar:: **IMPORTANT**
|
.. sidebar:: **IMPORTANT**
|
||||||
|
|
||||||
None of these mailboxes work with blocking message send, e.g. the message
|
None of these mailboxes work with blocking message send, e.g. the message
|
||||||
send operations that are relying on futures; ``!!``, ``!!!``,
|
send operations that are relying on futures; ``!!``, ``?``,
|
||||||
``sendRequestReply`` and ``ask``. If the node has crashed
|
``sendRequestReply`` and ``ask``. If the node has crashed
|
||||||
and then restarted, the thread that was blocked waiting for the reply is gone
|
and then restarted, the thread that was blocked waiting for the reply is gone
|
||||||
and there is no way we can deliver the message.
|
and there is no way we can deliver the message.
|
||||||
|
|
|
||||||
|
|
@ -67,7 +67,7 @@ Here is the master actor:
|
||||||
|
|
||||||
A couple of things are worth explaining further.
|
A couple of things are worth explaining further.
|
||||||
|
|
||||||
First, we are passing in a ``java.util.concurrent.CountDownLatch`` to the ``Master`` actor. This latch is only used for plumbing (in this specific tutorial), to have a simple way of letting the outside world knowing when the master can deliver the result and shut down. In more idiomatic Akka code, as we will see in part two of this tutorial series, we would not use a latch but other abstractions and functions like ``Channel``, ``Future`` and ``!!!`` to achive the same thing in a non-blocking way. But for simplicity let's stick to a ``CountDownLatch`` for now.
|
First, we are passing in a ``java.util.concurrent.CountDownLatch`` to the ``Master`` actor. This latch is only used for plumbing (in this specific tutorial), to have a simple way of letting the outside world knowing when the master can deliver the result and shut down. In more idiomatic Akka code, as we will see in part two of this tutorial series, we would not use a latch but other abstractions and functions like ``Channel``, ``Future`` and ``?`` to achive the same thing in a non-blocking way. But for simplicity let's stick to a ``CountDownLatch`` for now.
|
||||||
|
|
||||||
Second, we are adding a couple of life-cycle callback methods; ``preStart`` and ``postStop``. In the ``preStart`` callback we are recording the time when the actor is started and in the ``postStop`` callback we are printing out the result (the approximation of Pi) and the time it took to calculate it. In this call we also invoke ``latch.countDown`` to tell the outside world that we are done.
|
Second, we are adding a couple of life-cycle callback methods; ``preStart`` and ``postStop``. In the ``preStart`` callback we are recording the time when the actor is started and in the ``postStop`` callback we are printing out the result (the approximation of Pi) and the time it took to calculate it. In this call we also invoke ``latch.countDown`` to tell the outside world that we are done.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -435,7 +435,7 @@ Here is the master actor::
|
||||||
|
|
||||||
A couple of things are worth explaining further.
|
A couple of things are worth explaining further.
|
||||||
|
|
||||||
First, we are passing in a ``java.util.concurrent.CountDownLatch`` to the ``Master`` actor. This latch is only used for plumbing (in this specific tutorial), to have a simple way of letting the outside world knowing when the master can deliver the result and shut down. In more idiomatic Akka code, as we will see in part two of this tutorial series, we would not use a latch but other abstractions and functions like ``Channel``, ``Future`` and ``!!!`` to achieve the same thing in a non-blocking way. But for simplicity let's stick to a ``CountDownLatch`` for now.
|
First, we are passing in a ``java.util.concurrent.CountDownLatch`` to the ``Master`` actor. This latch is only used for plumbing (in this specific tutorial), to have a simple way of letting the outside world knowing when the master can deliver the result and shut down. In more idiomatic Akka code, as we will see in part two of this tutorial series, we would not use a latch but other abstractions and functions like ``Channel``, ``Future`` and ``?`` to achieve the same thing in a non-blocking way. But for simplicity let's stick to a ``CountDownLatch`` for now.
|
||||||
|
|
||||||
Second, we are adding a couple of life-cycle callback methods; ``preStart`` and ``postStop``. In the ``preStart`` callback we are recording the time when the actor is started and in the ``postStop`` callback we are printing out the result (the approximation of Pi) and the time it took to calculate it. In this call we also invoke ``latch.countDown()`` to tell the outside world that we are done.
|
Second, we are adding a couple of life-cycle callback methods; ``preStart`` and ``postStop``. In the ``preStart`` callback we are recording the time when the actor is started and in the ``postStop`` callback we are printing out the result (the approximation of Pi) and the time it took to calculate it. In this call we also invoke ``latch.countDown()`` to tell the outside world that we are done.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -335,7 +335,7 @@ Here is the master actor::
|
||||||
|
|
||||||
A couple of things are worth explaining further.
|
A couple of things are worth explaining further.
|
||||||
|
|
||||||
First, we are passing in a ``java.util.concurrent.CountDownLatch`` to the ``Master`` actor. This latch is only used for plumbing (in this specific tutorial), to have a simple way of letting the outside world knowing when the master can deliver the result and shut down. In more idiomatic Akka code, as we will see in part two of this tutorial series, we would not use a latch but other abstractions and functions like ``Channel``, ``Future`` and ``!!!`` to achieve the same thing in a non-blocking way. But for simplicity let's stick to a ``CountDownLatch`` for now.
|
First, we are passing in a ``java.util.concurrent.CountDownLatch`` to the ``Master`` actor. This latch is only used for plumbing (in this specific tutorial), to have a simple way of letting the outside world knowing when the master can deliver the result and shut down. In more idiomatic Akka code, as we will see in part two of this tutorial series, we would not use a latch but other abstractions and functions like ``Channel``, ``Future`` and ``?`` to achieve the same thing in a non-blocking way. But for simplicity let's stick to a ``CountDownLatch`` for now.
|
||||||
|
|
||||||
Second, we are adding a couple of life-cycle callback methods; ``preStart`` and ``postStop``. In the ``preStart`` callback we are recording the time when the actor is started and in the ``postStop`` callback we are printing out the result (the approximation of Pi) and the time it took to calculate it. In this call we also invoke ``latch.countDown`` to tell the outside world that we are done.
|
Second, we are adding a couple of life-cycle callback methods; ``preStart`` and ``postStop``. In the ``preStart`` callback we are recording the time when the actor is started and in the ``postStop`` callback we are printing out the result (the approximation of Pi) and the time it took to calculate it. In this call we also invoke ``latch.countDown`` to tell the outside world that we are done.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -329,7 +329,7 @@ Here is the master actor::
|
||||||
|
|
||||||
A couple of things are worth explaining further.
|
A couple of things are worth explaining further.
|
||||||
|
|
||||||
First, we are passing in a ``java.util.concurrent.CountDownLatch`` to the ``Master`` actor. This latch is only used for plumbing (in this specific tutorial), to have a simple way of letting the outside world knowing when the master can deliver the result and shut down. In more idiomatic Akka code, as we will see in part two of this tutorial series, we would not use a latch but other abstractions and functions like ``Channel``, ``Future`` and ``!!!`` to achieve the same thing in a non-blocking way. But for simplicity let's stick to a ``CountDownLatch`` for now.
|
First, we are passing in a ``java.util.concurrent.CountDownLatch`` to the ``Master`` actor. This latch is only used for plumbing (in this specific tutorial), to have a simple way of letting the outside world knowing when the master can deliver the result and shut down. In more idiomatic Akka code, as we will see in part two of this tutorial series, we would not use a latch but other abstractions and functions like ``Channel``, ``Future`` and ``?`` to achieve the same thing in a non-blocking way. But for simplicity let's stick to a ``CountDownLatch`` for now.
|
||||||
|
|
||||||
Second, we are adding a couple of life-cycle callback methods; ``preStart`` and ``postStop``. In the ``preStart`` callback we are recording the time when the actor is started and in the ``postStop`` callback we are printing out the result (the approximation of Pi) and the time it took to calculate it. In this call we also invoke ``latch.countDown`` to tell the outside world that we are done.
|
Second, we are adding a couple of life-cycle callback methods; ``preStart`` and ``postStop``. In the ``preStart`` callback we are recording the time when the actor is started and in the ``postStop`` callback we are printing out the result (the approximation of Pi) and the time it took to calculate it. In this call we also invoke ``latch.countDown`` to tell the outside world that we are done.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -126,7 +126,7 @@ Messages are sent to an Actor through one of the “bang” methods.
|
||||||
|
|
||||||
* ! means “fire-and-forget”, e.g. send a message asynchronously and return immediately.
|
* ! means “fire-and-forget”, e.g. send a message asynchronously and return immediately.
|
||||||
* !! means “send-and-reply-eventually”, e.g. send a message asynchronously and wait for a reply through aFuture. Here you can specify a timeout. Using timeouts is very important. If no timeout is specified then the actor’s default timeout (set by the this.timeout variable in the actor) is used. This method returns an ``Option[Any]`` which will be either ``Some(result)`` if returning successfully or None if the call timed out.
|
* !! means “send-and-reply-eventually”, e.g. send a message asynchronously and wait for a reply through aFuture. Here you can specify a timeout. Using timeouts is very important. If no timeout is specified then the actor’s default timeout (set by the this.timeout variable in the actor) is used. This method returns an ``Option[Any]`` which will be either ``Some(result)`` if returning successfully or None if the call timed out.
|
||||||
* !!! sends a message asynchronously and returns a ``Future``.
|
* ? sends a message asynchronously and returns a ``Future``.
|
||||||
|
|
||||||
You can check if an Actor can handle a specific message by invoking the ``isDefinedAt`` method:
|
You can check if an Actor can handle a specific message by invoking the ``isDefinedAt`` method:
|
||||||
|
|
||||||
|
|
@ -180,11 +180,11 @@ Here are some examples:
|
||||||
Send-And-Receive-Future
|
Send-And-Receive-Future
|
||||||
^^^^^^^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
Using ``!!!`` will send a message to the receiving Actor asynchronously and will return a 'Future':
|
Using ``?`` will send a message to the receiving Actor asynchronously and will return a 'Future':
|
||||||
|
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
val future = actor !!! "Hello"
|
val future = actor ? "Hello"
|
||||||
|
|
||||||
See :ref:`futures-scala` for more information.
|
See :ref:`futures-scala` for more information.
|
||||||
|
|
||||||
|
|
@ -329,7 +329,7 @@ The same pattern holds for using the ``senderFuture`` in the section below.
|
||||||
Reply using the sender future
|
Reply using the sender future
|
||||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
If a message was sent with the ``!!`` or ``!!!`` methods, which both implements request-reply semantics using Future's, then you either have the option of replying using the ``reply`` method as above. This method will then resolve the Future. But you can also get a reference to the Future directly and resolve it yourself or if you would like to store it away to resolve it later, or pass it on to some other Actor to resolve it.
|
If a message was sent with the ``!!`` or ``?`` methods, which both implements request-reply semantics using Future's, then you either have the option of replying using the ``reply`` method as above. This method will then resolve the Future. But you can also get a reference to the Future directly and resolve it yourself or if you would like to store it away to resolve it later, or pass it on to some other Actor to resolve it.
|
||||||
|
|
||||||
The reference to the Future resides in the ``senderFuture: Option[Promise[_]]`` member field in the ``ActorRef`` class.
|
The reference to the Future resides in the ``senderFuture: Option[Promise[_]]`` member field in the ``ActorRef`` class.
|
||||||
|
|
||||||
|
|
@ -427,7 +427,7 @@ PoisonPill
|
||||||
|
|
||||||
You can also send an actor the ``akka.actor.PoisonPill`` message, which will stop the actor when the message is processed.
|
You can also send an actor the ``akka.actor.PoisonPill`` message, which will stop the actor when the message is processed.
|
||||||
|
|
||||||
If the sender is a ``Future`` (e.g. the message is sent with ``!!`` or ``!!!``), the ``Future`` will be completed with an ``akka.actor.ActorKilledException("PoisonPill")``.
|
If the sender is a ``Future`` (e.g. the message is sent with ``!!`` or ``?``), the ``Future`` will be completed with an ``akka.actor.ActorKilledException("PoisonPill")``.
|
||||||
|
|
||||||
HotSwap
|
HotSwap
|
||||||
-------
|
-------
|
||||||
|
|
@ -457,7 +457,7 @@ To hotswap the Actor using ``become``:
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
def angry: Receive = {
|
def angry: Receive = {
|
||||||
case "foo" => self reply "I am already angry!!!"
|
case "foo" => self reply "I am already angry?"
|
||||||
case "bar" => become(happy)
|
case "bar" => become(happy)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -17,11 +17,11 @@ Use with Actors
|
||||||
|
|
||||||
There are generally two ways of getting a reply from an ``Actor``: the first is by a sent message (``actor ! msg``), which only works if the original sender was an ``Actor``) and the second is through a ``Future``.
|
There are generally two ways of getting a reply from an ``Actor``: the first is by a sent message (``actor ! msg``), which only works if the original sender was an ``Actor``) and the second is through a ``Future``.
|
||||||
|
|
||||||
Using an ``Actor``\'s ``!!!`` method to send a message will return a Future. To wait for and retrieve the actual result the simplest method is:
|
Using an ``Actor``\'s ``?`` method to send a message will return a Future. To wait for and retrieve the actual result the simplest method is:
|
||||||
|
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
val future = actor !!! msg
|
val future = actor ? msg
|
||||||
val result: Any = future.get()
|
val result: Any = future.get()
|
||||||
|
|
||||||
This will cause the current thread to block and wait for the ``Actor`` to 'complete' the ``Future`` with it's reply. Due to the dynamic nature of Akka's ``Actor``\s this result will be untyped and will default to ``Nothing``. The safest way to deal with this is to cast the result to an ``Any`` as is shown in the above example. You can also use the expected result type instead of ``Any``, but if an unexpected type were to be returned you will get a ``ClassCastException``. For more elegant ways to deal with this and to use the result without blocking, refer to `Functional Futures`_.
|
This will cause the current thread to block and wait for the ``Actor`` to 'complete' the ``Future`` with it's reply. Due to the dynamic nature of Akka's ``Actor``\s this result will be untyped and will default to ``Nothing``. The safest way to deal with this is to cast the result to an ``Any`` as is shown in the above example. You can also use the expected result type instead of ``Any``, but if an unexpected type were to be returned you will get a ``ClassCastException``. For more elegant ways to deal with this and to use the result without blocking, refer to `Functional Futures`_.
|
||||||
|
|
@ -141,13 +141,13 @@ The example for comprehension above is an example of composing ``Future``\s. A c
|
||||||
|
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
val f1 = actor1 !!! msg1
|
val f1 = actor1 ? msg1
|
||||||
val f2 = actor2 !!! msg2
|
val f2 = actor2 ? msg2
|
||||||
|
|
||||||
val a: Int = f1.get()
|
val a: Int = f1.get()
|
||||||
val b: Int = f2.get()
|
val b: Int = f2.get()
|
||||||
|
|
||||||
val f3 = actor3 !!! (a + b)
|
val f3 = actor3 ? (a + b)
|
||||||
|
|
||||||
val result: String = f3.get()
|
val result: String = f3.get()
|
||||||
|
|
||||||
|
|
@ -155,13 +155,13 @@ Here we wait for the results from the first 2 ``Actor``\s before sending that re
|
||||||
|
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
val f1 = actor1 !!! msg1
|
val f1 = actor1 ? msg1
|
||||||
val f2 = actor2 !!! msg2
|
val f2 = actor2 ? msg2
|
||||||
|
|
||||||
val f3 = for {
|
val f3 = for {
|
||||||
a: Int <- f1
|
a: Int <- f1
|
||||||
b: Int <- f2
|
b: Int <- f2
|
||||||
c: String <- actor3 !!! (a + b)
|
c: String <- actor3 ? (a + b)
|
||||||
} yield c
|
} yield c
|
||||||
|
|
||||||
val result = f3.get()
|
val result = f3.get()
|
||||||
|
|
@ -173,7 +173,7 @@ This is fine when dealing with a known amount of Actors, but can grow unwieldy i
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
// oddActor returns odd numbers sequentially from 1
|
// oddActor returns odd numbers sequentially from 1
|
||||||
val listOfFutures: List[Future[Int]] = List.fill(100)(oddActor !!! GetNext)
|
val listOfFutures: List[Future[Int]] = List.fill(100)(oddActor ? GetNext)
|
||||||
|
|
||||||
// now we have a Future[List[Int]]
|
// now we have a Future[List[Int]]
|
||||||
val futureList = Future.sequence(listOfFutures)
|
val futureList = Future.sequence(listOfFutures)
|
||||||
|
|
@ -242,7 +242,7 @@ It is also possible to handle an ``Exception`` by returning a different result.
|
||||||
|
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
val future = actor !!! msg1 recover {
|
val future = actor ? msg1 recover {
|
||||||
case e: ArithmeticException => 0
|
case e: ArithmeticException => 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -95,7 +95,7 @@ case class DurableDispatcher(
|
||||||
|
|
||||||
private[akka] override def dispatch(invocation: MessageInvocation): Unit = {
|
private[akka] override def dispatch(invocation: MessageInvocation): Unit = {
|
||||||
if (invocation.senderFuture.isDefined)
|
if (invocation.senderFuture.isDefined)
|
||||||
throw new IllegalArgumentException("Durable mailboxes do not support Future-based messages from !! and !!!")
|
throw new IllegalArgumentException("Durable mailboxes do not support Future-based messages from !! and ?")
|
||||||
super.dispatch(invocation)
|
super.dispatch(invocation)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -132,7 +132,7 @@ case class DurablePinnedDispatcher(
|
||||||
|
|
||||||
private[akka] override def dispatch(invocation: MessageInvocation): Unit = {
|
private[akka] override def dispatch(invocation: MessageInvocation): Unit = {
|
||||||
if (invocation.senderFuture.isDefined)
|
if (invocation.senderFuture.isDefined)
|
||||||
throw new IllegalArgumentException("Actor has a durable mailbox that does not support !! or !!!")
|
throw new IllegalArgumentException("Actor has a durable mailbox that does not support !! or ?")
|
||||||
super.dispatch(invocation)
|
super.dispatch(invocation)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ akka {
|
||||||
|
|
||||||
actor {
|
actor {
|
||||||
timeout = 2000 # Default timeout for Future based invocations
|
timeout = 2000 # Default timeout for Future based invocations
|
||||||
# - Actor: !! && !!!
|
# - Actor: !! && ?
|
||||||
# - UntypedActor: sendRequestReply && ask
|
# - UntypedActor: sendRequestReply && ask
|
||||||
# - TypedActor: methods with non-void return type
|
# - TypedActor: methods with non-void return type
|
||||||
serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability
|
serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability
|
||||||
|
|
|
||||||
|
|
@ -120,7 +120,7 @@ class Agent[T](initialValue: T) {
|
||||||
* within the given timeout
|
* within the given timeout
|
||||||
*/
|
*/
|
||||||
def alter(f: T ⇒ T)(timeout: Long): Future[T] = {
|
def alter(f: T ⇒ T)(timeout: Long): Future[T] = {
|
||||||
def dispatch = updater.!!!(Update(f), timeout)
|
def dispatch = updater.?(Update(f), timeout)
|
||||||
if (Stm.activeTransaction) {
|
if (Stm.activeTransaction) {
|
||||||
val result = new DefaultPromise[T](timeout)
|
val result = new DefaultPromise[T](timeout)
|
||||||
get //Join xa
|
get //Join xa
|
||||||
|
|
@ -168,7 +168,7 @@ class Agent[T](initialValue: T) {
|
||||||
send((value: T) ⇒ {
|
send((value: T) ⇒ {
|
||||||
suspend
|
suspend
|
||||||
val threadBased = Actor.actorOf(new ThreadBasedAgentUpdater(this)).start()
|
val threadBased = Actor.actorOf(new ThreadBasedAgentUpdater(this)).start()
|
||||||
result completeWith threadBased.!!!(Update(f), timeout)
|
result completeWith threadBased.?(Update(f), timeout)
|
||||||
value
|
value
|
||||||
})
|
})
|
||||||
result
|
result
|
||||||
|
|
@ -178,7 +178,7 @@ class Agent[T](initialValue: T) {
|
||||||
* A future to the current value that will be completed after any currently
|
* A future to the current value that will be completed after any currently
|
||||||
* queued updates.
|
* queued updates.
|
||||||
*/
|
*/
|
||||||
def future(): Future[T] = (updater !!! Get).asInstanceOf[Future[T]]
|
def future(): Future[T] = (updater ? Get).asInstanceOf[Future[T]]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets this agent's value after all currently queued updates have completed.
|
* Gets this agent's value after all currently queued updates have completed.
|
||||||
|
|
|
||||||
|
|
@ -190,7 +190,7 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
|
||||||
|
|
||||||
"support futures" in {
|
"support futures" in {
|
||||||
val a = TestActorRef[WorkerActor].start()
|
val a = TestActorRef[WorkerActor].start()
|
||||||
val f: Future[String] = a !!! "work"
|
val f: Future[String] = a ? "work"
|
||||||
f must be('completed)
|
f must be('completed)
|
||||||
f.get must equal("workDone")
|
f.get must equal("workDone")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -104,7 +104,7 @@ object Pi extends App {
|
||||||
val start = now
|
val start = now
|
||||||
|
|
||||||
//send calculate message
|
//send calculate message
|
||||||
master.!!.
|
master.?[Double](Calculate, timeout = 60000).
|
||||||
await.resultOrException match {//wait for the result, with a 60 seconds timeout
|
await.resultOrException match {//wait for the result, with a 60 seconds timeout
|
||||||
case Some(pi) =>
|
case Some(pi) =>
|
||||||
EventHandler.info(this, "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start)))
|
EventHandler.info(this, "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start)))
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ akka {
|
||||||
|
|
||||||
actor {
|
actor {
|
||||||
timeout = 5 # Default timeout for Future based invocations
|
timeout = 5 # Default timeout for Future based invocations
|
||||||
# - Actor: !! && !!!
|
# - Actor: !! && ?
|
||||||
# - UntypedActor: sendRequestReply && ask
|
# - UntypedActor: sendRequestReply && ask
|
||||||
# - TypedActor: methods with non-void return type
|
# - TypedActor: methods with non-void return type
|
||||||
serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability
|
serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue