add ReliableProxy pattern to demo akka-contrib
This commit is contained in:
parent
26157d0521
commit
0afc3b1721
10 changed files with 731 additions and 54 deletions
|
|
@ -14,8 +14,12 @@ A module in this subproject doesn't have to obey the rule of staying binary comp
|
|||
|
||||
## Suggested Format of Contributions
|
||||
|
||||
Each contribution should be a self-contained unit, consisting of one source file without dependencies to other modules in this subproject (it may depend on anything else in the Akka distribution, though). This ensures that contributions may be moved into the standard distribution individually.
|
||||
Each contribution should be a self-contained unit, consisting of one source file or one exclusively used package, without dependencies to other modules in this subproject; it may depend on everything else in the Akka distribution, though. This ensures that contributions may be moved into the standard distribution individually. The module shall be within a subpackage of `akka.contrib`.
|
||||
|
||||
Each module must be accompanied by a test suite which verifies that the provided features work, possibly complemented by integration and unit tests. The tests should follow the [Developer Guidelines](http://doc.akka.io/docs/akka/current/dev/developer-guidelines.html#testing) and go into the `src/test/scala` or `src/test/java` directories (with package name matching the module which is being tested). As an example, if the module were called `akka.contrib.pattern.ReliableProxy`, then the test suite should be called `akka.contrib.pattern.ReliableProxySpec`.
|
||||
|
||||
Each module must also have proper documentation in [reStructured Text format](http://sphinx.pocoo.org/rest.html). The documentation should be a single `<module>.rst` file in the `akka-contrib/docs` directory, including a link from `index.rst`.
|
||||
|
||||
## Suggested Way of Using these Contributions
|
||||
|
||||
Since the Akka team does not restrict updates to this subproject even during otherwise binary compatible releases, and modules may be removed without deprecation, it is suggested to copy the source files into your own code base, changing the package name. This way you can choose when to update or which fixes to include (to keep binary compatibility if needed) and later releases of Akka do not potentially break your application.
|
||||
Since the Akka team does not restrict updates to this subproject even during otherwise binary compatible releases, and modules may be removed without deprecation, it is suggested to copy the source files into your own code base, changing the package name. This way you can choose when to update or which fixes to include (to keep binary compatibility if needed) and later releases of Akka do not potentially break your application.
|
||||
|
|
|
|||
BIN
akka-contrib/docs/ReliableProxy.png
Normal file
BIN
akka-contrib/docs/ReliableProxy.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 32 KiB |
66
akka-contrib/docs/index.rst
Normal file
66
akka-contrib/docs/index.rst
Normal file
|
|
@ -0,0 +1,66 @@
|
|||
External Contributions
|
||||
======================
|
||||
|
||||
This subproject provides a home to modules contributed by external developers
|
||||
which may or may not move into the officially supported code base over time.
|
||||
The conditions under which this transition can occur include:
|
||||
|
||||
* there must be enough interest in the module to warrant inclusion in the
|
||||
standard distribution,
|
||||
* the module must be actively maintained and
|
||||
* code quality must be good enough to allow efficient maintenance by the Akka
|
||||
core development team
|
||||
|
||||
If a contributions turns out to not “take off” it may be removed again at a
|
||||
later time.
|
||||
|
||||
Caveat Emptor
|
||||
-------------
|
||||
|
||||
A module in this subproject doesn't have to obey the rule of staying binary
|
||||
compatible between minor releases. Breaking API changes may be introduced in
|
||||
minor releases without notice as we refine and simplify based on your feedback.
|
||||
A module may be dropped in any release without prior deprecation. The Typesafe
|
||||
subscription does not cover support for these modules.
|
||||
|
||||
The Current List of Modules
|
||||
---------------------------
|
||||
|
||||
.. toctree::
|
||||
|
||||
reliable-proxy
|
||||
|
||||
Suggested Way of Using these Contributions
|
||||
------------------------------------------
|
||||
|
||||
Since the Akka team does not restrict updates to this subproject even during
|
||||
otherwise binary compatible releases, and modules may be removed without
|
||||
deprecation, it is suggested to copy the source files into your own code base,
|
||||
changing the package name. This way you can choose when to update or which
|
||||
fixes to include (to keep binary compatibility if needed) and later releases of
|
||||
Akka do not potentially break your application.
|
||||
|
||||
Suggested Format of Contributions
|
||||
---------------------------------
|
||||
|
||||
Each contribution should be a self-contained unit, consisting of one source
|
||||
file or one exclusively used package, without dependencies to other modules in
|
||||
this subproject; it may depend on everything else in the Akka distribution,
|
||||
though. This ensures that contributions may be moved into the standard
|
||||
distribution individually. The module shall be within a subpackage of
|
||||
``akka.contrib``.
|
||||
|
||||
Each module must be accompanied by a test suite which verifies that the
|
||||
provided features work, possibly complemented by integration and unit tests.
|
||||
The tests should follow the :ref:`developer_guidelines` and go into the
|
||||
``src/test/scala`` or ``src/test/java`` directories (with package name matching
|
||||
the module which is being tested). As an example, if the module were called
|
||||
``akka.contrib.pattern.ReliableProxy``, then the test suite should be called
|
||||
``akka.contrib.pattern.ReliableProxySpec``.
|
||||
|
||||
Each module must also have proper documentation in `reStructured Text`_ format.
|
||||
The documentation should be a single ``<module>.rst`` file in the
|
||||
``akka-contrib/docs`` directory, including a link from ``index.rst`` (this file).
|
||||
|
||||
.. _reStructured Text: http://sphinx.pocoo.org/rest.html
|
||||
|
||||
92
akka-contrib/docs/reliable-proxy.rst
Normal file
92
akka-contrib/docs/reliable-proxy.rst
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
Reliable Proxy Pattern
|
||||
======================
|
||||
|
||||
Looking at :ref:`message-send-semantics` one might come to the conclusion that
|
||||
Akka actors are made for blue-sky scenarios: sending messages is the only way
|
||||
for actors to communicate, and then that is not even guaranteed to work. Is the
|
||||
whole paradigm built on sand? Of course the answer is an emphatic “No!”.
|
||||
|
||||
A local message send—within the same JVM instance—is not likely to fail, and if
|
||||
it does the reason was one of
|
||||
|
||||
* it was meant to fail (due to consciously choosing a bounded mailbox, which
|
||||
upon overflow will have to drop messages)
|
||||
* or it failed due to a catastrophic VM error, e.g. an
|
||||
:class:`OutOfMemoryError`, a memory access violation (“segmentation fault”,
|
||||
GPF, etc.), JVM bug—or someone calling ``System.exit()``.
|
||||
|
||||
In all of these cases, the actor was very likely not in a position to process
|
||||
the message anyway, so this part of the non-guarantee is not problematic.
|
||||
|
||||
It is a lot more likely for an unintended message delivery failure to occur
|
||||
when a message send crosses JVM boundaries, i.e. an intermediate unreliable
|
||||
network is involved. If someone unplugs an ethernet cable, or a power failure
|
||||
shuts down a router, messages will be lost while the actors would be able to
|
||||
process them just fine.
|
||||
|
||||
.. note::
|
||||
|
||||
This does not mean that message send semantics are different between local
|
||||
and remote operations, it just means that in practice there is a difference
|
||||
between how good the “best effort” is.
|
||||
|
||||
Introducing the Reliable Proxy
|
||||
------------------------------
|
||||
|
||||
.. image:: ReliableProxy.png
|
||||
|
||||
To bridge the disparity between “local” and “remote” sends is the goal of this
|
||||
pattern. When sending from A to B must be as reliable as in-JVM, regardless of
|
||||
the deployment, then you can interject a reliable tunnel and send through that
|
||||
instead. The tunnel consists of two end-points, where the ingress point P (the
|
||||
“proxy”) is a child of A and the egress point E is a child of P, deployed onto
|
||||
the same network node where B lives. Messages sent to P will be wrapped in an
|
||||
envelope, tagged with a sequence number and sent to E, who verifies that the
|
||||
received envelope has the right sequence number (the next expected one) and
|
||||
forwards the contained message to B. When B receives this message, the
|
||||
``sender`` will be a reference to the sender of the original message to P.
|
||||
Reliability is added by E replying to orderly received messages with an ACK, so
|
||||
that P can tick those messages off its resend list. If ACKs do not come in a
|
||||
timely fashion, P will try to resend until successful.
|
||||
|
||||
Exactly what does it guarantee?
|
||||
-------------------------------
|
||||
|
||||
Sending via a :class:`ReliableProxy` makes the message send exactly as reliable
|
||||
as if the represented target were to live within the same JVM, provided that
|
||||
the remote actor system does not terminate. In effect, both ends (i.e. JVM and
|
||||
actor system) must be considered as one when evaluating the reliability of this
|
||||
communication channel. The benefit is that the network in-between is taken out
|
||||
of that equation.
|
||||
|
||||
When the target actor terminates, the proxy will terminate as well (on the
|
||||
terms of :ref:`deathwatch-java` / :ref:`deathwath-scala`).
|
||||
|
||||
How to use it
|
||||
-------------
|
||||
|
||||
Since this implementation does not offer much in the way of configuration,
|
||||
simply instantiate a proxy wrapping some target reference. From Java it looks
|
||||
like this:
|
||||
|
||||
.. includecode:: ../src/test/java/akka/contrib/pattern/ReliableProxyTest.java#imports
|
||||
.. includecode:: ../src/test/java/akka/contrib/pattern/ReliableProxyTest.java#demo-proxy
|
||||
|
||||
And from Scala like this:
|
||||
|
||||
.. includecode:: ../src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala#demo
|
||||
|
||||
Since the :class:`ReliableProxy` actor is an :ref:`fsm-scala`, it also offers
|
||||
the capability to subscribe to state transitions. If you need to know when all
|
||||
enqueued messages have been received by the remote end-point (and consequently
|
||||
been forwarded to the target), you can subscribe to the FSM notifications and
|
||||
observe a transition from state :class:`ReliableProxy.Active` to state
|
||||
:class:`ReliableProxy.Idle`.
|
||||
|
||||
.. includecode:: ../src/test/java/akka/contrib/pattern/ReliableProxyTest.java#demo-transition
|
||||
|
||||
From Scala it would look like so:
|
||||
|
||||
.. includecode:: ../src/test/scala/akka/contrib/pattern/ReliableProxyDocSpec.scala#demo-transition
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,167 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.contrib.pattern
|
||||
|
||||
import akka.actor._
|
||||
import akka.remote.RemoteScope
|
||||
import scala.concurrent.util._
|
||||
|
||||
object ReliableProxy {
|
||||
|
||||
class Receiver(target: ActorRef) extends Actor with ActorLogging {
|
||||
var lastSerial = 0
|
||||
|
||||
context.watch(target)
|
||||
|
||||
def receive = {
|
||||
case Message(msg, snd, serial) ⇒
|
||||
if (serial == lastSerial + 1) {
|
||||
target.tell(msg, snd)
|
||||
sender ! Ack(serial)
|
||||
lastSerial = serial
|
||||
} else if (compare(serial, lastSerial) <= 0) {
|
||||
sender ! Ack(serial)
|
||||
} else {
|
||||
log.debug("received msg of {} from {} with wrong serial", msg.asInstanceOf[AnyRef].getClass, snd)
|
||||
}
|
||||
case Terminated(`target`) ⇒ context stop self
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrap-around aware comparison of integers: differences limited to 2^31-1
|
||||
* in magnitude will work correctly.
|
||||
*/
|
||||
def compare(a: Int, b: Int): Int = (a - b) match {
|
||||
case x if x < 0 ⇒ -1
|
||||
case x if x == 0 ⇒ 0
|
||||
case x if x > 0 ⇒ 1
|
||||
}
|
||||
|
||||
case class Message(msg: Any, sender: ActorRef, serial: Int)
|
||||
case class Ack(serial: Int)
|
||||
case object Tick
|
||||
|
||||
def receiver(target: ActorRef): Props = Props(new Receiver(target))
|
||||
|
||||
sealed trait State
|
||||
case object Idle extends State
|
||||
case object Active extends State
|
||||
|
||||
// Java API
|
||||
val idle = Idle
|
||||
val active = Active
|
||||
}
|
||||
|
||||
import ReliableProxy._
|
||||
|
||||
/**
|
||||
* A ReliableProxy is a means to wrap a remote actor reference in order to
|
||||
* obtain certain improved delivery guarantees:
|
||||
*
|
||||
* - as long as none of the JVMs crashes and the proxy and its remote-deployed
|
||||
* peer are not forcefully terminated or restarted, no messages will be lost
|
||||
* - messages re-sent due to the first point will not be delivered out-of-order,
|
||||
* message ordering is preserved
|
||||
*
|
||||
* These guarantees are valid for the communication between the two end-points
|
||||
* of the reliable “tunnel”, which usually spans an unreliable network. Delivery
|
||||
* from the remote end-point to the target actor is still subject to in-JVM
|
||||
* delivery semantics (i.e. not strictly guaranteed due to possible OutOfMemory
|
||||
* situations or other VM errors).
|
||||
*
|
||||
* You can create a reliable connection like this:
|
||||
* {{{
|
||||
* val proxy = context.actorOf(Props(new ReliableProxy(target)))
|
||||
* }}}
|
||||
* or in Java:
|
||||
* {{{
|
||||
* final ActorRef proxy = getContext().actorOf(new Props(new UntypedActorFactory() {
|
||||
* public Actor create() {
|
||||
* return new ReliableProxy(target);
|
||||
* }
|
||||
* }));
|
||||
* }}}
|
||||
*
|
||||
* '''''Please note:''''' the tunnel is uni-directional, and original sender
|
||||
* information is retained, hence replies by the wrapped target reference will
|
||||
* go back in the normal “unreliable” way unless also secured by a ReliableProxy
|
||||
* from the remote end.
|
||||
*
|
||||
* ==Message Types==
|
||||
*
|
||||
* This actor is an [[akka.actor.FSM]], hence it offers the service of
|
||||
* transition callbacks to those actors which subscribe using the
|
||||
* ``SubscribeTransitionCallBack`` and ``UnsubscribeTransitionCallBack``
|
||||
* messages; see [[akka.actor.FSM]] for more documentation. The proxy will
|
||||
* transition into [[ReliableProxy.Active]] state when ACKs are outstanding and
|
||||
* return to the [[ReliableProxy.Idle]] state when every message send so far
|
||||
* has been confirmed by the peer end-point.
|
||||
* Any other message type sent to this actor will be delivered via a remote-deployed
|
||||
* child actor to the designated target. Message types declared in the companion
|
||||
* object are for internal use only and not to be sent from the outside.
|
||||
*
|
||||
* ==Failure Cases==
|
||||
*
|
||||
* All failures of either the local or the remote end-point are escalated to the
|
||||
* parent of this actor; there are no specific error cases which are predefined.
|
||||
*
|
||||
* ==Arguments==
|
||||
*
|
||||
* '''''target''''' is the [[akka.actor.ActorRef]] to which all messages will be
|
||||
* forwarded which are sent to this actor. It can be any type of actor reference,
|
||||
* but the “remote” tunnel endpoint will be deployed on the node where the target
|
||||
* ref points to.
|
||||
*
|
||||
* '''''retryAfter''''' is the ACK timeout after which all outstanding messages
|
||||
* will be resent. There is not limit on the queue size or the number of retries.
|
||||
*/
|
||||
class ReliableProxy(target: ActorRef, retryAfter: FiniteDuration) extends Actor with FSM[State, Vector[Message]] {
|
||||
|
||||
val tunnel = context.actorOf(receiver(target).withDeploy(Deploy(scope = RemoteScope(target.path.address))), "tunnel")
|
||||
context.watch(tunnel)
|
||||
|
||||
override def supervisorStrategy = OneForOneStrategy() {
|
||||
case _ ⇒ SupervisorStrategy.Escalate
|
||||
}
|
||||
|
||||
startWith(Idle, Vector.empty)
|
||||
|
||||
when(Idle) {
|
||||
case Event(Terminated(`tunnel`), _) ⇒ stop
|
||||
case Event(Ack(_), _) ⇒ stay
|
||||
case Event(msg, _) ⇒ goto(Active) using Vector(send(msg))
|
||||
}
|
||||
|
||||
onTransition {
|
||||
case Idle -> Active ⇒ scheduleTick()
|
||||
case Active -> Idle ⇒ cancelTimer("resend")
|
||||
}
|
||||
|
||||
when(Active) {
|
||||
case Event(Terminated(`tunnel`), _) ⇒ stop
|
||||
case Event(Ack(serial), queue) ⇒
|
||||
val q = queue dropWhile (m ⇒ compare(m.serial, serial) <= 0)
|
||||
scheduleTick()
|
||||
if (q.isEmpty) goto(Idle) using Vector.empty
|
||||
else stay using q
|
||||
case Event(Tick, queue) ⇒
|
||||
queue foreach (tunnel ! _)
|
||||
scheduleTick()
|
||||
stay
|
||||
case Event(msg, queue) ⇒ stay using (queue :+ send(msg))
|
||||
}
|
||||
|
||||
def scheduleTick(): Unit = setTimer("resend", Tick, retryAfter, false)
|
||||
|
||||
var nextSerial = 1
|
||||
def send(msg: Any): Message = {
|
||||
val m = Message(msg, sender, nextSerial)
|
||||
nextSerial += 1
|
||||
tunnel ! m
|
||||
m
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,196 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.contrib.pattern
|
||||
|
||||
import language.postfixOps
|
||||
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.remote.testkit.STMultiNodeSpec
|
||||
import org.scalatest.BeforeAndAfterEach
|
||||
import akka.remote.testconductor.Direction
|
||||
import akka.actor.Props
|
||||
import akka.actor.Actor
|
||||
import akka.testkit.ImplicitSender
|
||||
import scala.concurrent.util.duration._
|
||||
import akka.actor.FSM
|
||||
import akka.actor.ActorRef
|
||||
import akka.testkit.TestProbe
|
||||
|
||||
object ReliableProxySpec extends MultiNodeConfig {
|
||||
val local = role("local")
|
||||
val remote = role("remote")
|
||||
}
|
||||
|
||||
class ReliableProxyMultiJvmNode1 extends ReliableProxySpec
|
||||
class ReliableProxyMultiJvmNode2 extends ReliableProxySpec
|
||||
|
||||
class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNodeSpec with BeforeAndAfterEach with ImplicitSender {
|
||||
import ReliableProxySpec._
|
||||
import ReliableProxy._
|
||||
|
||||
override def initialParticipants = 2
|
||||
|
||||
override def afterEach {
|
||||
runOn(local) {
|
||||
testConductor.throttle(local, remote, Direction.Both, -1).await
|
||||
}
|
||||
}
|
||||
|
||||
runOn(remote) {
|
||||
system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case x ⇒ testActor ! x
|
||||
}
|
||||
}), "echo")
|
||||
}
|
||||
|
||||
val target = system.actorFor(node(remote) / "user" / "echo")
|
||||
|
||||
var proxy: ActorRef = _
|
||||
def expectState(s: State) = expectMsg(FSM.CurrentState(proxy, s))
|
||||
def expectTransition(s1: State, s2: State) = expectMsg(FSM.Transition(proxy, s1, s2))
|
||||
|
||||
runOn(local) {
|
||||
//#demo
|
||||
import akka.contrib.pattern.ReliableProxy
|
||||
|
||||
proxy = system.actorOf(Props(new ReliableProxy(target, 100.millis)), "proxy")
|
||||
//#demo
|
||||
proxy ! FSM.SubscribeTransitionCallBack(testActor)
|
||||
expectState(Idle)
|
||||
//#demo
|
||||
proxy ! "hello"
|
||||
//#demo
|
||||
expectTransition(Idle, Active)
|
||||
expectTransition(Active, Idle)
|
||||
}
|
||||
runOn(remote) {
|
||||
expectMsg("hello")
|
||||
}
|
||||
|
||||
"A ReliableProxy" must {
|
||||
|
||||
"forward messages in sequence" in {
|
||||
runOn(local) {
|
||||
(1 to 100) foreach (proxy ! _)
|
||||
expectTransition(Idle, Active)
|
||||
expectTransition(Active, Idle)
|
||||
}
|
||||
runOn(remote) {
|
||||
within(1 second) {
|
||||
(1 to 100) foreach { n ⇒ expectMsg(n); lastSender must be === target }
|
||||
}
|
||||
}
|
||||
|
||||
enterBarrier("test1a")
|
||||
|
||||
runOn(local) {
|
||||
(1 to 100) foreach (proxy ! _)
|
||||
expectTransition(Idle, Active)
|
||||
expectTransition(Active, Idle)
|
||||
}
|
||||
runOn(remote) {
|
||||
within(1 second) {
|
||||
(1 to 100) foreach { n ⇒ expectMsg(n); lastSender must be === target }
|
||||
}
|
||||
}
|
||||
|
||||
enterBarrier("test1b")
|
||||
}
|
||||
|
||||
"retry when sending fails" in {
|
||||
runOn(local) {
|
||||
testConductor.blackhole(local, remote, Direction.Send).await
|
||||
(1 to 100) foreach (proxy ! _)
|
||||
within(1 second) {
|
||||
expectTransition(Idle, Active)
|
||||
expectNoMsg
|
||||
}
|
||||
}
|
||||
|
||||
enterBarrier("test2a")
|
||||
|
||||
runOn(remote) {
|
||||
expectNoMsg(0 seconds)
|
||||
}
|
||||
|
||||
enterBarrier("test2b")
|
||||
|
||||
runOn(local) {
|
||||
testConductor.throttle(local, remote, Direction.Send, -1)
|
||||
expectTransition(Active, Idle)
|
||||
}
|
||||
runOn(remote) {
|
||||
within(1 second) {
|
||||
(1 to 100) foreach { n ⇒ expectMsg(n); lastSender must be === target }
|
||||
}
|
||||
}
|
||||
|
||||
enterBarrier("test2c")
|
||||
}
|
||||
|
||||
"retry when receiving fails" in {
|
||||
runOn(local) {
|
||||
testConductor.blackhole(local, remote, Direction.Receive).await
|
||||
(1 to 100) foreach (proxy ! _)
|
||||
within(1 second) {
|
||||
expectTransition(Idle, Active)
|
||||
expectNoMsg
|
||||
}
|
||||
}
|
||||
runOn(remote) {
|
||||
within(1 second) {
|
||||
(1 to 100) foreach { n ⇒ expectMsg(n); lastSender must be === target }
|
||||
}
|
||||
}
|
||||
|
||||
enterBarrier("test3a")
|
||||
|
||||
runOn(local) {
|
||||
testConductor.throttle(local, remote, Direction.Receive, -1)
|
||||
expectTransition(Active, Idle)
|
||||
}
|
||||
|
||||
enterBarrier("test3b")
|
||||
}
|
||||
|
||||
"resend across a slow link" in {
|
||||
runOn(local) {
|
||||
testConductor.throttle(local, remote, Direction.Send, rateMBit = 0.1).await
|
||||
(1 to 50) foreach (proxy ! _)
|
||||
within(5 seconds) {
|
||||
expectTransition(Idle, Active)
|
||||
expectTransition(Active, Idle)
|
||||
}
|
||||
}
|
||||
runOn(remote) {
|
||||
within(5 seconds) {
|
||||
(1 to 50) foreach { n ⇒ expectMsg(n); lastSender must be === target }
|
||||
}
|
||||
}
|
||||
|
||||
enterBarrier("test4a")
|
||||
|
||||
runOn(local) {
|
||||
testConductor.throttle(local, remote, Direction.Send, rateMBit = -1).await
|
||||
testConductor.throttle(local, remote, Direction.Receive, rateMBit = 0.1).await
|
||||
(1 to 50) foreach (proxy ! _)
|
||||
within(5 seconds) {
|
||||
expectTransition(Idle, Active)
|
||||
expectTransition(Active, Idle)
|
||||
}
|
||||
}
|
||||
runOn(remote) {
|
||||
within(1 second) {
|
||||
(1 to 50) foreach { n ⇒ expectMsg(n); lastSender must be === target }
|
||||
}
|
||||
}
|
||||
|
||||
enterBarrier("test4a")
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,123 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.contrib.pattern;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import scala.concurrent.util.Duration;
|
||||
import scala.concurrent.util.FiniteDuration;
|
||||
import akka.actor.Actor;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.FSM;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.actor.UntypedActorFactory;
|
||||
import akka.testkit.TestProbe;
|
||||
|
||||
//#import
|
||||
import akka.contrib.pattern.ReliableProxy;
|
||||
//#import
|
||||
|
||||
public class ReliableProxyTest {
|
||||
|
||||
private static ActorSystem system;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
system.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void demonstrateUsage() {
|
||||
final TestProbe probe = TestProbe.apply(system);
|
||||
final ActorRef target = probe.ref();
|
||||
final ActorRef parent = system.actorOf(new Props(new UntypedActorFactory() {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public Actor create() {
|
||||
return new UntypedActor() {
|
||||
|
||||
//#demo-proxy
|
||||
final ActorRef proxy = getContext().actorOf(
|
||||
new Props(new UntypedActorFactory() {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public Actor create() {
|
||||
final FiniteDuration retry = Duration.create(100, "millis");
|
||||
return new ReliableProxy(target, retry);
|
||||
}
|
||||
}));
|
||||
|
||||
public void onReceive(Object msg) {
|
||||
if ("hello".equals(msg)) {
|
||||
proxy.tell("world!", getSelf());
|
||||
}
|
||||
}
|
||||
//#demo-proxy
|
||||
};
|
||||
}
|
||||
}));
|
||||
parent.tell("hello", null);
|
||||
probe.expectMsg("world!");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void demonstrateTransitions() {
|
||||
final ActorRef target = system.deadLetters();
|
||||
final ActorRef parent = system.actorOf(new Props(new UntypedActorFactory() {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public Actor create() {
|
||||
return new UntypedActor() {
|
||||
|
||||
//#demo-transition
|
||||
final ActorRef proxy = getContext().actorOf(
|
||||
new Props(new UntypedActorFactory() {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public Actor create() {
|
||||
final FiniteDuration retry = Duration.create(100, "millis");
|
||||
return new ReliableProxy(target, retry);
|
||||
}
|
||||
}));
|
||||
ActorRef client = null;
|
||||
|
||||
{
|
||||
proxy.tell(new FSM.SubscribeTransitionCallBack(getSelf()), getSelf());
|
||||
}
|
||||
|
||||
public void onReceive(Object msg) {
|
||||
if ("hello".equals(msg)) {
|
||||
proxy.tell("world!", getSelf());
|
||||
client = getSender();
|
||||
} else if (msg instanceof FSM.CurrentState<?>) {
|
||||
// get initial state
|
||||
} else if (msg instanceof FSM.Transition<?>) {
|
||||
@SuppressWarnings("unchecked")
|
||||
final FSM.Transition<ReliableProxy.State> transition =
|
||||
(FSM.Transition<ReliableProxy.State>) msg;
|
||||
assert transition.fsmRef().equals(proxy);
|
||||
if (transition.to().equals(ReliableProxy.idle())) {
|
||||
client.tell("done", getSelf());
|
||||
}
|
||||
}
|
||||
}
|
||||
//#demo-transition
|
||||
};
|
||||
}
|
||||
}));
|
||||
final TestProbe probe = TestProbe.apply(system);
|
||||
parent.tell("hello", probe.ref());
|
||||
probe.expectMsg("done");
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,41 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.contrib.pattern
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.actor.Props
|
||||
import akka.actor.Actor
|
||||
import akka.testkit.ImplicitSender
|
||||
import scala.concurrent.util.duration._
|
||||
import akka.actor.FSM
|
||||
import akka.actor.ActorRef
|
||||
|
||||
class ReliableProxyDocSpec extends AkkaSpec with ImplicitSender {
|
||||
|
||||
"A ReliableProxy" must {
|
||||
|
||||
"show state transitions" in {
|
||||
val target = system.deadLetters
|
||||
val a = system.actorOf(Props(new Actor {
|
||||
//#demo-transition
|
||||
val proxy = context.actorOf(Props(new ReliableProxy(target, 100.millis)))
|
||||
proxy ! FSM.SubscribeTransitionCallBack(self)
|
||||
|
||||
var client: ActorRef = _
|
||||
|
||||
def receive = {
|
||||
case "go" ⇒ proxy ! 42; client = sender
|
||||
case FSM.CurrentState(`proxy`, initial) ⇒
|
||||
case FSM.Transition(`proxy`, from, to) ⇒ if (to == ReliableProxy.Idle) client ! "done"
|
||||
}
|
||||
//#demo-transition
|
||||
}))
|
||||
a ! "go"
|
||||
expectMsg("done")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -16,12 +16,18 @@ in minor releases without notice as we refine and simplify based on your
|
|||
feedback. An experimental module may be dropped in major releases without
|
||||
prior deprecation.
|
||||
|
||||
Another reason for marking a module as experimental is that it's too early
|
||||
to tell if the module has a maintainer that can take the responsibility
|
||||
of the module over time.
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 1
|
||||
|
||||
../cluster/index
|
||||
../dev/multi-node-testing
|
||||
|
||||
Another reason for marking a module as experimental is that it's too early
|
||||
to tell if the module has a maintainer that can take the responsibility
|
||||
of the module over time. These modules live in the ``akka-contrib`` subproject:
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
|
||||
../b/../../../akka-contrib/docs/index.rst
|
||||
|
||||
|
|
|
|||
|
|
@ -135,7 +135,7 @@ object AkkaBuild extends Build {
|
|||
id = "akka-remote-tests-experimental",
|
||||
base = file("akka-remote-tests"),
|
||||
dependencies = Seq(remote, actorTests % "test->test", testkit),
|
||||
settings = defaultSettings ++ multiJvmSettings ++ Seq(
|
||||
settings = defaultSettings ++ multiJvmSettings ++ experimentalSettings ++ Seq(
|
||||
libraryDependencies ++= Dependencies.remoteTests,
|
||||
// disable parallel tests
|
||||
parallelExecution in Test := false,
|
||||
|
|
@ -143,19 +143,7 @@ object AkkaBuild extends Build {
|
|||
(name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq
|
||||
},
|
||||
scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions,
|
||||
jvmOptions in MultiJvm := defaultMultiJvmOptions,
|
||||
previousArtifact := akkaPreviousArtifact("akka-remote"),
|
||||
description := """|This module of Akka is marked as
|
||||
|experimental, which means that it is in early
|
||||
|access mode, which also means that it is not covered
|
||||
|by commercial support. An experimental module doesn't
|
||||
|have to obey the rule of staying binary compatible
|
||||
|between minor releases. Breaking API changes may be
|
||||
|introduced in minor releases without notice as we
|
||||
|refine and simplify based on your feedback. An
|
||||
|experimental module may be dropped in major releases
|
||||
|without prior deprecation.
|
||||
|""".stripMargin
|
||||
previousArtifact := akkaPreviousArtifact("akka-remote")
|
||||
)
|
||||
) configs (MultiJvm)
|
||||
|
||||
|
|
@ -163,7 +151,7 @@ object AkkaBuild extends Build {
|
|||
id = "akka-cluster-experimental",
|
||||
base = file("akka-cluster"),
|
||||
dependencies = Seq(remote, remoteTests % "test->test" , testkit % "test->test"),
|
||||
settings = defaultSettings ++ multiJvmSettings ++ OSGi.cluster ++ Seq(
|
||||
settings = defaultSettings ++ multiJvmSettings ++ OSGi.cluster ++ experimentalSettings ++ Seq(
|
||||
libraryDependencies ++= Dependencies.cluster,
|
||||
// disable parallel tests
|
||||
parallelExecution in Test := false,
|
||||
|
|
@ -171,19 +159,7 @@ object AkkaBuild extends Build {
|
|||
(name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq
|
||||
},
|
||||
scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions,
|
||||
jvmOptions in MultiJvm := defaultMultiJvmOptions,
|
||||
previousArtifact := akkaPreviousArtifact("akka-remote"),
|
||||
description := """|This module of Akka is marked as
|
||||
|experimental, which means that it is in early
|
||||
|access mode, which also means that it is not covered
|
||||
|by commercial support. An experimental module doesn't
|
||||
|have to obey the rule of staying binary compatible
|
||||
|between minor releases. Breaking API changes may be
|
||||
|introduced in minor releases without notice as we
|
||||
|refine and simplify based on your feedback. An
|
||||
|experimental module may be dropped in major releases
|
||||
|without prior deprecation.
|
||||
|""".stripMargin
|
||||
previousArtifact := akkaPreviousArtifact("akka-remote")
|
||||
)
|
||||
) configs (MultiJvm)
|
||||
|
||||
|
|
@ -354,25 +330,13 @@ object AkkaBuild extends Build {
|
|||
id = "akka-sample-cluster-experimental",
|
||||
base = file("akka-samples/akka-sample-cluster"),
|
||||
dependencies = Seq(cluster, remoteTests % "test", testkit % "test"),
|
||||
settings = sampleSettings ++ multiJvmSettings ++ Seq(
|
||||
settings = sampleSettings ++ multiJvmSettings ++ experimentalSettings ++ Seq(
|
||||
libraryDependencies ++= Dependencies.clusterSample,
|
||||
// disable parallel tests
|
||||
parallelExecution in Test := false,
|
||||
extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src =>
|
||||
(name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq
|
||||
},
|
||||
jvmOptions in MultiJvm := defaultMultiJvmOptions,
|
||||
description := """|This module of Akka is marked as
|
||||
|experimental, which means that it is in early
|
||||
|access mode, which also means that it is not covered
|
||||
|by commercial support. An experimental module doesn't
|
||||
|have to obey the rule of staying binary compatible
|
||||
|between minor releases. Breaking API changes may be
|
||||
|introduced in minor releases without notice as we
|
||||
|refine and simplify based on your feedback. An
|
||||
|experimental module may be dropped in major releases
|
||||
|without prior deprecation.
|
||||
|""".stripMargin
|
||||
}
|
||||
)
|
||||
) configs (MultiJvm)
|
||||
|
||||
|
|
@ -380,14 +344,13 @@ object AkkaBuild extends Build {
|
|||
id = "akka-sample-multi-node-experimental",
|
||||
base = file("akka-samples/akka-sample-multi-node"),
|
||||
dependencies = Seq(remoteTests % "test", testkit % "test"),
|
||||
settings = sampleSettings ++ multiJvmSettings ++ Seq(
|
||||
settings = sampleSettings ++ multiJvmSettings ++ experimentalSettings ++ Seq(
|
||||
libraryDependencies ++= Dependencies.multiNodeSample,
|
||||
// disable parallel tests
|
||||
parallelExecution in Test := false,
|
||||
extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src =>
|
||||
(name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq
|
||||
},
|
||||
jvmOptions in MultiJvm := defaultMultiJvmOptions
|
||||
}
|
||||
)
|
||||
) configs (MultiJvm)
|
||||
|
||||
|
|
@ -408,8 +371,10 @@ object AkkaBuild extends Build {
|
|||
lazy val contrib = Project(
|
||||
id = "akka-contrib",
|
||||
base = file("akka-contrib"),
|
||||
dependencies = Seq(actor),
|
||||
settings = defaultSettings ++ Seq(
|
||||
dependencies = Seq(remote, remoteTests % "compile;test->test"),
|
||||
settings = defaultSettings ++ multiJvmSettings ++ Seq(
|
||||
libraryDependencies ++= Dependencies.contrib,
|
||||
testOptions += Tests.Argument(TestFrameworks.JUnit, "-v"),
|
||||
description := """|
|
||||
|This subproject provides a home to modules contributed by external
|
||||
|developers which may or may not move into the officially supported code
|
||||
|
|
@ -421,7 +386,7 @@ object AkkaBuild extends Build {
|
|||
|support for these modules.
|
||||
|""".stripMargin
|
||||
)
|
||||
)
|
||||
) configs (MultiJvm)
|
||||
|
||||
// Settings
|
||||
|
||||
|
|
@ -448,6 +413,20 @@ object AkkaBuild extends Build {
|
|||
publishArtifact in Compile := false
|
||||
)
|
||||
|
||||
lazy val experimentalSettings = Seq(
|
||||
description := """|This module of Akka is marked as
|
||||
|experimental, which means that it is in early
|
||||
|access mode, which also means that it is not covered
|
||||
|by commercial support. An experimental module doesn't
|
||||
|have to obey the rule of staying binary compatible
|
||||
|between minor releases. Breaking API changes may be
|
||||
|introduced in minor releases without notice as we
|
||||
|refine and simplify based on your feedback. An
|
||||
|experimental module may be dropped in major releases
|
||||
|without prior deprecation.
|
||||
|""".stripMargin
|
||||
)
|
||||
|
||||
val excludeTestNames = SettingKey[Seq[String]]("exclude-test-names")
|
||||
val excludeTestTags = SettingKey[Set[String]]("exclude-test-tags")
|
||||
val includeTestTags = SettingKey[Set[String]]("include-test-tags")
|
||||
|
|
@ -589,6 +568,7 @@ object AkkaBuild extends Build {
|
|||
}
|
||||
|
||||
lazy val multiJvmSettings = SbtMultiJvm.multiJvmSettings ++ inConfig(MultiJvm)(ScalariformPlugin.scalariformSettings) ++ Seq(
|
||||
jvmOptions in MultiJvm := defaultMultiJvmOptions,
|
||||
compileInputs in MultiJvm <<= (compileInputs in MultiJvm) dependsOn (ScalariformKeys.format in MultiJvm),
|
||||
compile in MultiJvm <<= (compile in MultiJvm) triggeredBy (compile in Test),
|
||||
ScalariformKeys.preferences in MultiJvm := formattingPreferences) ++
|
||||
|
|
@ -706,6 +686,8 @@ object Dependencies {
|
|||
|
||||
val clusterSample = Seq(Test.scalatest)
|
||||
|
||||
val contrib = Seq(Test.junitIntf)
|
||||
|
||||
val multiNodeSample = Seq(Test.scalatest)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue