Merge pull request #719 from akka/wip-2530-deprecate-tell1-∂π

remove all but one occurrence of single-arg tell()
This commit is contained in:
Viktor Klang (√) 2012-09-21 03:05:00 -07:00
commit a323161b2f
71 changed files with 550 additions and 538 deletions

View file

@ -2,7 +2,7 @@ package akka.actor;
public class JavaAPITestActor extends UntypedActor { public class JavaAPITestActor extends UntypedActor {
public void onReceive(Object msg) { public void onReceive(Object msg) {
getSender().tell("got it!"); getSender().tell("got it!", getSelf());
getContext().getChildren(); getContext().getChildren();
} }
} }

View file

@ -12,6 +12,6 @@ public class NonPublicClass {
class MyNonPublicActorClass extends UntypedActor { class MyNonPublicActorClass extends UntypedActor {
@Override public void onReceive(Object msg) { @Override public void onReceive(Object msg) {
getSender().tell(msg); getSender().tell(msg, getSelf());
} }
} }

View file

@ -1,14 +1,10 @@
package akka.actor; package akka.actor;
import akka.actor.ActorSystem;
import akka.japi.Creator;
import akka.testkit.AkkaSpec;
import com.typesafe.config.ConfigFactory;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*;
import com.typesafe.config.ConfigFactory;
public class StashJavaAPI { public class StashJavaAPI {
@ -16,7 +12,8 @@ public class StashJavaAPI {
@BeforeClass @BeforeClass
public static void beforeAll() { public static void beforeAll() {
system = ActorSystem.create("StashJavaAPI", ConfigFactory.parseString(ActorWithStashSpec.testConf())); system = ActorSystem.create("StashJavaAPI",
ConfigFactory.parseString(ActorWithStashSpec.testConf()));
} }
@AfterClass @AfterClass
@ -27,10 +24,11 @@ public class StashJavaAPI {
@Test @Test
public void mustBeAbleToUseStash() { public void mustBeAbleToUseStash() {
ActorRef ref = system.actorOf(new Props(StashJavaAPITestActor.class).withDispatcher("my-dispatcher")); ActorRef ref = system.actorOf(new Props(StashJavaAPITestActor.class)
.withDispatcher("my-dispatcher"));
ref.tell("Hello", ref); ref.tell("Hello", ref);
ref.tell("Hello", ref); ref.tell("Hello", ref);
ref.tell(new Object()); ref.tell(new Object(), null);
} }
} }

View file

@ -4,10 +4,11 @@ import static org.junit.Assert.*;
public class StashJavaAPITestActor extends UntypedActorWithStash { public class StashJavaAPITestActor extends UntypedActorWithStash {
int count = 0; int count = 0;
public void onReceive(Object msg) { public void onReceive(Object msg) {
if (msg instanceof String) { if (msg instanceof String) {
if (count < 0) { if (count < 0) {
getSender().tell(new Integer(((String) msg).length())); getSender().tell(new Integer(((String) msg).length()), getSelf());
} else if (count == 2) { } else if (count == 2) {
count = -1; count = -1;
unstashAll(); unstashAll();

View file

@ -3,14 +3,9 @@
*/ */
package akka.routing; package akka.routing;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import akka.routing.RoundRobinRouter;
import akka.testkit.ExtractRoute; import akka.testkit.ExtractRoute;
public class CustomRouteTest { public class CustomRouteTest {

View file

@ -371,8 +371,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
val timeout = Timeout(20000) val timeout = Timeout(20000)
val ref = system.actorOf(Props(new Actor { val ref = system.actorOf(Props(new Actor {
def receive = { def receive = {
case 5 sender.tell("five") case 5 sender ! "five"
case 0 sender.tell("null") case 0 sender ! "null"
} }
})) }))

View file

@ -32,7 +32,7 @@ object ConsistencySpec {
case step: Long case step: Long
if (lastStep != (step - 1)) if (lastStep != (step - 1))
sender.tell("Test failed: Last step %s, this step %s".format(lastStep, step)) sender ! "Test failed: Last step %s, this step %s".format(lastStep, step)
var shouldBeFortyTwo = left.value + right.value var shouldBeFortyTwo = left.value + right.value
if (shouldBeFortyTwo != 42) if (shouldBeFortyTwo != 42)

View file

@ -59,11 +59,11 @@ object Ticket669Spec {
} }
override def preRestart(reason: scala.Throwable, msg: Option[Any]) { override def preRestart(reason: scala.Throwable, msg: Option[Any]) {
sender.tell("failure1") sender ! "failure1"
} }
override def postStop() { override def postStop() {
sender.tell("failure2") sender ! "failure2"
} }
} }
} }

View file

@ -86,7 +86,7 @@ object ActorModelSpec {
case Wait(time) ack; Thread.sleep(time); busy.switchOff() case Wait(time) ack; Thread.sleep(time); busy.switchOff()
case WaitAck(time, l) ack; Thread.sleep(time); l.countDown(); busy.switchOff() case WaitAck(time, l) ack; Thread.sleep(time); l.countDown(); busy.switchOff()
case Reply(msg) ack; sender ! msg; busy.switchOff() case Reply(msg) ack; sender ! msg; busy.switchOff()
case TryReply(msg) ack; sender.tell(msg); busy.switchOff() case TryReply(msg) ack; sender.tell(msg, null); busy.switchOff()
case Forward(to, msg) ack; to.forward(msg); busy.switchOff() case Forward(to, msg) ack; to.forward(msg); busy.switchOff()
case CountDown(latch) ack; latch.countDown(); busy.switchOff() case CountDown(latch) ack; latch.countDown(); busy.switchOff()
case Increment(count) ack; count.incrementAndGet(); busy.switchOff() case Increment(count) ack; count.incrementAndGet(); busy.switchOff()

View file

@ -56,7 +56,7 @@ class PriorityDispatcherSpec extends AkkaSpec(PriorityDispatcherSpec.config) wit
def receive = { def receive = {
case i: Int acc = i :: acc case i: Int acc = i :: acc
case 'Result sender.tell(acc) case 'Result sender ! acc
} }
}).withDispatcher(dispatcherKey)).asInstanceOf[InternalActorRef] }).withDispatcher(dispatcherKey)).asInstanceOf[InternalActorRef]

View file

@ -535,7 +535,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
case _id: Int if (_id == id) case _id: Int if (_id == id)
case x { case x {
Thread sleep 100 * id Thread sleep 100 * id
sender.tell(id) sender ! id
} }
} }

View file

@ -6,9 +6,6 @@ package akka.dispatch;
import akka.util.Unsafe; import akka.util.Unsafe;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
abstract class AbstractMessageDispatcher { abstract class AbstractMessageDispatcher {
final static long shutdownScheduleOffset; final static long shutdownScheduleOffset;
final static long inhabitantsOffset; final static long inhabitantsOffset;

View file

@ -92,13 +92,14 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
* actor.tell(message); * actor.tell(message);
* </pre> * </pre>
*/ */
@deprecated("use the two-arg variant (typically getSelf() as second arg)", "2.1")
final def tell(msg: Any): Unit = this.!(msg)(null: ActorRef) final def tell(msg: Any): Unit = this.!(msg)(null: ActorRef)
/** /**
* Java API. <p/> * Java API. <p/>
* Sends the specified message to the sender, i.e. fire-and-forget * Sends the specified message to the sender, i.e. fire-and-forget
* semantics, including the sender reference if possible (not supported on * semantics, including the sender reference if possible (pass `null` if
* all senders).<p/> * there is nobody to reply to).<p/>
* <pre> * <pre>
* actor.tell(message, context); * actor.tell(message, context);
* </pre> * </pre>

View file

@ -195,7 +195,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep
lock.unlock() lock.unlock()
} }
} else { } else {
system.deadLetters.tell(DeadLetter(message, sender, self)) system.deadLetters ! DeadLetter(message, sender, self)
} }
} }
def sendSystemMessage(msg: SystemMessage): Unit = { def sendSystemMessage(msg: SystemMessage): Unit = {
@ -209,7 +209,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep
} else { } else {
// FIXME: once we have guaranteed delivery of system messages, hook this in! // FIXME: once we have guaranteed delivery of system messages, hook this in!
system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to lock timeout")) system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to lock timeout"))
system.deadLetters.tell(DeadLetter(msg, self, self)) system.deadLetters ! DeadLetter(msg, self, self)
} }
} }
def isLocal = true def isLocal = true

View file

@ -57,7 +57,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒
if (sendSupervise) { if (sendSupervise) {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(akka.dispatch.Supervise(self, uid)) parent.sendSystemMessage(akka.dispatch.Supervise(self, uid))
parent.tell(NullMessage) // read ScalaDoc of NullMessage to see why parent ! NullMessage // read ScalaDoc of NullMessage to see why
} }
// This call is expected to start off the actor by scheduling its mailbox. // This call is expected to start off the actor by scheduling its mailbox.

View file

@ -194,7 +194,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
try if (a ne null) a.postStop() try if (a ne null) a.postStop()
finally try dispatcher.detach(this) finally try dispatcher.detach(this)
finally try parent.sendSystemMessage(ChildTerminated(self)) finally try parent.sendSystemMessage(ChildTerminated(self))
finally try parent.tell(NullMessage) // read ScalaDoc of NullMessage to see why finally try parent ! NullMessage // read ScalaDoc of NullMessage to see why
finally try tellWatchersWeDied(a) finally try tellWatchersWeDied(a)
finally try unwatchWatchedActors(a) finally try unwatchWatchedActors(a)
finally { finally {

View file

@ -75,7 +75,7 @@ trait AskSupport {
*/ */
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match { def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match {
case ref: InternalActorRef if ref.isTerminated case ref: InternalActorRef if ref.isTerminated
actorRef.tell(message) actorRef ! message
Future.failed[Any](new AskTimeoutException("Recipient[%s] had already been terminated." format actorRef)) Future.failed[Any](new AskTimeoutException("Recipient[%s] had already been terminated." format actorRef))
case ref: InternalActorRef case ref: InternalActorRef
if (timeout.duration.length <= 0) Future.failed[Any](new IllegalArgumentException("Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format actorRef)) if (timeout.duration.length <= 0) Future.failed[Any](new IllegalArgumentException("Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format actorRef))

View file

@ -21,7 +21,6 @@ import akka.actor.Props;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;
import akka.testkit.JavaTestKit; import akka.testkit.JavaTestKit;
/** /**
* @author Martin Krasser * @author Martin Krasser
*/ */
@ -35,25 +34,29 @@ public class ConsumerJavaTestBase {
} }
@Test @Test
public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() throws Exception { public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse()
new JavaTestKit(system) {{ throws Exception {
new JavaTestKit(system) {
{
String result = new EventFilter<String>(Exception.class) { String result = new EventFilter<String>(Exception.class) {
protected String run() { protected String run() {
FiniteDuration timeout = Duration.create(1, TimeUnit.SECONDS); FiniteDuration timeout = Duration.create(1, TimeUnit.SECONDS);
Camel camel = CamelExtension.get(system); Camel camel = CamelExtension.get(system);
ExecutionContext executionContext = system.dispatcher(); ExecutionContext executionContext = system.dispatcher();
try { try {
ActorRef ref = Await.result( @SuppressWarnings("unused")
camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class)), timeout, executionContext), ActorRef ref = Await.result(camel.activationFutureFor(
timeout); system.actorOf(new Props(SampleErrorHandlingConsumer.class)),
return camel.template().requestBody("direct:error-handler-test-java", "hello", String.class); timeout, executionContext), timeout);
} return camel.template().requestBody(
catch (Exception e) { "direct:error-handler-test-java", "hello", String.class);
} catch (Exception e) {
return e.getMessage(); return e.getMessage();
} }
} }
}.occurrences(1).exec(); }.occurrences(1).exec();
assertEquals("error: hello", result); assertEquals("error: hello", result);
}}; }
};
} }
} }

