+doc #15537 add Typed router pattern

Conflicts:
	akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala
This commit is contained in:
Konrad 'ktoso' Malawski 2014-07-14 14:51:09 +02:00
parent 76bc8afe8c
commit ba2411833a
5 changed files with 205 additions and 24 deletions

View file

@ -3,24 +3,24 @@
*/ */
package akka.actor package akka.actor
import language.postfixOps import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ CountDownLatch, TimeUnit, TimeoutException }
import akka.actor.TypedActor._
import akka.japi.{ Option JOption }
import akka.pattern.ask
import akka.routing.RoundRobinGroup
import akka.serialization.JavaSerializer
import akka.testkit.{ AkkaSpec, DefaultTimeout, EventFilter, TimingTest, filterEvents }
import akka.util.Timeout
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.testkit.{ EventFilter, filterEvents, AkkaSpec } import scala.concurrent.{ Await, Future, Promise }
import akka.util.Timeout import scala.language.postfixOps
import akka.japi.{ Option JOption } import scala.util.Random
import akka.testkit.DefaultTimeout
import akka.dispatch.Dispatchers
import akka.pattern.ask
import akka.serialization.JavaSerializer
import akka.actor.TypedActor._
import java.util.concurrent.atomic.AtomicReference
import java.lang.IllegalStateException
import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch }
import akka.testkit.TimingTest
object TypedActorSpec { object TypedActorSpec {
@ -113,7 +113,7 @@ object TypedActorSpec {
class Bar extends Foo with Serializable { class Bar extends Foo with Serializable {
import TypedActor.dispatcher import akka.actor.TypedActor.dispatcher
def pigdog = "Pigdog" def pigdog = "Pigdog"
@ -210,7 +210,7 @@ object TypedActorSpec {
class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
with BeforeAndAfterEach with BeforeAndAfterAll with DefaultTimeout { with BeforeAndAfterEach with BeforeAndAfterAll with DefaultTimeout {
import TypedActorSpec._ import akka.actor.TypedActorSpec._
def newFooBar: Foo = newFooBar(timeout.duration) def newFooBar: Foo = newFooBar(timeout.duration)
@ -516,3 +516,47 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
} }
} }
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TypedActorRouterSpec extends AkkaSpec(TypedActorSpec.config)
with BeforeAndAfterEach with BeforeAndAfterAll with DefaultTimeout {
import akka.actor.TypedActorSpec._
def newFooBar: Foo = newFooBar(timeout.duration)
def newFooBar(d: FiniteDuration): Foo =
TypedActor(system).typedActorOf(TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(Timeout(d)))
def mustStop(typedActor: AnyRef) = TypedActor(system).stop(typedActor) should be(true)
"TypedActor Router" must {
"work" in {
val t1 = newFooBar
val t2 = newFooBar
val t3 = newFooBar
val t4 = newFooBar
val routees = List(t1, t2, t3, t4) map { t TypedActor(system).getActorRefFor(t).path.toStringWithoutAddress }
TypedActor(system).isTypedActor(t1) should be(true)
TypedActor(system).isTypedActor(t2) should be(true)
val router = system.actorOf(RoundRobinGroup(routees).props(), "router")
val typedRouter = TypedActor(system).typedActorOf[Foo, Foo](TypedProps[Foo](), router)
info("got = " + typedRouter.optionPigdog())
info("got = " + typedRouter.optionPigdog())
info("got = " + typedRouter.optionPigdog())
info("got = " + typedRouter.optionPigdog())
info("got = " + typedRouter.optionPigdog())
mustStop(t1)
mustStop(t2)
mustStop(t3)
mustStop(t4)
}
}
}

View file

@ -9,16 +9,22 @@ import akka.actor.TypedActor;
import akka.actor.*; import akka.actor.*;
import akka.japi.*; import akka.japi.*;
import akka.dispatch.Futures; import akka.dispatch.Futures;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.Future; import scala.concurrent.Future;
import scala.concurrent.duration.Duration; import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.ArrayList;
import java.util.Random;
import akka.routing.RoundRobinGroup;
//#imports //#imports
import java.lang.Exception;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*; import static org.junit.Assert.assertEquals;
//#imports
public class TypedActorDocTest { public class TypedActorDocTest {
Object someReference = null; Object someReference = null;
ActorSystem system = null; ActorSystem system = null;
@ -190,4 +196,58 @@ public class TypedActorDocTest {
//dun care //dun care
} }
} }
//#typed-router-types-1
interface HasName {
String name();
}
class Named implements HasName {
private int id = new Random().nextInt(1024);
@Override public String name() { return "name-" + id; }
}
//#typed-router-types
@Test public void typedRouterPattern() {
try {
//#typed-router
// prepare routees
TypedActorExtension typed = TypedActor.get(system);
Named named1 =
typed.typedActorOf(new TypedProps<Named>(Named.class));
Named named2 =
typed.typedActorOf(new TypedProps<Named>(Named.class));
List<Named> routees = new ArrayList<Named>();
routees.add(named1);
routees.add(named2);
List<String> routeePaths = new ArrayList<String>();
routeePaths.add(typed.getActorRefFor(named1).path().toStringWithoutAddress());
routeePaths.add(typed.getActorRefFor(named2).path().toStringWithoutAddress());
// prepare untyped router
ActorRef router = system.actorOf(new RoundRobinGroup(routeePaths).props(), "router");
// prepare typed proxy, forwarding MethodCall messages to `router`
Named typedRouter = typed.typedActorOf(new TypedProps<Named>(Named.class), router);
System.out.println("actor was: " + typedRouter.name()); // name-243
System.out.println("actor was: " + typedRouter.name()); // name-614
System.out.println("actor was: " + typedRouter.name()); // name-243
System.out.println("actor was: " + typedRouter.name()); // name-614
//#typed-router
typed.poisonPill(named1);
typed.poisonPill(named2);
typed.poisonPill(typedRouter);
} catch (Exception e) {
//dun care
}
}
} }

View file

@ -221,3 +221,21 @@ Lookup & Remoting
Since ``TypedActors`` are backed by ``Akka Actors``, you can use ``typedActorOf`` to proxy ``ActorRefs`` potentially residing on remote nodes. Since ``TypedActors`` are backed by ``Akka Actors``, you can use ``typedActorOf`` to proxy ``ActorRefs`` potentially residing on remote nodes.
.. includecode:: code/docs/actor/TypedActorDocTest.java#typed-actor-remote .. includecode:: code/docs/actor/TypedActorDocTest.java#typed-actor-remote
Typed Router pattern
--------------------
Sometimes you want to spread messages between multiple actors. The easiest way to achieve this in Akka is to use a :ref:`Router <routing-java>`,
which can implement a specific routing logic, such as ``smallest-mailbox`` or ``consistent-hashing`` etc.
Routers are not provided directly for typed actors, but it is really easy to leverage an untyped router and use a typed proxy in front of it.
To showcase this let's create typed actors that assign themselves some random ``id``, so we know that in fact, the router has sent the message to different actors:
.. includecode:: code/docs/actor/TypedActorDocTest.java#typed-router-types
In order to round robin among a few instances of such actors, you can simply create a plain untyped router,
and then facade it with a ``TypedActor`` like shown in the example below. This works because typed actors of course
communicate using the same mechanisms as normal actors, and methods calls on them get transformed into message sends of ``MethodCall`` messages.
.. includecode:: code/docs/actor/TypedActorDocTest.java#typed-router

View file

@ -3,14 +3,15 @@
*/ */
package docs.actor package docs.actor
import language.postfixOps import java.lang.String.{ valueOf => println }
import scala.concurrent.{ Promise, Future, Await }
import scala.concurrent.duration._ import akka.actor.{ ActorContext, ActorRef, TypedActor, TypedProps }
import akka.actor.{ ActorContext, TypedActor, TypedProps } import akka.routing.RoundRobinGroup
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
import org.scalatest.Matchers
import akka.testkit._ import akka.testkit._
import scala.concurrent.{ Future, Await }
import scala.concurrent.duration._
//Mr funny man avoids printing to stdout AND keeping docs alright //Mr funny man avoids printing to stdout AND keeping docs alright
import java.lang.String.{ valueOf => println } import java.lang.String.{ valueOf => println }
import akka.actor.ActorRef import akka.actor.ActorRef
@ -62,6 +63,19 @@ trait Bar {
class FooBar extends Foo with Bar class FooBar extends Foo with Bar
//#typed-actor-supercharge //#typed-actor-supercharge
//#typed-router-types
trait HasName {
def name(): String
}
class Named extends HasName {
import scala.util.Random
private val id = Random.nextInt(1024)
def name(): String = "name-" + id
}
//#typed-router-types
class TypedActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { class TypedActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
"get the TypedActor extension" in { "get the TypedActor extension" in {
@ -181,4 +195,31 @@ class TypedActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
//#typed-actor-supercharge-usage //#typed-actor-supercharge-usage
Await.result(f, 3 seconds) should be("YES") Await.result(f, 3 seconds) should be("YES")
} }
"typed router pattern" in {
//#typed-router
def namedActor(): HasName = TypedActor(system).typedActorOf(TypedProps[Named]())
// prepare routees
val routees: List[HasName] = List.fill(5) { namedActor() }
val routeePaths = routees map { r =>
TypedActor(system).getActorRefFor(r).path.toStringWithoutAddress
}
// prepare untyped router
val router: ActorRef = system.actorOf(RoundRobinGroup(routeePaths).props())
// prepare typed proxy, forwarding MethodCall messages to `router`
val typedRouter: HasName =
TypedActor(system).typedActorOf(TypedProps[Named](), actorRef = router)
println("actor was: " + typedRouter.name()) // name-184
println("actor was: " + typedRouter.name()) // name-753
println("actor was: " + typedRouter.name()) // name-320
println("actor was: " + typedRouter.name()) // name-164
//#typed-router
routees foreach { TypedActor(system).poisonPill(_) }
TypedActor(system).poisonPill(router)
}
} }

View file

@ -214,3 +214,21 @@ Here's an example on how you can use traits to mix in behavior in your Typed Act
.. includecode:: code/docs/actor/TypedActorDocSpec.scala#typed-actor-supercharge .. includecode:: code/docs/actor/TypedActorDocSpec.scala#typed-actor-supercharge
.. includecode:: code/docs/actor/TypedActorDocSpec.scala#typed-actor-supercharge-usage .. includecode:: code/docs/actor/TypedActorDocSpec.scala#typed-actor-supercharge-usage
Typed Router pattern
--------------------
Sometimes you want to spread messages between multiple actors. The easiest way to achieve this in Akka is to use a :ref:`Router <routing-scala>`,
which can implement a specific routing logic, such as ``smallest-mailbox`` or ``consistent-hashing`` etc.
Routers are not provided directly for typed actors, but it is really easy to leverage an untyped router and use a typed proxy in front of it.
To showcase this let's create typed actors that assign themselves some random ``id``, so we know that in fact, the router has sent the message to different actors:
.. includecode:: code/docs/actor/TypedActorDocSpec.scala#typed-router-types
In order to round robin among a few instances of such actors, you can simply create a plain untyped router,
and then facade it with a ``TypedActor`` like shown in the example below. This works because typed actors of course
communicate using the same mechanisms as normal actors, and methods calls on them get transformed into message sends of ``MethodCall`` messages.
.. includecode:: code/docs/actor/TypedActorDocSpec.scala#typed-router