Add untyped transactor

This commit is contained in:
Peter Vlugter 2010-11-12 16:09:31 +13:00
parent 4cdc46c23f
commit db6d90df43
22 changed files with 422 additions and 39 deletions

View file

@ -7,16 +7,11 @@ package akka.transactor
import akka.actor.{Actor, ActorRef} import akka.actor.{Actor, ActorRef}
import akka.stm.{DefaultTransactionConfig, TransactionFactory} import akka.stm.{DefaultTransactionConfig, TransactionFactory}
/** /**
* Transactor. See companion class for information. * Used for specifying actor refs and messages to send to during coordination.
*/ */
object Transactor { case class SendTo(actor: ActorRef, message: Option[Any] = None)
/**
* Used internally by Transactor for collecting actor refs and messages
* to send to during coordination.
*/
case class SendTo(actor: ActorRef, message: Option[Any] = None)
}
/** /**
* An actor with built-in support for coordinated transactions. * An actor with built-in support for coordinated transactions.
@ -100,8 +95,6 @@ object Transactor {
* @see [[akka.stm.Coordinated]] for more information about the underlying mechanism * @see [[akka.stm.Coordinated]] for more information about the underlying mechanism
*/ */
trait Transactor extends Actor { trait Transactor extends Actor {
import Transactor.SendTo
private lazy val txFactory = transactionFactory private lazy val txFactory = transactionFactory
/** /**
@ -131,7 +124,7 @@ trait Transactor extends Actor {
/** /**
* Override this method to coordinate with other transactors. * Override this method to coordinate with other transactors.
* The other transactors are added to the coordinated transaction barrier * The other transactors are added to the coordinated transaction barrier
* and sent a Coordinated message. The message to send is either specified * and sent a Coordinated message. The message to send can be specified
* or otherwise the same message as received is sent. Use the 'include' and * or otherwise the same message as received is sent. Use the 'include' and
* 'sendTo' methods to easily create the set of transactors to be involved. * 'sendTo' methods to easily create the set of transactors to be involved.
*/ */
@ -148,13 +141,13 @@ trait Transactor extends Actor {
def nobody: Set[SendTo] = Set.empty def nobody: Set[SendTo] = Set.empty
/** /**
* Incude other actors in this coordinated transaction and send * Include other actors in this coordinated transaction and send
* them the same message as received. Use as the result in 'coordinated'. * them the same message as received. Use as the result in 'coordinated'.
*/ */
def include(actors: ActorRef*): Set[SendTo] = actors map (SendTo(_)) toSet def include(actors: ActorRef*): Set[SendTo] = actors map (SendTo(_)) toSet
/** /**
* Incude other actors in this coordinated transaction and specify the message * Include other actors in this coordinated transaction and specify the message
* to send by providing ActorRef -> Message pairs. Use as the result in 'coordinated'. * to send by providing ActorRef -> Message pairs. Use as the result in 'coordinated'.
*/ */
def sendTo(pairs: (ActorRef, Any)*): Set[SendTo] = pairs map (p => SendTo(p._1, Some(p._2))) toSet def sendTo(pairs: (ActorRef, Any)*): Set[SendTo] = pairs map (p => SendTo(p._1, Some(p._2))) toSet

View file

@ -0,0 +1,110 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.transactor
import akka.actor.{UntypedActor, ActorRef}
import akka.stm.{DefaultTransactionConfig, TransactionFactory}
import java.util.{Set => JSet}
import scala.collection.JavaConversions._
/**
* An UntypedActor version of transactor for using from Java.
*/
abstract class UntypedTransactor extends UntypedActor {
private lazy val txFactory = transactionFactory
/**
* Create default transaction factory. Override to provide custom configuration.
*/
def transactionFactory = TransactionFactory(DefaultTransactionConfig)
/**
* Implement a general pattern for using coordinated transactions.
*/
@throws(classOf[Exception])
final def onReceive(message: Any): Unit = {
message match {
case coordinated @ Coordinated(message) => {
val others = coordinate(message)
for (sendTo <- others) {
sendTo.actor.sendOneWay(coordinated(sendTo.message.getOrElse(message)))
}
before(message)
coordinated.atomic(txFactory) { atomically(message) }
after(message)
}
case message => {
val normal = normally(message)
if (!normal) onReceive(Coordinated(message))
}
}
}
/**
* Override this method to coordinate with other transactors.
* The other transactors are added to the coordinated transaction barrier
* and sent a Coordinated message. The message to send can be specified
* or otherwise the same message as received is sent. Use the 'include' and
* 'sendTo' methods to easily create the set of transactors to be involved.
*/
@throws(classOf[Exception])
def coordinate(message: Any): JSet[SendTo] = nobody
/**
* Empty set of transactors to send to.
*/
def nobody: JSet[SendTo] = Set[SendTo]()
/**
* For including one other actor in this coordinated transaction and sending
* them the same message as received. Use as the result in `coordinated`.
*/
def include(actor: ActorRef): JSet[SendTo] = Set(SendTo(actor))
/**
* For including one other actor in this coordinated transaction and specifying the
* message to send. Use as the result in `coordinated`.
*/
def include(actor: ActorRef, message: Any): JSet[SendTo] = Set(SendTo(actor, Some(message)))
/**
* For including another actor in this coordinated transaction and sending
* them the same message as received. Use to create the result in `coordinated`.
*/
def sendTo(actor: ActorRef): SendTo = SendTo(actor)
/**
* For including another actor in this coordinated transaction and specifying the
* message to send. Use to create the result in `coordinated`.
*/
def sendTo(actor: ActorRef, message: Any): SendTo = SendTo(actor, Some(message))
/**
* A Receive block that runs before the coordinated transaction is entered.
*/
@throws(classOf[Exception])
def before(message: Any): Unit = {}
/**
* The Receive block to run inside the coordinated transaction.
*/
@throws(classOf[Exception])
def atomically(message: Any): Unit = {}
/**
* A Receive block that runs after the coordinated transaction.
*/
@throws(classOf[Exception])
def after(message: Any): Unit = {}
/**
* Bypass transactionality and behave like a normal actor.
*/
@throws(classOf[Exception])
def normally(message: Any): Boolean = false
}

View file

@ -1,4 +1,4 @@
package akka.stm.test; package akka.stm.example;
public class Address { public class Address {
private String location; private String location;

View file

@ -1,4 +1,4 @@
package akka.stm.test; package akka.stm.example;
import akka.stm.*; import akka.stm.*;

View file

@ -1,4 +1,4 @@
package akka.stm.test; package akka.stm.example;
import akka.stm.*; import akka.stm.*;

View file

@ -1,4 +1,4 @@
package akka.stm.test; package akka.stm.example;
public class StmExamples { public class StmExamples {
public static void main(String[] args) { public static void main(String[] args) {

View file

@ -1,4 +1,4 @@
package akka.stm.test; package akka.stm.example;
import akka.stm.*; import akka.stm.*;

View file

@ -1,4 +1,4 @@
package akka.stm.test; package akka.stm.example;
import akka.stm.*; import akka.stm.*;

View file

@ -1,4 +1,4 @@
package akka.stm.test; package akka.stm.example;
import akka.stm.*; import akka.stm.*;

View file

@ -1,4 +1,4 @@
package akka.stm.test; package akka.stm.example;
public class User { public class User {
private String name; private String name;

View file

@ -0,0 +1,21 @@
package akka.transactor.example;
import akka.actor.ActorRef;
public class Increment {
private ActorRef friend = null;
public Increment() {}
public Increment(ActorRef friend) {
this.friend = friend;
}
public boolean hasFriend() {
return friend != null;
}
public ActorRef getFriend() {
return friend;
}
}

View file

@ -0,0 +1,33 @@
package akka.transactor.example;
import akka.transactor.UntypedTransactor;
import akka.transactor.SendTo;
import akka.stm.Ref;
import java.util.Set;
public class UntypedCounter extends UntypedTransactor {
Ref<Integer> count = new Ref<Integer>(0);
@Override public Set<SendTo> coordinate(Object message) {
if (message instanceof Increment) {
Increment increment = (Increment) message;
if (increment.hasFriend())
return include(increment.getFriend(), new Increment());
}
return nobody();
}
public void atomically(Object message) {
if (message instanceof Increment) {
count.set(count.get() + 1);
}
}
@Override public boolean normally(Object message) {
if ("GetCount".equals(message)) {
getContext().replyUnsafe(count.get());
return true;
} else return false;
}
}

View file

@ -0,0 +1,43 @@
package akka.transactor.example;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.dispatch.Future;
import akka.dispatch.Futures;
public class UntypedTransactorExample {
public static void main(String[] args) throws InterruptedException {
System.out.println();
System.out.println("Untyped transactor example");
System.out.println();
ActorRef counter1 = UntypedActor.actorOf(UntypedCounter.class).start();
ActorRef counter2 = UntypedActor.actorOf(UntypedCounter.class).start();
counter1.sendOneWay(new Increment(counter2));
Thread.sleep(3000);
Future future1 = counter1.sendRequestReplyFuture("GetCount");
Future future2 = counter2.sendRequestReplyFuture("GetCount");
future1.await();
if (future1.isCompleted()) {
if (future1.result().isDefined()) {
int result = (Integer) future1.result().get();
System.out.println("counter 1: " + result);
}
}
future2.await();
if (future2.isCompleted()) {
if (future2.result().isDefined()) {
int result = (Integer) future2.result().get();
System.out.println("counter 2: " + result);
}
}
counter1.stop();
counter2.stop();
}
}

View file

@ -5,11 +5,19 @@ import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
public class Increment { public class Increment {
List<ActorRef> friends; private List<ActorRef> friends;
CountDownLatch latch; private CountDownLatch latch;
public Increment(List<ActorRef> friends, CountDownLatch latch) { public Increment(List<ActorRef> friends, CountDownLatch latch) {
this.friends = friends; this.friends = friends;
this.latch = latch; this.latch = latch;
} }
public List<ActorRef> getFriends() {
return friends;
}
public CountDownLatch getLatch() {
return latch;
}
} }

View file

@ -12,14 +12,14 @@ import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class CoordinatedCounter extends UntypedActor { public class UntypedCoordinatedCounter extends UntypedActor {
String name; private String name;
Ref<Integer> count = new Ref(0); private Ref<Integer> count = new Ref(0);
TransactionFactory txFactory = new TransactionFactoryBuilder() private TransactionFactory txFactory = new TransactionFactoryBuilder()
.setTimeout(new Duration(3, TimeUnit.SECONDS)) .setTimeout(new Duration(3, TimeUnit.SECONDS))
.build(); .build();
public CoordinatedCounter(String name) { public UntypedCoordinatedCounter(String name) {
this.name = name; this.name = name;
} }
@ -34,8 +34,8 @@ public class CoordinatedCounter extends UntypedActor {
Object message = coordinated.getMessage(); Object message = coordinated.getMessage();
if (message instanceof Increment) { if (message instanceof Increment) {
Increment increment = (Increment) message; Increment increment = (Increment) message;
List<ActorRef> friends = increment.friends; List<ActorRef> friends = increment.getFriends();
final CountDownLatch latch = increment.latch; final CountDownLatch latch = increment.getLatch();
if (!friends.isEmpty()) { if (!friends.isEmpty()) {
Increment coordMessage = new Increment(friends.subList(1, friends.size()), latch); Increment coordMessage = new Increment(friends.subList(1, friends.size()), latch);
friends.get(0).sendOneWay(coordinated.coordinate(coordMessage)); friends.get(0).sendOneWay(coordinated.coordinate(coordMessage));

View file

@ -17,7 +17,7 @@ import java.util.concurrent.TimeUnit;
import scala.Option; import scala.Option;
public class CoordinatedIncrementTest { public class UntypedCoordinatedIncrementTest {
List<ActorRef> counters; List<ActorRef> counters;
ActorRef failer; ActorRef failer;
@ -30,13 +30,13 @@ public class CoordinatedIncrementTest {
final String name = "counter" + i; final String name = "counter" + i;
ActorRef counter = UntypedActor.actorOf(new UntypedActorFactory() { ActorRef counter = UntypedActor.actorOf(new UntypedActorFactory() {
public UntypedActor create() { public UntypedActor create() {
return new CoordinatedCounter(name); return new UntypedCoordinatedCounter(name);
} }
}); });
counter.start(); counter.start();
counters.add(counter); counters.add(counter);
} }
failer = UntypedActor.actorOf(CoordinatedFailer.class); failer = UntypedActor.actorOf(UntypedFailer.class);
failer.start(); failer.start();
} }

View file

@ -0,0 +1,77 @@
package akka.transactor.test;
import akka.transactor.UntypedTransactor;
import akka.transactor.SendTo;
import akka.actor.ActorRef;
import akka.stm.*;
import akka.util.Duration;
import org.multiverse.api.StmUtils;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class UntypedCounter extends UntypedTransactor {
private String name;
private Ref<Integer> count = new Ref<Integer>(0);
public UntypedCounter(String name) {
this.name = name;
}
@Override public TransactionFactory transactionFactory() {
return new TransactionFactoryBuilder()
.setTimeout(new Duration(3, TimeUnit.SECONDS))
.build();
}
private void increment() {
System.out.println(name + ": incrementing");
count.set(count.get() + 1);
}
@Override public Set<SendTo> coordinate(Object message) {
if (message instanceof Increment) {
Increment increment = (Increment) message;
List<ActorRef> friends = increment.getFriends();
if (!friends.isEmpty()) {
Increment coordMessage = new Increment(friends.subList(1, friends.size()), increment.getLatch());
return include(friends.get(0), coordMessage);
} else {
return nobody();
}
} else {
return nobody();
}
}
@Override public void before(Object message) {
System.out.println(name + ": before transaction");
}
public void atomically(Object message) {
if (message instanceof Increment) {
increment();
final Increment increment = (Increment) message;
StmUtils.scheduleDeferredTask(new Runnable() {
public void run() { increment.getLatch().countDown(); }
});
StmUtils.scheduleCompensatingTask(new Runnable() {
public void run() { increment.getLatch().countDown(); }
});
}
}
@Override public void after(Object message) {
System.out.println(name + ": after transaction");
}
@Override public boolean normally(Object message) {
if ("GetCount".equals(message)) {
getContext().replyUnsafe(count.get());
return true;
} else return false;
}
}

View file

@ -2,7 +2,7 @@ package akka.transactor.test;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
public class CoordinatedFailer extends UntypedActor { public class UntypedFailer extends UntypedActor {
public void onReceive(Object incoming) throws Exception { public void onReceive(Object incoming) throws Exception {
throw new RuntimeException("Expected failure"); throw new RuntimeException("Expected failure");
} }

View file

@ -0,0 +1,87 @@
package akka.transactor.test;
import static org.junit.Assert.*;
import org.junit.Test;
import org.junit.Before;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.dispatch.Future;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import scala.Option;
public class UntypedTransactorTest {
List<ActorRef> counters;
ActorRef failer;
int numCounters = 5;
int timeout = 5;
@Before public void initialise() {
counters = new ArrayList<ActorRef>();
for (int i = 1; i <= numCounters; i++) {
final String name = "counter" + i;
ActorRef counter = UntypedActor.actorOf(new UntypedActorFactory() {
public UntypedActor create() {
return new UntypedCounter(name);
}
});
counter.start();
counters.add(counter);
}
failer = UntypedActor.actorOf(UntypedFailer.class);
failer.start();
}
@Test public void incrementAllCountersWithSuccessfulTransaction() {
CountDownLatch incrementLatch = new CountDownLatch(numCounters);
Increment message = new Increment(counters.subList(1, counters.size()), incrementLatch);
counters.get(0).sendOneWay(message);
try {
incrementLatch.await(timeout, TimeUnit.SECONDS);
} catch (InterruptedException exception) {}
for (ActorRef counter : counters) {
Future future = counter.sendRequestReplyFuture("GetCount");
future.await();
if (future.isCompleted()) {
Option resultOption = future.result();
if (resultOption.isDefined()) {
Object result = resultOption.get();
int count = (Integer) result;
assertEquals(1, count);
}
}
}
}
@Test public void incrementNoCountersWithFailingTransaction() {
CountDownLatch incrementLatch = new CountDownLatch(numCounters);
List<ActorRef> actors = new ArrayList<ActorRef>(counters);
actors.add(failer);
Increment message = new Increment(actors.subList(1, actors.size()), incrementLatch);
actors.get(0).sendOneWay(message);
try {
incrementLatch.await(timeout, TimeUnit.SECONDS);
} catch (InterruptedException exception) {}
for (ActorRef counter : counters) {
Future future = counter.sendRequestReplyFuture("GetCount");
future.await();
if (future.isCompleted()) {
Option resultOption = future.result();
if (resultOption.isDefined()) {
Object result = resultOption.get();
int count = (Integer) result;
assertEquals(0, count);
}
}
}
}
}

View file

@ -1,5 +0,0 @@
package akka.transactor.test
import org.scalatest.junit.JUnitWrapperSuite
class JavaCoordinatedSpec extends JUnitWrapperSuite("akka.transactor.test.CoordinatedIncrementTest", Thread.currentThread.getContextClassLoader)

View file

@ -0,0 +1,8 @@
package akka.transactor.test
import org.scalatest.junit.JUnitWrapperSuite
class JavaUntypedCoordinatedSpec extends JUnitWrapperSuite(
"akka.transactor.test.UntypedCoordinatedIncrementTest",
Thread.currentThread.getContextClassLoader
)

View file

@ -0,0 +1,8 @@
package akka.transactor.test
import org.scalatest.junit.JUnitWrapperSuite
class JavaUntypedTransactorSpec extends JUnitWrapperSuite(
"akka.transactor.test.UntypedTransactorTest",
Thread.currentThread.getContextClassLoader
)