View file

@ -208,7 +208,7 @@ public class CustomRouteTestBase {
@Override @Override
public void onReceive(Object message) { public void onReceive(Object message) {
this.getProducerTemplate().sendBody(to, "test"); this.getProducerTemplate().sendBody(to, "test");
getSender().tell(Ack.getInstance()); getSender().tell(Ack.getInstance(), getSelf());
} }
} }
} }

View file

@ -42,7 +42,7 @@ public class SampleErrorHandlingConsumer extends UntypedConsumerActor {
@Override @Override
public void preRestart(Throwable reason, Option<Object> message){ public void preRestart(Throwable reason, Option<Object> message){
getSender().tell(new Status.Failure(reason)); getSender().tell(new Status.Failure(reason), getSelf());
} }
} }

View file

@ -19,7 +19,7 @@ public class SampleUntypedConsumer extends UntypedConsumerActor {
CamelMessage msg = (CamelMessage)message; CamelMessage msg = (CamelMessage)message;
String body = msg.getBodyAs(String.class, getCamelContext()); String body = msg.getBodyAs(String.class, getCamelContext());
String header = msg.getHeaderAs("test", String.class,getCamelContext()); String header = msg.getHeaderAs("test", String.class,getCamelContext());
sender().tell(String.format("%s %s", body, header)); sender().tell(String.format("%s %s", body, header), getSelf());
} }
} }

View file

@ -64,7 +64,7 @@ public class DangerousJavaActor extends UntypedActor {
public Future<String> call() throws Exception { public Future<String> call() throws Exception {
return f; return f;
} }
})); }), getSelf());
} }
if ("block for me".equals(m)) { if ("block for me".equals(m)) {
getSender().tell(breaker getSender().tell(breaker
@ -74,7 +74,7 @@ public class DangerousJavaActor extends UntypedActor {
public String call() throws Exception { public String call() throws Exception {
return dangerousCall(); return dangerousCall();
} }
})); }), getSelf());
} }
} }
} }

View file

@ -22,5 +22,6 @@ class Java {
final Deadline deadline = Duration.create(10, "seconds").fromNow(); final Deadline deadline = Duration.create(10, "seconds").fromNow();
final Duration rest = deadline.timeLeft(); final Duration rest = deadline.timeLeft();
//#deadline //#deadline
rest.toString();
} }
} }

View file

