diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index ac58eb3ade..9710052295 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -3,24 +3,24 @@ */ package akka.actor -import language.postfixOps +import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.{ CountDownLatch, TimeUnit, TimeoutException } + +import akka.actor.TypedActor._ +import akka.japi.{ Option ⇒ JOption } +import akka.pattern.ask +import akka.routing.RoundRobinGroup +import akka.serialization.JavaSerializer +import akka.testkit.{ AkkaSpec, DefaultTimeout, EventFilter, TimingTest, filterEvents } +import akka.util.Timeout import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } + import scala.annotation.tailrec import scala.collection.immutable -import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.duration._ -import akka.testkit.{ EventFilter, filterEvents, AkkaSpec } -import akka.util.Timeout -import akka.japi.{ Option ⇒ JOption } -import akka.testkit.DefaultTimeout -import akka.dispatch.Dispatchers -import akka.pattern.ask -import akka.serialization.JavaSerializer -import akka.actor.TypedActor._ -import java.util.concurrent.atomic.AtomicReference -import java.lang.IllegalStateException -import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch } -import akka.testkit.TimingTest +import scala.concurrent.{ Await, Future, Promise } +import scala.language.postfixOps +import scala.util.Random object TypedActorSpec { @@ -113,7 +113,7 @@ object TypedActorSpec { class Bar extends Foo with Serializable { - import TypedActor.dispatcher + import akka.actor.TypedActor.dispatcher def pigdog = "Pigdog" @@ -210,7 +210,7 @@ object TypedActorSpec { class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) with BeforeAndAfterEach with BeforeAndAfterAll with DefaultTimeout { - import TypedActorSpec._ + import akka.actor.TypedActorSpec._ def newFooBar: Foo = newFooBar(timeout.duration) @@ -516,3 +516,47 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) } } } + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class TypedActorRouterSpec extends AkkaSpec(TypedActorSpec.config) + with BeforeAndAfterEach with BeforeAndAfterAll with DefaultTimeout { + + import akka.actor.TypedActorSpec._ + + def newFooBar: Foo = newFooBar(timeout.duration) + + def newFooBar(d: FiniteDuration): Foo = + TypedActor(system).typedActorOf(TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(Timeout(d))) + + def mustStop(typedActor: AnyRef) = TypedActor(system).stop(typedActor) should be(true) + + "TypedActor Router" must { + + "work" in { + val t1 = newFooBar + val t2 = newFooBar + val t3 = newFooBar + val t4 = newFooBar + val routees = List(t1, t2, t3, t4) map { t ⇒ TypedActor(system).getActorRefFor(t).path.toStringWithoutAddress } + + TypedActor(system).isTypedActor(t1) should be(true) + TypedActor(system).isTypedActor(t2) should be(true) + + val router = system.actorOf(RoundRobinGroup(routees).props(), "router") + + val typedRouter = TypedActor(system).typedActorOf[Foo, Foo](TypedProps[Foo](), router) + + info("got = " + typedRouter.optionPigdog()) + info("got = " + typedRouter.optionPigdog()) + info("got = " + typedRouter.optionPigdog()) + info("got = " + typedRouter.optionPigdog()) + info("got = " + typedRouter.optionPigdog()) + + mustStop(t1) + mustStop(t2) + mustStop(t3) + mustStop(t4) + } + } + +} diff --git a/akka-docs/rst/java/code/docs/actor/TypedActorDocTest.java b/akka-docs/rst/java/code/docs/actor/TypedActorDocTest.java index cfd483ade3..cac73e609c 100644 --- a/akka-docs/rst/java/code/docs/actor/TypedActorDocTest.java +++ b/akka-docs/rst/java/code/docs/actor/TypedActorDocTest.java @@ -9,16 +9,22 @@ import akka.actor.TypedActor; import akka.actor.*; import akka.japi.*; import akka.dispatch.Futures; + import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import java.util.concurrent.TimeUnit; +import java.util.List; +import java.util.ArrayList; +import java.util.Random; +import akka.routing.RoundRobinGroup; //#imports -import java.lang.Exception; import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; + +//#imports public class TypedActorDocTest { Object someReference = null; ActorSystem system = null; @@ -190,4 +196,58 @@ public class TypedActorDocTest { //dun care } } + + //#typed-router-types-1 + interface HasName { + String name(); + } + + class Named implements HasName { + private int id = new Random().nextInt(1024); + + @Override public String name() { return "name-" + id; } + } + //#typed-router-types + + + @Test public void typedRouterPattern() { + try { + //#typed-router + // prepare routees + TypedActorExtension typed = TypedActor.get(system); + + Named named1 = + typed.typedActorOf(new TypedProps(Named.class)); + + Named named2 = + typed.typedActorOf(new TypedProps(Named.class)); + + List routees = new ArrayList(); + routees.add(named1); + routees.add(named2); + + List routeePaths = new ArrayList(); + routeePaths.add(typed.getActorRefFor(named1).path().toStringWithoutAddress()); + routeePaths.add(typed.getActorRefFor(named2).path().toStringWithoutAddress()); + + // prepare untyped router + ActorRef router = system.actorOf(new RoundRobinGroup(routeePaths).props(), "router"); + + // prepare typed proxy, forwarding MethodCall messages to `router` + Named typedRouter = typed.typedActorOf(new TypedProps(Named.class), router); + + System.out.println("actor was: " + typedRouter.name()); // name-243 + System.out.println("actor was: " + typedRouter.name()); // name-614 + System.out.println("actor was: " + typedRouter.name()); // name-243 + System.out.println("actor was: " + typedRouter.name()); // name-614 + + //#typed-router + typed.poisonPill(named1); + typed.poisonPill(named2); + typed.poisonPill(typedRouter); + + } catch (Exception e) { + //dun care + } + } } diff --git a/akka-docs/rst/java/typed-actors.rst b/akka-docs/rst/java/typed-actors.rst index 3d7baf8578..d8af78ecc9 100644 --- a/akka-docs/rst/java/typed-actors.rst +++ b/akka-docs/rst/java/typed-actors.rst @@ -221,3 +221,21 @@ Lookup & Remoting Since ``TypedActors`` are backed by ``Akka Actors``, you can use ``typedActorOf`` to proxy ``ActorRefs`` potentially residing on remote nodes. .. includecode:: code/docs/actor/TypedActorDocTest.java#typed-actor-remote + +Typed Router pattern +-------------------- + +Sometimes you want to spread messages between multiple actors. The easiest way to achieve this in Akka is to use a :ref:`Router `, +which can implement a specific routing logic, such as ``smallest-mailbox`` or ``consistent-hashing`` etc. + +Routers are not provided directly for typed actors, but it is really easy to leverage an untyped router and use a typed proxy in front of it. +To showcase this let's create typed actors that assign themselves some random ``id``, so we know that in fact, the router has sent the message to different actors: + +.. includecode:: code/docs/actor/TypedActorDocTest.java#typed-router-types + +In order to round robin among a few instances of such actors, you can simply create a plain untyped router, +and then facade it with a ``TypedActor`` like shown in the example below. This works because typed actors of course +communicate using the same mechanisms as normal actors, and methods calls on them get transformed into message sends of ``MethodCall`` messages. + +.. includecode:: code/docs/actor/TypedActorDocTest.java#typed-router + diff --git a/akka-docs/rst/scala/code/docs/actor/TypedActorDocSpec.scala b/akka-docs/rst/scala/code/docs/actor/TypedActorDocSpec.scala index 41f2747e0e..6d6fc0227e 100644 --- a/akka-docs/rst/scala/code/docs/actor/TypedActorDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/actor/TypedActorDocSpec.scala @@ -3,14 +3,15 @@ */ package docs.actor -import language.postfixOps -import scala.concurrent.{ Promise, Future, Await } -import scala.concurrent.duration._ -import akka.actor.{ ActorContext, TypedActor, TypedProps } -import org.scalatest.{ BeforeAndAfterAll, WordSpec } -import org.scalatest.Matchers +import java.lang.String.{ valueOf => println } + +import akka.actor.{ ActorContext, ActorRef, TypedActor, TypedProps } +import akka.routing.RoundRobinGroup import akka.testkit._ +import scala.concurrent.{ Future, Await } +import scala.concurrent.duration._ + //Mr funny man avoids printing to stdout AND keeping docs alright import java.lang.String.{ valueOf => println } import akka.actor.ActorRef @@ -62,6 +63,19 @@ trait Bar { class FooBar extends Foo with Bar //#typed-actor-supercharge +//#typed-router-types +trait HasName { + def name(): String +} + +class Named extends HasName { + import scala.util.Random + private val id = Random.nextInt(1024) + + def name(): String = "name-" + id +} +//#typed-router-types + class TypedActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { "get the TypedActor extension" in { @@ -181,4 +195,31 @@ class TypedActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { //#typed-actor-supercharge-usage Await.result(f, 3 seconds) should be("YES") } + + "typed router pattern" in { + //#typed-router + def namedActor(): HasName = TypedActor(system).typedActorOf(TypedProps[Named]()) + + // prepare routees + val routees: List[HasName] = List.fill(5) { namedActor() } + val routeePaths = routees map { r => + TypedActor(system).getActorRefFor(r).path.toStringWithoutAddress + } + + // prepare untyped router + val router: ActorRef = system.actorOf(RoundRobinGroup(routeePaths).props()) + + // prepare typed proxy, forwarding MethodCall messages to `router` + val typedRouter: HasName = + TypedActor(system).typedActorOf(TypedProps[Named](), actorRef = router) + + println("actor was: " + typedRouter.name()) // name-184 + println("actor was: " + typedRouter.name()) // name-753 + println("actor was: " + typedRouter.name()) // name-320 + println("actor was: " + typedRouter.name()) // name-164 + //#typed-router + + routees foreach { TypedActor(system).poisonPill(_) } + TypedActor(system).poisonPill(router) + } } diff --git a/akka-docs/rst/scala/typed-actors.rst b/akka-docs/rst/scala/typed-actors.rst index ede566971f..b560db2b72 100644 --- a/akka-docs/rst/scala/typed-actors.rst +++ b/akka-docs/rst/scala/typed-actors.rst @@ -214,3 +214,21 @@ Here's an example on how you can use traits to mix in behavior in your Typed Act .. includecode:: code/docs/actor/TypedActorDocSpec.scala#typed-actor-supercharge .. includecode:: code/docs/actor/TypedActorDocSpec.scala#typed-actor-supercharge-usage + +Typed Router pattern +-------------------- + +Sometimes you want to spread messages between multiple actors. The easiest way to achieve this in Akka is to use a :ref:`Router `, +which can implement a specific routing logic, such as ``smallest-mailbox`` or ``consistent-hashing`` etc. + +Routers are not provided directly for typed actors, but it is really easy to leverage an untyped router and use a typed proxy in front of it. +To showcase this let's create typed actors that assign themselves some random ``id``, so we know that in fact, the router has sent the message to different actors: + +.. includecode:: code/docs/actor/TypedActorDocSpec.scala#typed-router-types + +In order to round robin among a few instances of such actors, you can simply create a plain untyped router, +and then facade it with a ``TypedActor`` like shown in the example below. This works because typed actors of course +communicate using the same mechanisms as normal actors, and methods calls on them get transformed into message sends of ``MethodCall`` messages. + +.. includecode:: code/docs/actor/TypedActorDocSpec.scala#typed-router +