@ -147,7 +147,7 @@ public class FSMDocTestBase {
@Override @Override
public void transition(State old, State next) { public void transition(State old, State next) {
if (old == State.ACTIVE) { if (old == State.ACTIVE) {
getTarget().tell(new Batch(drainQueue())); getTarget().tell(new Batch(drainQueue()), getSelf());
} }
} }
@ -175,11 +175,11 @@ public class FSMDocTestBase {
public void mustBunch() { public void mustBunch() {
final ActorRef buncher = system.actorOf(new Props(MyFSM.class)); final ActorRef buncher = system.actorOf(new Props(MyFSM.class));
final TestProbe probe = new TestProbe(system); final TestProbe probe = new TestProbe(system);
buncher.tell(new SetTarget(probe.ref())); buncher.tell(new SetTarget(probe.ref()), null);
buncher.tell(new Queue(1)); buncher.tell(new Queue(1), null);
buncher.tell(new Queue(2)); buncher.tell(new Queue(2), null);
buncher.tell(flush); buncher.tell(flush, null);
buncher.tell(new Queue(3)); buncher.tell(new Queue(3), null);
final Batch b = probe.expectMsgClass(Batch.class); final Batch b = probe.expectMsgClass(Batch.class);
assert b.objects.size() == 2; assert b.objects.size() == 2;
assert b.objects.contains(1); assert b.objects.contains(1);

View file

@ -64,7 +64,7 @@ public class FaultHandlingTestBase {
public void onReceive(Object o) { public void onReceive(Object o) {
if (o instanceof Props) { if (o instanceof Props) {
getSender().tell(getContext().actorOf((Props) o)); getSender().tell(getContext().actorOf((Props) o), getSelf());
} else { } else {
unhandled(o); unhandled(o);
} }
@ -102,7 +102,7 @@ public class FaultHandlingTestBase {
public void onReceive(Object o) { public void onReceive(Object o) {
if (o instanceof Props) { if (o instanceof Props) {
getSender().tell(getContext().actorOf((Props) o)); getSender().tell(getContext().actorOf((Props) o), getSelf());
} else { } else {
unhandled(o); unhandled(o);
} }
@ -126,7 +126,7 @@ public class FaultHandlingTestBase {
} else if (o instanceof Integer) { } else if (o instanceof Integer) {
state = (Integer) o; state = (Integer) o;
} else if (o.equals("get")) { } else if (o.equals("get")) {
getSender().tell(state); getSender().tell(state, getSelf());
} else { } else {
unhandled(o); unhandled(o);
} }
@ -167,21 +167,21 @@ public class FaultHandlingTestBase {
//#create //#create
//#resume //#resume
child.tell(42); child.tell(42, null);
assert Await.result(ask(child, "get", 5000), timeout).equals(42); assert Await.result(ask(child, "get", 5000), timeout).equals(42);
child.tell(new ArithmeticException()); child.tell(new ArithmeticException(), null);
assert Await.result(ask(child, "get", 5000), timeout).equals(42); assert Await.result(ask(child, "get", 5000), timeout).equals(42);
//#resume //#resume
//#restart //#restart
child.tell(new NullPointerException()); child.tell(new NullPointerException(), null);
assert Await.result(ask(child, "get", 5000), timeout).equals(0); assert Await.result(ask(child, "get", 5000), timeout).equals(0);
//#restart //#restart
//#stop //#stop
final TestProbe probe = new TestProbe(system); final TestProbe probe = new TestProbe(system);
probe.watch(child); probe.watch(child);
child.tell(new IllegalArgumentException()); child.tell(new IllegalArgumentException(), null);
probe.expectMsgClass(Terminated.class); probe.expectMsgClass(Terminated.class);
//#stop //#stop
@ -189,7 +189,7 @@ public class FaultHandlingTestBase {
child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout); child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout);
probe.watch(child); probe.watch(child);
assert Await.result(ask(child, "get", 5000), timeout).equals(0); assert Await.result(ask(child, "get", 5000), timeout).equals(0);
child.tell(new Exception()); child.tell(new Exception(), null);
probe.expectMsgClass(Terminated.class); probe.expectMsgClass(Terminated.class);
//#escalate-kill //#escalate-kill
@ -197,9 +197,9 @@ public class FaultHandlingTestBase {
superprops = new Props(Supervisor2.class); superprops = new Props(Supervisor2.class);
supervisor = system.actorOf(superprops); supervisor = system.actorOf(superprops);
child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout); child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout);
child.tell(23); child.tell(23, null);
assert Await.result(ask(child, "get", 5000), timeout).equals(23); assert Await.result(ask(child, "get", 5000), timeout).equals(23);
child.tell(new Exception()); child.tell(new Exception(), null);
assert Await.result(ask(child, "get", 5000), timeout).equals(0); assert Await.result(ask(child, "get", 5000), timeout).equals(0);
//#escalate-restart //#escalate-restart
//#testkit //#testkit

View file

@ -16,6 +16,6 @@ public class FirstUntypedActor extends UntypedActor {
public void onReceive(Object message) { public void onReceive(Object message) {
myActor.forward(message, getContext()); myActor.forward(message, getContext());
myActor.tell(PoisonPill.getInstance()); myActor.tell(PoisonPill.getInstance(), null);
} }
} }

View file

@ -16,7 +16,7 @@ public class MyReceivedTimeoutUntypedActor extends UntypedActor {
public void onReceive(Object message) { public void onReceive(Object message) {
if (message.equals("Hello")) { if (message.equals("Hello")) {
getSender().tell("Hello world"); getSender().tell("Hello world", getSelf());
} else if (message == ReceiveTimeout.getInstance()) { } else if (message == ReceiveTimeout.getInstance()) {
throw new RuntimeException("received timeout"); throw new RuntimeException("received timeout");
} else { } else {

View file

@ -24,7 +24,6 @@ import akka.testkit.AkkaSpec;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*;
public class SchedulerDocTestBase { public class SchedulerDocTestBase {
@ -54,7 +53,7 @@ public class SchedulerDocTestBase {
system.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), new Runnable() { system.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), new Runnable() {
@Override @Override
public void run() { public void run() {
testActor.tell(System.currentTimeMillis()); testActor.tell(System.currentTimeMillis(), null);
} }
}, system.dispatcher()); }, system.dispatcher());
//#schedule-one-off-thunk //#schedule-one-off-thunk

View file

@ -54,21 +54,15 @@ import java.util.ArrayList;
import akka.actor.UntypedActorWithStash; import akka.actor.UntypedActorWithStash;
//#import-stash //#import-stash
import akka.actor.Props;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory; import akka.actor.UntypedActorFactory;
import akka.dispatch.MessageDispatcher;
import org.junit.Test; import org.junit.Test;
import scala.Option; import scala.Option;
import java.lang.Object; import java.lang.Object;
import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import akka.pattern.Patterns; import akka.pattern.Patterns;
import static org.junit.Assert.*;
public class UntypedActorDocTestBase { public class UntypedActorDocTestBase {
@Test @Test
@ -95,7 +89,7 @@ public class UntypedActorDocTestBase {
ActorSystem system = ActorSystem.create("MySystem"); ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class), "myactor"); ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class), "myactor");
//#system-actorOf //#system-actorOf
myActor.tell("test"); myActor.tell("test", null);
system.shutdown(); system.shutdown();
} }
@ -105,7 +99,7 @@ public class UntypedActorDocTestBase {
ActorSystem system = ActorSystem.create("MySystem"); ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class), "myactor"); ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class), "myactor");
//#context-actorOf //#context-actorOf
myActor.tell("test"); myActor.tell("test", null);
system.shutdown(); system.shutdown();
} }
@ -120,7 +114,7 @@ public class UntypedActorDocTestBase {
} }
}), "myactor"); }), "myactor");
//#creating-constructor //#creating-constructor
myActor.tell("test"); myActor.tell("test", null);
system.shutdown(); system.shutdown();
} }
@ -130,7 +124,7 @@ public class UntypedActorDocTestBase {
//#creating-props //#creating-props
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor"); ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor");
//#creating-props //#creating-props
myActor.tell("test"); myActor.tell("test", null);
system.shutdown(); system.shutdown();
} }
@ -154,7 +148,7 @@ public class UntypedActorDocTestBase {
public void receiveTimeout() { public void receiveTimeout() {
ActorSystem system = ActorSystem.create("MySystem"); ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(MyReceivedTimeoutUntypedActor.class)); ActorRef myActor = system.actorOf(new Props(MyReceivedTimeoutUntypedActor.class));
myActor.tell("Hello"); myActor.tell("Hello", null);
system.shutdown(); system.shutdown();
} }
@ -163,7 +157,7 @@ public class UntypedActorDocTestBase {
ActorSystem system = ActorSystem.create("MySystem"); ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class)); ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class));
//#poison-pill //#poison-pill
myActor.tell(PoisonPill.getInstance()); myActor.tell(PoisonPill.getInstance(), null);
//#poison-pill //#poison-pill
system.shutdown(); system.shutdown();
} }
@ -173,7 +167,7 @@ public class UntypedActorDocTestBase {
ActorSystem system = ActorSystem.create("MySystem"); ActorSystem system = ActorSystem.create("MySystem");
ActorRef victim = system.actorOf(new Props(MyUntypedActor.class)); ActorRef victim = system.actorOf(new Props(MyUntypedActor.class));
//#kill //#kill
victim.tell(Kill.getInstance()); victim.tell(Kill.getInstance(), null);
//#kill //#kill
system.shutdown(); system.shutdown();
} }
@ -186,9 +180,9 @@ public class UntypedActorDocTestBase {
return new HotSwapActor(); return new HotSwapActor();
} }
})); }));
myActor.tell("foo"); myActor.tell("foo", null);
myActor.tell("bar"); myActor.tell("bar", null);
myActor.tell("bar"); myActor.tell("bar", null);
system.shutdown(); system.shutdown();
} }
@ -265,7 +259,7 @@ public class UntypedActorDocTestBase {
try { try {
operation(); operation();
} catch (Exception e) { } catch (Exception e) {
getSender().tell(new akka.actor.Status.Failure(e)); getSender().tell(new akka.actor.Status.Failure(e), getSelf());
throw e; throw e;
} }
} }
@ -298,9 +292,9 @@ public class UntypedActorDocTestBase {
//#reply-exception //#reply-exception
try { try {
String result = operation(); String result = operation();
getSender().tell(result); getSender().tell(result, getSelf());
} catch (Exception e) { } catch (Exception e) {
getSender().tell(new akka.actor.Status.Failure(e)); getSender().tell(new akka.actor.Status.Failure(e), getSelf());
throw e; throw e;
} }
//#reply-exception //#reply-exception
@ -318,7 +312,7 @@ public class UntypedActorDocTestBase {
@Override @Override
public void apply(Object message) { public void apply(Object message) {
if (message.equals("bar")) { if (message.equals("bar")) {
getSender().tell("I am already angry?"); getSender().tell("I am already angry?", getSelf());
} else if (message.equals("foo")) { } else if (message.equals("foo")) {
getContext().become(happy); getContext().become(happy);
} }
@ -329,7 +323,7 @@ public class UntypedActorDocTestBase {
@Override @Override
public void apply(Object message) { public void apply(Object message) {
if (message.equals("bar")) { if (message.equals("bar")) {
getSender().tell("I am already happy :-)"); getSender().tell("I am already happy :-)", getSelf());
} else if (message.equals("foo")) { } else if (message.equals("foo")) {
getContext().become(angry); getContext().become(angry);
} }
@ -390,7 +384,7 @@ public class UntypedActorDocTestBase {
} else if (message instanceof Terminated) { } else if (message instanceof Terminated) {
final Terminated t = (Terminated) message; final Terminated t = (Terminated) message;
if (t.getActor() == child) { if (t.getActor() == child) {
lastSender.tell("finished"); lastSender.tell("finished", getSelf());
} }
} else { } else {
unhandled(message); unhandled(message);

View file

@ -44,12 +44,12 @@ public class UntypedActorSwapper {
public static void main(String... args) { public static void main(String... args) {
ActorSystem system = ActorSystem.create("MySystem"); ActorSystem system = ActorSystem.create("MySystem");
ActorRef swap = system.actorOf(new Props(Swapper.class)); ActorRef swap = system.actorOf(new Props(Swapper.class));
swap.tell(SWAP); // logs Hi swap.tell(SWAP, null); // logs Hi
swap.tell(SWAP); // logs Ho swap.tell(SWAP, null); // logs Ho
swap.tell(SWAP); // logs Hi swap.tell(SWAP, null); // logs Hi
swap.tell(SWAP); // logs Ho swap.tell(SWAP, null); // logs Ho
swap.tell(SWAP); // logs Hi swap.tell(SWAP, null); // logs Hi
swap.tell(SWAP); // logs Ho swap.tell(SWAP, null); // logs Ho
} }
} }

View file

@ -14,8 +14,6 @@ package docs.camel;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.TimeUnit;
public class ActivationTestBase { public class ActivationTestBase {
@Test @Test

View file

@ -12,7 +12,7 @@ public class Consumer2 extends UntypedConsumerActor {
if (message instanceof CamelMessage) { if (message instanceof CamelMessage) {
CamelMessage camelMessage = (CamelMessage) message; CamelMessage camelMessage = (CamelMessage) message;
String body = camelMessage.getBodyAs(String.class, getCamelContext()); String body = camelMessage.getBodyAs(String.class, getCamelContext());
getSender().tell(String.format("Received message: %s",body)); getSender().tell(String.format("Received message: %s",body), getSelf());
} else } else
unhandled(message); unhandled(message);
} }

View file

@ -18,12 +18,12 @@ public class Consumer3 extends UntypedConsumerActor{
public void onReceive(Object message) { public void onReceive(Object message) {
if (message instanceof CamelMessage) { if (message instanceof CamelMessage) {
getSender().tell(Ack.getInstance()); getSender().tell(Ack.getInstance(), getSelf());
// on success // on success
// .. // ..
Exception someException = new Exception("e1"); Exception someException = new Exception("e1");
// on failure // on failure
getSender().tell(new Status.Failure(someException)); getSender().tell(new Status.Failure(someException), getSelf());
} else } else
unhandled(message); unhandled(message);
} }

View file

@ -23,7 +23,7 @@ public class Consumer4 extends UntypedConsumerActor {
if (message instanceof CamelMessage) { if (message instanceof CamelMessage) {
CamelMessage camelMessage = (CamelMessage) message; CamelMessage camelMessage = (CamelMessage) message;
String body = camelMessage.getBodyAs(String.class, getCamelContext()); String body = camelMessage.getBodyAs(String.class, getCamelContext());
getSender().tell(String.format("Hello %s",body)); getSender().tell(String.format("Hello %s",body), getSelf());
} else } else
unhandled(message); unhandled(message);
} }

View file

@ -36,7 +36,7 @@ public class ErrorThrowingConsumer extends UntypedConsumerActor{
@Override @Override
public void preRestart(Throwable reason, Option<Object> message) { public void preRestart(Throwable reason, Option<Object> message) {
getSender().tell(new Status.Failure(reason)); getSender().tell(new Status.Failure(reason), getSelf());
} }
} }
//#ErrorThrowingConsumer //#ErrorThrowingConsumer

View file

@ -1,7 +1,6 @@
package docs.camel; package docs.camel;
import akka.actor.*; import akka.actor.*;
import org.junit.Test;
public class OnRouteResponseTestBase { public class OnRouteResponseTestBase {
@ -18,7 +17,7 @@ public class OnRouteResponseTestBase {
ActorRef forwardResponse = system.actorOf(new Props(factory)); ActorRef forwardResponse = system.actorOf(new Props(factory));
// the Forwarder sends out a request to the web page and forwards the response to // the Forwarder sends out a request to the web page and forwards the response to
// the ResponseReceiver // the ResponseReceiver
forwardResponse.tell("some request"); forwardResponse.tell("some request", null);
//#RouteResponse //#RouteResponse
system.stop(receiver); system.stop(receiver);
system.stop(forwardResponse); system.stop(forwardResponse);

View file

@ -1,20 +1,14 @@
package docs.camel; package docs.camel;
import akka.actor.*;
import akka.camel.Camel;
import akka.camel.CamelExtension;
import akka.camel.CamelMessage;
import akka.pattern.Patterns;
import scala.concurrent.Future;
import scala.concurrent.util.Duration;
import scala.concurrent.util.FiniteDuration;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.junit.Test;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import scala.concurrent.Future;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.camel.CamelMessage;
import akka.pattern.Patterns;
public class ProducerTestBase { public class ProducerTestBase {
public void tellJmsProducer() { public void tellJmsProducer() {
@ -22,7 +16,7 @@ public class ProducerTestBase {
ActorSystem system = ActorSystem.create("some-system"); ActorSystem system = ActorSystem.create("some-system");
Props props = new Props(Orders.class); Props props = new Props(Orders.class);
ActorRef producer = system.actorOf(props, "jmsproducer"); ActorRef producer = system.actorOf(props, "jmsproducer");
producer.tell("<order amount=\"100\" currency=\"PLN\" itemId=\"12345\"/>"); producer.tell("<order amount=\"100\" currency=\"PLN\" itemId=\"12345\"/>", null);
//#TellProducer //#TellProducer
system.shutdown(); system.shutdown();
} }
@ -45,7 +39,7 @@ public class ProducerTestBase {
ActorRef producer = system.actorOf(props,"jmsproducer"); ActorRef producer = system.actorOf(props,"jmsproducer");
Map<String,Object> headers = new HashMap<String, Object>(); Map<String,Object> headers = new HashMap<String, Object>();
headers.put(CamelMessage.MessageExchangeId(),"123"); headers.put(CamelMessage.MessageExchangeId(),"123");
producer.tell(new CamelMessage("<order amount=\"100\" currency=\"PLN\" itemId=\"12345\"/>",headers)); producer.tell(new CamelMessage("<order amount=\"100\" currency=\"PLN\" itemId=\"12345\"/>",headers), null);
//#Correlate //#Correlate
system.stop(producer); system.stop(producer);
system.shutdown(); system.shutdown();

View file

@ -9,7 +9,7 @@ public class RequestBodyActor extends UntypedActor {
public void onReceive(Object message) { public void onReceive(Object message) {
Camel camel = CamelExtension.get(getContext().system()); Camel camel = CamelExtension.get(getContext().system());
ProducerTemplate template = camel.template(); ProducerTemplate template = camel.template();
getSender().tell(template.requestBody("direct:news", message)); getSender().tell(template.requestBody("direct:news", message), getSelf());
} }
} }
//#RequestProducerTemplate //#RequestProducerTemplate

View file

@ -9,7 +9,7 @@ public class Responder extends UntypedActor{
public void onReceive(Object message) { public void onReceive(Object message) {
if (message instanceof CamelMessage) { if (message instanceof CamelMessage) {
CamelMessage camelMessage = (CamelMessage) message; CamelMessage camelMessage = (CamelMessage) message;
getSender().tell(createResponse(camelMessage)); getSender().tell(createResponse(camelMessage), getSelf());
} else } else
unhandled(message); unhandled(message);
} }

View file

@ -16,9 +16,9 @@ public class HttpTransformer extends UntypedActor{
return text.replaceAll("Akka ", "AKKA "); return text.replaceAll("Akka ", "AKKA ");
} }
}); });
getSender().tell(replacedMessage); getSender().tell(replacedMessage, getSelf());
} else if (message instanceof Status.Failure) { } else if (message instanceof Status.Failure) {
getSender().tell(message); getSender().tell(message, getSelf());
} else } else
unhandled(message); unhandled(message);
} }

View file

@ -37,12 +37,10 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import scala.Option; import scala.Option;
import static org.junit.Assert.*;
import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigFactory;
import docs.actor.MyUntypedActor; import docs.actor.MyUntypedActor;
import docs.actor.UntypedActorDocTestBase.MyActor;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;
public class DispatcherDocTestBase { public class DispatcherDocTestBase {
@ -89,14 +87,14 @@ public class DispatcherDocTestBase {
LoggingAdapter log = LoggingAdapter log =
Logging.getLogger(getContext().system(), this); Logging.getLogger(getContext().system(), this);
{ {
getSelf().tell("lowpriority"); getSelf().tell("lowpriority", getSelf());
getSelf().tell("lowpriority"); getSelf().tell("lowpriority", getSelf());
getSelf().tell("highpriority"); getSelf().tell("highpriority", getSelf());
getSelf().tell("pigdog"); getSelf().tell("pigdog", getSelf());
getSelf().tell("pigdog2"); getSelf().tell("pigdog2", getSelf());
getSelf().tell("pigdog3"); getSelf().tell("pigdog3", getSelf());
getSelf().tell("highpriority"); getSelf().tell("highpriority", getSelf());
getSelf().tell(PoisonPill.getInstance()); getSelf().tell(PoisonPill.getInstance(), getSelf());
} }
public void onReceive(Object message) { public void onReceive(Object message) {

View file

@ -21,7 +21,6 @@ import akka.event.Logging.Debug;
import org.junit.Test; import org.junit.Test;
import scala.Option; import scala.Option;
import static org.junit.Assert.*;
import akka.actor.UntypedActorFactory; import akka.actor.UntypedActorFactory;
//#imports-deadletter //#imports-deadletter
@ -42,7 +41,7 @@ public class LoggingDocTestBase {
return new MyActor(); return new MyActor();
} }
})); }));
myActor.tell("test"); myActor.tell("test", null);
system.shutdown(); system.shutdown();
} }
@ -96,7 +95,7 @@ public class LoggingDocTestBase {
class MyEventListener extends UntypedActor { class MyEventListener extends UntypedActor {
public void onReceive(Object message) { public void onReceive(Object message) {
if (message instanceof InitializeLogger) { if (message instanceof InitializeLogger) {
getSender().tell(Logging.loggerInitialized()); getSender().tell(Logging.loggerInitialized(), getSelf());
} else if (message instanceof Error) { } else if (message instanceof Error) {
// ... // ...
} else if (message instanceof Warning) { } else if (message instanceof Warning) {

View file

@ -534,13 +534,13 @@ public class FutureDocTestBase {
public static class MyActor extends UntypedActor { public static class MyActor extends UntypedActor {
public void onReceive(Object message) { public void onReceive(Object message) {
if (message instanceof String) { if (message instanceof String) {
getSender().tell(((String) message).toUpperCase()); getSender().tell(((String) message).toUpperCase(), getSelf());
} else if (message instanceof Integer) { } else if (message instanceof Integer) {
int i = ((Integer) message).intValue(); int i = ((Integer) message).intValue();
if (i < 0) { if (i < 0) {
getSender().tell(new Failure(new ArithmeticException("Negative values not supported"))); getSender().tell(new Failure(new ArithmeticException("Negative values not supported")), getSelf());
} else { } else {
getSender().tell(i); getSender().tell(i, getSelf());
} }
} else { } else {
unhandled(message); unhandled(message);

View file

@ -3,28 +3,37 @@
*/ */
package docs.jrouting; package docs.jrouting;
import java.util.List; import static akka.pattern.Patterns.ask;
import static docs.jrouting.CustomRouterDocTestBase.Message.DemocratCountResult;
import static docs.jrouting.CustomRouterDocTestBase.Message.DemocratVote;
import static docs.jrouting.CustomRouterDocTestBase.Message.RepublicanCountResult;
import static docs.jrouting.CustomRouterDocTestBase.Message.RepublicanVote;
import static org.junit.Assert.assertEquals;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals;
import akka.actor.*;
import akka.routing.*;
import scala.concurrent.util.Duration;
import akka.util.Timeout;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.Future; import scala.concurrent.Future;
import scala.concurrent.util.Duration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.UntypedActor;
import akka.dispatch.Dispatchers; import akka.dispatch.Dispatchers;
import akka.routing.CustomRoute;
import akka.routing.CustomRouterConfig;
import akka.routing.Destination;
import akka.routing.RoundRobinRouter;
import akka.routing.RouteeProvider;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;
import com.typesafe.config.ConfigFactory; import akka.util.Timeout;
import static akka.pattern.Patterns.ask;
import static docs.jrouting.CustomRouterDocTestBase.DemocratActor;
import static docs.jrouting.CustomRouterDocTestBase.RepublicanActor;
import static docs.jrouting.CustomRouterDocTestBase.Message.*;
public class CustomRouterDocTestBase { public class CustomRouterDocTestBase {
@ -67,11 +76,11 @@ public class CustomRouterDocTestBase {
@Test @Test
public void countVotesAsIntendedNotAsInFlorida() throws Exception { public void countVotesAsIntendedNotAsInFlorida() throws Exception {
ActorRef routedActor = system.actorOf(new Props().withRouter(new VoteCountRouter())); ActorRef routedActor = system.actorOf(new Props().withRouter(new VoteCountRouter()));
routedActor.tell(DemocratVote); routedActor.tell(DemocratVote, null);
routedActor.tell(DemocratVote); routedActor.tell(DemocratVote, null);
routedActor.tell(RepublicanVote); routedActor.tell(RepublicanVote, null);
routedActor.tell(DemocratVote); routedActor.tell(DemocratVote, null);
routedActor.tell(RepublicanVote); routedActor.tell(RepublicanVote, null);
Timeout timeout = new Timeout(Duration.create(1, "seconds")); Timeout timeout = new Timeout(Duration.create(1, "seconds"));
Future<Object> democratsResult = ask(routedActor, DemocratCountResult, timeout); Future<Object> democratsResult = ask(routedActor, DemocratCountResult, timeout);
Future<Object> republicansResult = ask(routedActor, RepublicanCountResult, timeout); Future<Object> republicansResult = ask(routedActor, RepublicanCountResult, timeout);

View file

@ -12,7 +12,7 @@ public class FibonacciActor extends UntypedActor {
public void onReceive(Object msg) { public void onReceive(Object msg) {
if (msg instanceof FibonacciNumber) { if (msg instanceof FibonacciNumber) {
FibonacciNumber fibonacciNumber = (FibonacciNumber) msg; FibonacciNumber fibonacciNumber = (FibonacciNumber) msg;
getSender().tell(fibonacci(fibonacciNumber.getNbr())); getSender().tell(fibonacci(fibonacciNumber.getNbr()), getSelf());
} else { } else {
unhandled(msg); unhandled(msg);
} }

View file

@ -45,14 +45,14 @@ public class RouterViaConfigExample {
ActorRef router = system.actorOf(new Props(ExampleActor.class).withRouter(new FromConfig()), "router"); ActorRef router = system.actorOf(new Props(ExampleActor.class).withRouter(new FromConfig()), "router");
//#configurableRouting //#configurableRouting
for (int i = 1; i <= 10; i++) { for (int i = 1; i <= 10; i++) {
router.tell(new ExampleActor.Message(i)); router.tell(new ExampleActor.Message(i), null);
} }
//#configurableRoutingWithResizer //#configurableRoutingWithResizer
ActorRef router2 = system.actorOf(new Props(ExampleActor.class).withRouter(new FromConfig()), "router2"); ActorRef router2 = system.actorOf(new Props(ExampleActor.class).withRouter(new FromConfig()), "router2");
//#configurableRoutingWithResizer //#configurableRoutingWithResizer
for (int i = 1; i <= 10; i++) { for (int i = 1; i <= 10; i++) {
router2.tell(new ExampleActor.Message(i)); router2.tell(new ExampleActor.Message(i), null);
} }
} }
} }

View file

@ -47,7 +47,7 @@ public class RouterViaProgramExample {
ActorRef router1 = system.actorOf(new Props(ExampleActor.class).withRouter(new RoundRobinRouter(nrOfInstances))); ActorRef router1 = system.actorOf(new Props(ExampleActor.class).withRouter(new RoundRobinRouter(nrOfInstances)));
//#programmaticRoutingNrOfInstances //#programmaticRoutingNrOfInstances
for (int i = 1; i <= 6; i++) { for (int i = 1; i <= 6; i++) {
router1.tell(new ExampleActor.Message(i)); router1.tell(new ExampleActor.Message(i), null);
} }
//#programmaticRoutingRoutees //#programmaticRoutingRoutees
@ -58,7 +58,7 @@ public class RouterViaProgramExample {
ActorRef router2 = system.actorOf(new Props().withRouter(RoundRobinRouter.create(routees))); ActorRef router2 = system.actorOf(new Props().withRouter(RoundRobinRouter.create(routees)));
//#programmaticRoutingRoutees //#programmaticRoutingRoutees
for (int i = 1; i <= 6; i++) { for (int i = 1; i <= 6; i++) {
router2.tell(new ExampleActor.Message(i)); router2.tell(new ExampleActor.Message(i), null);
} }
//#programmaticRoutingWithResizer //#programmaticRoutingWithResizer
@ -68,7 +68,7 @@ public class RouterViaProgramExample {
ActorRef router3 = system.actorOf(new Props(ExampleActor.class).withRouter(new RoundRobinRouter(nrOfInstances))); ActorRef router3 = system.actorOf(new Props(ExampleActor.class).withRouter(new RoundRobinRouter(nrOfInstances)));
//#programmaticRoutingWithResizer //#programmaticRoutingWithResizer
for (int i = 1; i <= 6; i++) { for (int i = 1; i <= 6; i++) {
router3.tell(new ExampleActor.Message(i)); router3.tell(new ExampleActor.Message(i), null);
} }
//#remoteRoutees //#remoteRoutees

View file

@ -9,7 +9,6 @@ import static org.junit.Assert.*;
import akka.actor.*; import akka.actor.*;
import akka.remote.RemoteActorRefProvider; import akka.remote.RemoteActorRefProvider;
import akka.serialization.*; import akka.serialization.*;
import com.typesafe.config.*;
//#imports //#imports

View file

@ -97,7 +97,7 @@ public class TestKitDocTest {
public void demonstrateWithin() { public void demonstrateWithin() {
//#test-within //#test-within
new JavaTestKit(system) {{ new JavaTestKit(system) {{
getRef().tell(42); getRef().tell(42, null);
new Within(Duration.Zero(), Duration.create(1, "second")) { new Within(Duration.Zero(), Duration.create(1, "second")) {
// do not put code outside this method, will run afterwards // do not put code outside this method, will run afterwards
public void run() { public void run() {
@ -112,7 +112,7 @@ public class TestKitDocTest {
public void demonstrateExpectMsg() { public void demonstrateExpectMsg() {
//#test-expectmsg //#test-expectmsg
new JavaTestKit(system) {{ new JavaTestKit(system) {{
getRef().tell(42); getRef().tell(42, null);
final String out = new ExpectMsg<String>("match hint") { final String out = new ExpectMsg<String>("match hint") {
// do not put code outside this method, will run afterwards // do not put code outside this method, will run afterwards
protected String match(Object in) { protected String match(Object in) {
@ -132,9 +132,9 @@ public class TestKitDocTest {
public void demonstrateReceiveWhile() { public void demonstrateReceiveWhile() {
//#test-receivewhile //#test-receivewhile
new JavaTestKit(system) {{ new JavaTestKit(system) {{
getRef().tell(42); getRef().tell(42, null);
getRef().tell(43); getRef().tell(43, null);
getRef().tell("hello"); getRef().tell("hello", null);
final String[] out = final String[] out =
new ReceiveWhile<String>(String.class, duration("1 second")) { new ReceiveWhile<String>(String.class, duration("1 second")) {
// do not put code outside this method, will run afterwards // do not put code outside this method, will run afterwards
@ -172,7 +172,7 @@ public class TestKitDocTest {
public void demonstrateAwaitCond() { public void demonstrateAwaitCond() {
//#test-awaitCond //#test-awaitCond
new JavaTestKit(system) {{ new JavaTestKit(system) {{
getRef().tell(42); getRef().tell(42, null);
new AwaitCond( new AwaitCond(
duration("1 second"), // maximum wait time duration("1 second"), // maximum wait time
duration("100 millis") // interval at which to check the condition duration("100 millis") // interval at which to check the condition
@ -191,12 +191,12 @@ public class TestKitDocTest {
@SuppressWarnings("unchecked") // due to generic varargs @SuppressWarnings("unchecked") // due to generic varargs
public void demonstrateExpect() { public void demonstrateExpect() {
new JavaTestKit(system) {{ new JavaTestKit(system) {{
getRef().tell("hello"); getRef().tell("hello", null);
getRef().tell("hello"); getRef().tell("hello", null);
getRef().tell("hello"); getRef().tell("hello", null);
getRef().tell("world"); getRef().tell("world", null);
getRef().tell(42); getRef().tell(42, null);
getRef().tell(42); getRef().tell(42, null);
//#test-expect //#test-expect
final String hello = expectMsgEquals("hello"); final String hello = expectMsgEquals("hello");
final Object any = expectMsgAnyOf("hello", "world"); final Object any = expectMsgAnyOf("hello", "world");
@ -223,12 +223,12 @@ public class TestKitDocTest {
return msg instanceof String; return msg instanceof String;
} }
}; };
getRef().tell("hello"); getRef().tell("hello", null);
getRef().tell(42); getRef().tell(42, null);
expectMsgEquals(42); expectMsgEquals(42);
// remove message filter // remove message filter
ignoreNoMsg(); ignoreNoMsg();
getRef().tell("hello"); getRef().tell("hello", null);
expectMsgEquals("hello"); expectMsgEquals("hello");
}}; }};
//#test-ignoreMsg //#test-ignoreMsg
@ -294,7 +294,7 @@ public class TestKitDocTest {
} }
final MyProbe probe = new MyProbe(); final MyProbe probe = new MyProbe();
probe.getRef().tell("hello"); probe.getRef().tell("hello", null);
probe.assertHello(); probe.assertHello();
}}; }};
//#test-special-probe //#test-special-probe
@ -354,7 +354,7 @@ public class TestKitDocTest {
// install auto-pilot // install auto-pilot
probe.setAutoPilot(new TestActor.AutoPilot() { probe.setAutoPilot(new TestActor.AutoPilot() {
public AutoPilot run(ActorRef sender, Object msg) { public AutoPilot run(ActorRef sender, Object msg) {
sender.tell(msg); sender.tell(msg, null);
return noAutoPilot(); return noAutoPilot();
} }
}); });
@ -386,7 +386,7 @@ public class TestKitDocTest {
final int result = new EventFilter<Integer>(ActorKilledException.class) { final int result = new EventFilter<Integer>(ActorKilledException.class) {
protected Integer run() { protected Integer run() {
victim.tell(Kill.getInstance()); victim.tell(Kill.getInstance(), null);
return 42; return 42;
} }
}.from("akka://demoSystem/user/victim").occurrences(1).exec(); }.from("akka://demoSystem/user/victim").occurrences(1).exec();

View file

@ -24,12 +24,12 @@ public class TestKitSampleTest {
public void onReceive(Object msg) { public void onReceive(Object msg) {
if (msg.equals("hello")) { if (msg.equals("hello")) {
getSender().tell("world"); getSender().tell("world", getSelf());
if (target != null) target.forward(msg, getContext()); if (target != null) target.forward(msg, getContext());
} else if (msg instanceof ActorRef) { } else if (msg instanceof ActorRef) {
target = (ActorRef) msg; target = (ActorRef) msg;
getSender().tell("done"); getSender().tell("done", getSelf());
} }
} }
} }

View file

@ -20,7 +20,7 @@ public class CoordinatedCounter extends UntypedActor {
if (message instanceof Increment) { if (message instanceof Increment) {
Increment increment = (Increment) message; Increment increment = (Increment) message;
if (increment.hasFriend()) { if (increment.hasFriend()) {
increment.getFriend().tell(coordinated.coordinate(new Increment())); increment.getFriend().tell(coordinated.coordinate(new Increment()), getSelf());
} }
coordinated.atomic(new Runnable() { coordinated.atomic(new Runnable() {
public void run() { public void run() {
@ -29,7 +29,7 @@ public class CoordinatedCounter extends UntypedActor {
}); });
} }
} else if ("GetCount".equals(incoming)) { } else if ("GetCount".equals(incoming)) {
getSender().tell(count.get()); getSender().tell(count.get(), getSelf());
} else { } else {
unhandled(incoming); unhandled(incoming);
} }

View file

@ -20,7 +20,7 @@ public class Counter extends UntypedTransactor {
@Override public boolean normally(Object message) { @Override public boolean normally(Object message) {
if ("GetCount".equals(message)) { if ("GetCount".equals(message)) {
getSender().tell(count.get()); getSender().tell(count.get(), getSelf());
return true; return true;
} else return false; } else return false;
} }

View file

@ -5,7 +5,6 @@
package docs.transactor; package docs.transactor;
//#class //#class
import akka.actor.*;
import akka.transactor.*; import akka.transactor.*;
import java.util.Set; import java.util.Set;
import scala.concurrent.stm.Ref; import scala.concurrent.stm.Ref;
@ -31,7 +30,7 @@ public class FriendlyCounter extends UntypedTransactor {
@Override public boolean normally(Object message) { @Override public boolean normally(Object message) {
if ("GetCount".equals(message)) { if ("GetCount".equals(message)) {
getSender().tell(count.get()); getSender().tell(count.get(), getSelf());
return true; return true;
} else return false; } else return false;
} }

View file

@ -12,7 +12,6 @@ import akka.actor.*;
import scala.concurrent.Await; import scala.concurrent.Await;
import static akka.pattern.Patterns.ask; import static akka.pattern.Patterns.ask;
import akka.transactor.Coordinated; import akka.transactor.Coordinated;
import scala.concurrent.util.Duration;
import akka.util.Timeout; import akka.util.Timeout;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
//#imports //#imports
@ -29,7 +28,7 @@ public class TransactorDocTest {
Timeout timeout = new Timeout(5, SECONDS); Timeout timeout = new Timeout(5, SECONDS);
counter1.tell(new Coordinated(new Increment(counter2), timeout)); counter1.tell(new Coordinated(new Increment(counter2), timeout), null);
Integer count = (Integer) Await.result(ask(counter1, "GetCount", timeout), timeout.duration()); Integer count = (Integer) Await.result(ask(counter1, "GetCount", timeout), timeout.duration());
//#coordinated-example //#coordinated-example
@ -50,11 +49,11 @@ public class TransactorDocTest {
ActorRef actor = system.actorOf(new Props(Coordinator.class)); ActorRef actor = system.actorOf(new Props(Coordinator.class));
//#send-coordinated //#send-coordinated
actor.tell(new Coordinated(new Message(), timeout)); actor.tell(new Coordinated(new Message(), timeout), null);
//#send-coordinated //#send-coordinated
//#include-coordinated //#include-coordinated
actor.tell(coordinated.coordinate(new Message())); actor.tell(coordinated.coordinate(new Message()), null);
//#include-coordinated //#include-coordinated
coordinated.await(); coordinated.await();
@ -69,7 +68,7 @@ public class TransactorDocTest {
Timeout timeout = new Timeout(5, SECONDS); Timeout timeout = new Timeout(5, SECONDS);
Coordinated coordinated = new Coordinated(timeout); Coordinated coordinated = new Coordinated(timeout);
counter.tell(coordinated.coordinate(new Increment())); counter.tell(coordinated.coordinate(new Increment()), null);
coordinated.await(); coordinated.await();
Integer count = (Integer) Await.result(ask(counter, "GetCount", timeout), timeout.duration()); Integer count = (Integer) Await.result(ask(counter, "GetCount", timeout), timeout.duration());
@ -86,7 +85,7 @@ public class TransactorDocTest {
Timeout timeout = new Timeout(5, SECONDS); Timeout timeout = new Timeout(5, SECONDS);
Coordinated coordinated = new Coordinated(timeout); Coordinated coordinated = new Coordinated(timeout);
friendlyCounter.tell(coordinated.coordinate(new Increment(friend))); friendlyCounter.tell(coordinated.coordinate(new Increment(friend)), null);
coordinated.await(); coordinated.await();
Integer count1 = (Integer) Await.result(ask(friendlyCounter, "GetCount", timeout), timeout.duration()); Integer count1 = (Integer) Await.result(ask(friendlyCounter, "GetCount", timeout), timeout.duration());

View file

@ -46,7 +46,6 @@ import com.typesafe.config.ConfigFactory;
import java.lang.management.MemoryMXBean; import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage; import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean; import java.lang.management.OperatingSystemMXBean;
import java.util.concurrent.TimeUnit;
import java.util.Date; import java.util.Date;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
@ -58,8 +57,6 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.Assume; import org.junit.Assume;
import akka.zeromq.SocketType;
public class ZeromqDocTestBase { public class ZeromqDocTestBase {
ActorSystem system; ActorSystem system;
@ -95,12 +92,12 @@ public class ZeromqDocTestBase {
//#sub-topic-socket //#sub-topic-socket
//#unsub-topic-socket //#unsub-topic-socket
subTopicSocket.tell(new Unsubscribe("foo.bar")); subTopicSocket.tell(new Unsubscribe("foo.bar"), null);
//#unsub-topic-socket //#unsub-topic-socket
byte[] payload = new byte[0]; byte[] payload = new byte[0];
//#pub-topic //#pub-topic
pubSocket.tell(new ZMQMessage(new Frame("foo.bar"), new Frame(payload))); pubSocket.tell(new ZMQMessage(new Frame("foo.bar"), new Frame(payload)), null);
//#pub-topic //#pub-topic
//#high-watermark //#high-watermark
@ -205,12 +202,12 @@ public class ZeromqDocTestBase {
byte[] heapPayload = ser.serializerFor(Heap.class).toBinary( byte[] heapPayload = ser.serializerFor(Heap.class).toBinary(
new Heap(timestamp, currentHeap.getUsed(), currentHeap.getMax())); new Heap(timestamp, currentHeap.getUsed(), currentHeap.getMax()));
// the first frame is the topic, second is the message // the first frame is the topic, second is the message
pubSocket.tell(new ZMQMessage(new Frame("health.heap"), new Frame(heapPayload))); pubSocket.tell(new ZMQMessage(new Frame("health.heap"), new Frame(heapPayload)), getSelf());
// use akka SerializationExtension to convert to bytes // use akka SerializationExtension to convert to bytes
byte[] loadPayload = ser.serializerFor(Load.class).toBinary(new Load(timestamp, os.getSystemLoadAverage())); byte[] loadPayload = ser.serializerFor(Load.class).toBinary(new Load(timestamp, os.getSystemLoadAverage()));
// the first frame is the topic, second is the message // the first frame is the topic, second is the message
pubSocket.tell(new ZMQMessage(new Frame("health.load"), new Frame(loadPayload))); pubSocket.tell(new ZMQMessage(new Frame("health.load"), new Frame(loadPayload)), getSelf());
} else { } else {
unhandled(message); unhandled(message);
} }

View file

@ -18,8 +18,6 @@ import com.typesafe.config.ConfigFactory;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import static org.junit.Assert.*;
public class DurableMailboxDocTestBase { public class DurableMailboxDocTestBase {
ActorSystem system; ActorSystem system;
@ -41,7 +39,7 @@ public class DurableMailboxDocTestBase {
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class). ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).
withDispatcher("my-dispatcher"), "myactor"); withDispatcher("my-dispatcher"), "myactor");
//#dispatcher-config-use //#dispatcher-config-use
myActor.tell("test"); myActor.tell("test", null);
} }
public static class MyUntypedActor extends UntypedActor { public static class MyUntypedActor extends UntypedActor {

View file

@ -29,7 +29,7 @@ class PrintlnActor extends Actor {
//#fibonacciActor //#fibonacciActor
class FibonacciActor extends Actor { class FibonacciActor extends Actor {
def receive = { def receive = {
case FibonacciNumber(nbr) sender tell fibonacci(nbr) case FibonacciNumber(nbr) sender ! fibonacci(nbr)
} }
private def fibonacci(n: Int): Int = { private def fibonacci(n: Int): Int = {

View file

@ -13,28 +13,31 @@ public class HelloKernel implements Bootable {
final ActorSystem system = ActorSystem.create("hellokernel"); final ActorSystem system = ActorSystem.create("hellokernel");
static class HelloActor extends UntypedActor { static class HelloActor extends UntypedActor {
final ActorRef worldActor = final ActorRef worldActor = getContext().actorOf(
getContext().actorOf(new Props(WorldActor.class)); new Props(WorldActor.class));
public void onReceive(Object message) { public void onReceive(Object message) {
if (message == "start") if (message == "start")
worldActor.tell("Hello"); worldActor.tell("Hello", getSelf());
else if (message instanceof String) else if (message instanceof String)
System.out.println("Received message '%s'".format((String)message)); System.out.println(String.format("Received message '%s'", message));
else unhandled(message); else
unhandled(message);
}
} }
}
static class WorldActor extends UntypedActor { static class WorldActor extends UntypedActor {
public void onReceive(Object message) { public void onReceive(Object message) {
if (message instanceof String) if (message instanceof String)
getSender().tell(((String)message).toUpperCase() + " world!"); getSender().tell(((String) message).toUpperCase() + " world!",
else unhandled(message); getSelf());
else
unhandled(message);
}
} }
}
public void startup() { public void startup() {
system.actorOf(new Props(HelloActor.class)).tell("start"); system.actorOf(new Props(HelloActor.class)).tell("start", null);
} }
public void shutdown() { public void shutdown() {

View file

@ -12,17 +12,23 @@ public class JAdvancedCalculatorActor extends UntypedActor {
if (message instanceof Op.Multiply) { if (message instanceof Op.Multiply) {
Op.Multiply multiply = (Op.Multiply) message; Op.Multiply multiply = (Op.Multiply) message;
System.out.println("Calculating " + multiply.getN1() + " * " + multiply.getN2()); System.out.println("Calculating " + multiply.getN1() + " * "
getSender().tell(new Op.MultiplicationResult(multiply.getN1(), multiply.getN2(), multiply.getN1() * multiply.getN2())); + multiply.getN2());
getSender().tell(
new Op.MultiplicationResult(multiply.getN1(), multiply.getN2(),
multiply.getN1() * multiply.getN2()), getSelf());
} else if (message instanceof Op.Divide) { } else if (message instanceof Op.Divide) {
Op.Divide divide = (Op.Divide) message; Op.Divide divide = (Op.Divide) message;
System.out.println("Calculating " + divide.getN1() + " / " + divide.getN2()); System.out.println("Calculating " + divide.getN1() + " / "
getSender().tell(new Op.DivisionResult(divide.getN1(), divide.getN2(), divide.getN1() / divide.getN2())); + divide.getN2());
getSender().tell(
new Op.DivisionResult(divide.getN1(), divide.getN2(), divide.getN1()
/ divide.getN2()), getSelf());
} else { } else {
unhandled(message); unhandled(message);
} }
} }
} }
//#actor // #actor

View file

@ -14,8 +14,10 @@ public class JCalculatorApplication implements Bootable {
private ActorSystem system; private ActorSystem system;
public JCalculatorApplication() { public JCalculatorApplication() {
system = ActorSystem.create("CalculatorApplication", ConfigFactory.load().getConfig("calculator")); system = ActorSystem.create("CalculatorApplication", ConfigFactory.load()
ActorRef actor = system.actorOf(new Props(JSimpleCalculatorActor.class), "simpleCalculator"); .getConfig("calculator"));
ActorRef actor = system.actorOf(new Props(JSimpleCalculatorActor.class),
"simpleCalculator");
} }
@Override @Override
@ -27,4 +29,4 @@ public class JCalculatorApplication implements Bootable {
system.shutdown(); system.shutdown();
} }
} }
//#setup // #setup

View file

@ -16,13 +16,15 @@ public class JCreationApplication implements Bootable {
private ActorRef remoteActor; private ActorRef remoteActor;
public JCreationApplication() { public JCreationApplication() {
system = ActorSystem.create("CreationApplication", ConfigFactory.load().getConfig("remotecreation")); system = ActorSystem.create("CreationApplication", ConfigFactory.load()
.getConfig("remotecreation"));
actor = system.actorOf(new Props(JCreationActor.class)); actor = system.actorOf(new Props(JCreationActor.class));
remoteActor = system.actorOf(new Props(JAdvancedCalculatorActor.class), "advancedCalculator"); remoteActor = system.actorOf(new Props(JAdvancedCalculatorActor.class),
"advancedCalculator");
} }
public void doSomething(Op.MathOp mathOp) { public void doSomething(Op.MathOp mathOp) {
actor.tell(new InternalMsg.MathOpMsg(remoteActor, mathOp)); actor.tell(new InternalMsg.MathOpMsg(remoteActor, mathOp), null);
} }
@Override @Override
@ -34,4 +36,4 @@ public class JCreationApplication implements Bootable {
system.shutdown(); system.shutdown();
} }
} }
//#setup // #setup

View file

@ -7,7 +7,6 @@ package sample.remote.calculator.java;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.kernel.Bootable; import akka.kernel.Bootable;
import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigFactory;
//#imports //#imports
@ -19,13 +18,15 @@ public class JLookupApplication implements Bootable {
private ActorRef remoteActor; private ActorRef remoteActor;
public JLookupApplication() { public JLookupApplication() {
system = ActorSystem.create("LookupApplication", ConfigFactory.load().getConfig("remotelookup")); system = ActorSystem.create("LookupApplication", ConfigFactory.load()
.getConfig("remotelookup"));
actor = system.actorOf(new Props(JLookupActor.class)); actor = system.actorOf(new Props(JLookupActor.class));
remoteActor = system.actorFor("akka://CalculatorApplication@127.0.0.1:2552/user/simpleCalculator"); remoteActor = system
.actorFor("akka://CalculatorApplication@127.0.0.1:2552/user/simpleCalculator");
} }
public void doSomething(Op.MathOp mathOp) { public void doSomething(Op.MathOp mathOp) {
actor.tell(new InternalMsg.MathOpMsg(remoteActor, mathOp)); actor.tell(new InternalMsg.MathOpMsg(remoteActor, mathOp), null);
} }
@Override @Override
@ -37,4 +38,4 @@ public class JLookupApplication implements Bootable {
system.shutdown(); system.shutdown();
} }
} }
//#setup // #setup

View file

@ -13,16 +13,22 @@ public class JSimpleCalculatorActor extends UntypedActor {
if (message instanceof Op.Add) { if (message instanceof Op.Add) {
Op.Add add = (Op.Add) message; Op.Add add = (Op.Add) message;
System.out.println("Calculating " + add.getN1() + " + " + add.getN2()); System.out.println("Calculating " + add.getN1() + " + " + add.getN2());
getSender().tell(new Op.AddResult(add.getN1(), add.getN2(), add.getN1() + add.getN2())); getSender()
.tell(
new Op.AddResult(add.getN1(), add.getN2(), add.getN1()
+ add.getN2()), getSelf());
} else if (message instanceof Op.Subtract) { } else if (message instanceof Op.Subtract) {
Op.Subtract subtract = (Op.Subtract) message; Op.Subtract subtract = (Op.Subtract) message;
System.out.println("Calculating " + subtract.getN1() + " - " + subtract.getN2()); System.out.println("Calculating " + subtract.getN1() + " - "
getSender().tell(new Op.SubtractResult(subtract.getN1(), subtract.getN2(), subtract.getN1() - subtract.getN2())); + subtract.getN2());
getSender().tell(
new Op.SubtractResult(subtract.getN1(), subtract.getN2(),
subtract.getN1() - subtract.getN2()), getSelf());
} else { } else {
unhandled(message); unhandled(message);
} }
} }
} }
//#actor // #actor

View file

@ -7,11 +7,14 @@ import java.io.Serializable;
public class Op { public class Op {
public interface MathOp extends Serializable {} public interface MathOp extends Serializable {
}
public interface MathResult extends Serializable {} public interface MathResult extends Serializable {
}
static class Add implements MathOp { static class Add implements MathOp {
private static final long serialVersionUID = 1L;
private final int n1; private final int n1;
private final int n2; private final int n2;
@ -30,6 +33,7 @@ public class Op {
} }
static class AddResult implements MathResult { static class AddResult implements MathResult {
private static final long serialVersionUID = 1L;
private final int n1; private final int n1;
private final int n2; private final int n2;
private final int result; private final int result;
@ -54,6 +58,7 @@ public class Op {
} }
static class Subtract implements MathOp { static class Subtract implements MathOp {
private static final long serialVersionUID = 1L;
private final int n1; private final int n1;
private final int n2; private final int n2;
@ -72,6 +77,7 @@ public class Op {
} }
static class SubtractResult implements MathResult { static class SubtractResult implements MathResult {
private static final long serialVersionUID = 1L;
private final int n1; private final int n1;
private final int n2; private final int n2;
private final int result; private final int result;
@ -96,6 +102,7 @@ public class Op {
} }
static class Multiply implements MathOp { static class Multiply implements MathOp {
private static final long serialVersionUID = 1L;
private final int n1; private final int n1;
private final int n2; private final int n2;
@ -114,6 +121,7 @@ public class Op {
} }
static class MultiplicationResult implements MathResult { static class MultiplicationResult implements MathResult {
private static final long serialVersionUID = 1L;
private final int n1; private final int n1;
private final int n2; private final int n2;
private final int result; private final int result;
@ -138,6 +146,7 @@ public class Op {
} }
static class Divide implements MathOp { static class Divide implements MathOp {
private static final long serialVersionUID = 1L;
private final double n1; private final double n1;
private final int n2; private final int n2;
@ -156,6 +165,7 @@ public class Op {
} }
static class DivisionResult implements MathResult { static class DivisionResult implements MathResult {
private static final long serialVersionUID = 1L;
private final double n1; private final double n1;
private final int n2; private final int n2;
private final double result; private final double result;

View file

@ -23,7 +23,7 @@ abstract class UntypedTransactor extends UntypedActor {
case coordinated @ Coordinated(message) { case coordinated @ Coordinated(message) {
val others = coordinate(message) val others = coordinate(message)
for (sendTo others) { for (sendTo others) {
sendTo.actor.tell(coordinated(sendTo.message.getOrElse(message))) sendTo.actor ! coordinated(sendTo.message.getOrElse(message))
} }
before(message) before(message)
coordinated.atomic { txn atomically(message) } coordinated.atomic { txn atomically(message) }

View file

@ -4,13 +4,13 @@
package akka.transactor; package akka.transactor;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.STM;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.STM;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
public class UntypedCoordinatedCounter extends UntypedActor { public class UntypedCoordinatedCounter extends UntypedActor {
private String name; private String name;
@ -35,7 +35,7 @@ public class UntypedCoordinatedCounter extends UntypedActor {
}; };
if (!friends.isEmpty()) { if (!friends.isEmpty()) {
Increment coordMessage = new Increment(friends.subList(1, friends.size()), latch); Increment coordMessage = new Increment(friends.subList(1, friends.size()), latch);
friends.get(0).tell(coordinated.coordinate(coordMessage)); friends.get(0).tell(coordinated.coordinate(coordMessage), getSelf());
} }
coordinated.atomic(new Runnable() { coordinated.atomic(new Runnable() {
public void run() { public void run() {
@ -45,7 +45,7 @@ public class UntypedCoordinatedCounter extends UntypedActor {
}); });
} }
} else if ("GetCount".equals(incoming)) { } else if ("GetCount".equals(incoming)) {
getSender().tell(count.get()); getSender().tell(count.get(), getSelf());
} }
} }
} }

View file

@ -75,7 +75,7 @@ public class UntypedCoordinatedIncrementTest {
public void incrementAllCountersWithSuccessfulTransaction() throws Exception { public void incrementAllCountersWithSuccessfulTransaction() throws Exception {
CountDownLatch incrementLatch = new CountDownLatch(numCounters); CountDownLatch incrementLatch = new CountDownLatch(numCounters);
Increment message = new Increment(counters.subList(1, counters.size()), incrementLatch); Increment message = new Increment(counters.subList(1, counters.size()), incrementLatch);
counters.get(0).tell(new Coordinated(message, timeout)); counters.get(0).tell(new Coordinated(message, timeout), null);
try { try {
incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS); incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException exception) { } catch (InterruptedException exception) {
@ -97,7 +97,7 @@ public class UntypedCoordinatedIncrementTest {
List<ActorRef> actors = new ArrayList<ActorRef>(counters); List<ActorRef> actors = new ArrayList<ActorRef>(counters);
actors.add(failer); actors.add(failer);
Increment message = new Increment(actors.subList(1, actors.size()), incrementLatch); Increment message = new Increment(actors.subList(1, actors.size()), incrementLatch);
actors.get(0).tell(new Coordinated(message, timeout)); actors.get(0).tell(new Coordinated(message, timeout), null);
try { try {
incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS); incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException exception) { } catch (InterruptedException exception) {

View file

@ -4,15 +4,12 @@
package akka.transactor; package akka.transactor;
import akka.actor.ActorRef;
import akka.transactor.UntypedTransactor;
import akka.transactor.SendTo;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.STM;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.STM;
import akka.actor.ActorRef;
public class UntypedCounter extends UntypedTransactor { public class UntypedCounter extends UntypedTransactor {
private String name; private String name;
@ -52,7 +49,7 @@ public class UntypedCounter extends UntypedTransactor {
@Override public boolean normally(Object message) { @Override public boolean normally(Object message) {
if ("GetCount".equals(message)) { if ("GetCount".equals(message)) {
getSender().tell(count.get()); getSender().tell(count.get(), getSelf());
return true; return true;
} else return false; } else return false;
} }

View file

@ -4,8 +4,6 @@
package akka.transactor; package akka.transactor;
import scala.concurrent.stm.InTxn;
public class UntypedFailer extends UntypedTransactor { public class UntypedFailer extends UntypedTransactor {
public void atomically(Object message) throws Exception { public void atomically(Object message) throws Exception {
throw new ExpectedFailureException(); throw new ExpectedFailureException();

View file

@ -63,6 +63,8 @@ public class UntypedTransactorTest {
for (int i = 1; i <= numCounters; i++) { for (int i = 1; i <= numCounters; i++) {
final String name = "counter" + i; final String name = "counter" + i;
ActorRef counter = system.actorOf(new Props(new UntypedActorFactory() { ActorRef counter = system.actorOf(new Props(new UntypedActorFactory() {
private static final long serialVersionUID = 1L;
public UntypedActor create() { public UntypedActor create() {
return new UntypedCounter(name); return new UntypedCounter(name);
} }
@ -75,8 +77,9 @@ public class UntypedTransactorTest {
@Test @Test
public void incrementAllCountersWithSuccessfulTransaction() throws Exception { public void incrementAllCountersWithSuccessfulTransaction() throws Exception {
CountDownLatch incrementLatch = new CountDownLatch(numCounters); CountDownLatch incrementLatch = new CountDownLatch(numCounters);
Increment message = new Increment(counters.subList(1, counters.size()), incrementLatch); Increment message = new Increment(counters.subList(1, counters.size()),
counters.get(0).tell(message); incrementLatch);
counters.get(0).tell(message, null);
try { try {
incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS); incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException exception) { } catch (InterruptedException exception) {
@ -90,15 +93,19 @@ public class UntypedTransactorTest {
@Test @Test
public void incrementNoCountersWithFailingTransaction() throws Exception { public void incrementNoCountersWithFailingTransaction() throws Exception {
EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(ExpectedFailureException.class); EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(
EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(CoordinatedTransactionException.class); ExpectedFailureException.class);
Seq<EventFilter> ignoreExceptions = seq(expectedFailureFilter, coordinatedFilter); EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(
CoordinatedTransactionException.class);
Seq<EventFilter> ignoreExceptions = seq(expectedFailureFilter,
coordinatedFilter);
system.eventStream().publish(new TestEvent.Mute(ignoreExceptions)); system.eventStream().publish(new TestEvent.Mute(ignoreExceptions));
CountDownLatch incrementLatch = new CountDownLatch(numCounters); CountDownLatch incrementLatch = new CountDownLatch(numCounters);
List<ActorRef> actors = new ArrayList<ActorRef>(counters); List<ActorRef> actors = new ArrayList<ActorRef>(counters);
actors.add(failer); actors.add(failer);
Increment message = new Increment(actors.subList(1, actors.size()), incrementLatch); Increment message = new Increment(actors.subList(1, actors.size()),
actors.get(0).tell(message); incrementLatch);
actors.get(0).tell(message, null);
try { try {
incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS); incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException exception) { } catch (InterruptedException exception) {
@ -111,6 +118,8 @@ public class UntypedTransactorTest {
} }
public <A> Seq<A> seq(A... args) { public <A> Seq<A> seq(A... args) {
return JavaConverters.collectionAsScalaIterableConverter(Arrays.asList(args)).asScala().toSeq(); return JavaConverters
.collectionAsScalaIterableConverter(Arrays.asList(args)).asScala()
.toSeq();
} }
} }