Add coordinated transactions for typed actors

This commit is contained in:
Peter Vlugter 2010-11-13 16:19:57 +13:00
parent 49c2575ca7
commit c0a3437db2
26 changed files with 317 additions and 4 deletions

View file

@ -98,6 +98,12 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
new Coordinated(msg, barrier)
}
/**
* Create a new Coordinated object but *do not* increment the number of parties by one.
* Only use this method if you know this is what you need.
*/
def noIncrement(msg: Any) = new Coordinated(msg, barrier)
/**
* Java API: get the message for this Coordinated.
*/
@ -143,4 +149,10 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
* in this coordination before committing. The timeout is specified by the transaction factory.
*/
def atomic[T](jatomic: Atomic[T]): T = atomic(jatomic.factory)(jatomic.atomically)
/**
* An empty coordinated atomic block. Can be used to complete the number of parties involved
* and wait for all transactions to complete.
*/
def await() = atomic(Coordinated.DefaultFactory) {}
}

View file

@ -77,7 +77,7 @@ class CoordinatedIncrementSpec extends WordSpec with MustMatchers {
"increment no counters with a failing transaction" in {
val (counters, failer) = createActors
val failLatch = new CountDownLatch(numCounters + 1)
val failLatch = new CountDownLatch(numCounters)
counters(0) ! Coordinated(Increment(counters.tail :+ failer, failLatch))
failLatch.await(timeout.length, timeout.unit)
for (counter <- counters) {
@ -88,5 +88,3 @@ class CoordinatedIncrementSpec extends WordSpec with MustMatchers {
}
}
}

View file

@ -0,0 +1,7 @@
package akka.transactor.typed;
import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Coordinated { }

View file

@ -9,6 +9,8 @@ import akka.dispatch.{MessageDispatcher, Future, CompletableFuture, Dispatchers}
import akka.config.Supervision._
import akka.util._
import ReflectiveAccess._
import akka.transactor.Coordinated
import akka.transactor.typed.{Coordination, Coordinated => CoordinatedAnnotation}
import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
import org.codehaus.aspectwerkz.proxy.Proxy
@ -181,6 +183,11 @@ abstract class TypedActor extends Actor with Proxyable {
if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint)
if (TypedActor.isOneWay(joinPoint)) joinPoint.proceed
else self.reply(joinPoint.proceed)
case coordinated @ Coordinated(joinPoint: JoinPoint) =>
SenderContextInfo.senderActorRef.value = self
SenderContextInfo.senderProxy.value = proxy
if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint)
coordinated atomic { joinPoint.proceed }
case Link(proxy) => self.link(proxy)
case Unlink(proxy) => self.unlink(proxy)
case unexpected => throw new IllegalActorStateException(
@ -702,6 +709,12 @@ object TypedActor extends Logging {
private[akka] def isOneWay(methodRtti: MethodRtti): Boolean =
methodRtti.getMethod.getReturnType == java.lang.Void.TYPE
private[akka] def isCoordinated(joinPoint: JoinPoint): Boolean =
isCoordinated(joinPoint.getRtti.asInstanceOf[MethodRtti])
private[akka] def isCoordinated(methodRtti: MethodRtti): Boolean =
methodRtti.getMethod.isAnnotationPresent(classOf[CoordinatedAnnotation])
private[akka] def returnsFuture_?(methodRtti: MethodRtti): Boolean =
classOf[Future[_]].isAssignableFrom(methodRtti.getMethod.getReturnType)
@ -783,12 +796,26 @@ private[akka] abstract class ActorAspect {
val isOneWay = TypedActor.isOneWay(methodRtti)
val senderActorRef = Some(SenderContextInfo.senderActorRef.value)
val senderProxy = Some(SenderContextInfo.senderProxy.value)
val isCoordinated = TypedActor.isCoordinated(methodRtti)
typedActor.context._sender = senderProxy
if (!actorRef.isRunning && !isStopped) {
isStopped = true
joinPoint.proceed
} else if (isOneWay && isCoordinated) {
val coordinatedOpt = Option(Coordination.coordinated.value)
val coordinated = coordinatedOpt.map( coord =>
if (Coordination.firstParty.value) { // already included in coordination
Coordination.firstParty.value = false
coord.noIncrement(joinPoint)
} else {
coord(joinPoint)
}).getOrElse(Coordinated(joinPoint))
actorRef.!(coordinated)(senderActorRef)
null.asInstanceOf[AnyRef]
} else if (isOneWay) {
actorRef.!(joinPoint)(senderActorRef)
null.asInstanceOf[AnyRef]

View file

@ -0,0 +1,78 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.transactor.typed
import akka.transactor.{Coordinated => CoordinatedObject}
import akka.stm.Atomic
import scala.util.DynamicVariable
/**
* Coordinating transactions between typed actors.
*
* Example ''(in Scala)''
*
* {{{
* trait Counter {
* @Coordinated def increment: Unit
* def get: Int
* }
*
* class CounterImpl extends TypedActor with Counter {
* val ref = Ref(0)
* def increment = ref alter (_ + 1)
* def get = ref.get
* }
*
*
* val counter1 = TypedActor.newInstance(classOf[Counter], classOf[CounterImpl])
* val counter2 = TypedActor.newInstance(classOf[Counter], classOf[CounterImpl])
*
* coordinate {
* counter1.increment
* counter2.increment
* }
* }}}
*/
object Coordination {
private[akka] val coordinated = new DynamicVariable[CoordinatedObject](null)
private[akka] val firstParty = new DynamicVariable[Boolean](false)
/**
* For creating a coordination between typed actors that use
* the [[akka.transactor.typed.Coordinated]] annotation.
* Coordinated transactions will wait for all other transactions in the coordination
* before committing. The timeout is specified by the default transaction factory.
* It's possible to specify whether or not this `coordinate` block waits for all of
* the transactions to complete - the default is that it does.
*/
def coordinate[U](wait: Boolean = true)(body: => U): Unit = {
firstParty.value = !wait
coordinated.withValue(CoordinatedObject()) {
body
if (wait) coordinated.value.await
}
firstParty.value = false
}
/**
* For creating a coordination between typed actors that use
* the [[akka.transactor.typed.Coordinated]] annotation.
* Coordinated transactions will wait for all other transactions in the coordination
* before committing. The timeout is specified by the default transaction factory.
*/
def coordinate[U](body: => U): Unit = coordinate(true)(body)
/**
* Java API: coordinate that accepts an [[akka.stm.Atomic]].
* For creating a coordination between typed actors that use
* the [[akka.transactor.typed.Coordinated]] annotation.
* Coordinated transactions will wait for all other transactions in the coordination
* before committing. The timeout is specified by the default transaction factory.
* Use the `wait` parameter to specify whether or not this `coordinate` block
* waits for all of the transactions to complete.
*/
def coordinate[U](wait: Boolean, jatomic: Atomic[U]): Unit = coordinate(wait)(jatomic.atomically)
}

View file

@ -0,0 +1,16 @@
package akka.transactor.test;
import akka.actor.TypedActor;
import akka.stm.Ref;
public class FailerImpl extends TypedActor implements TypedCounter {
private Ref<Integer> count = new Ref<Integer>(0);
public void increment() {
throw new RuntimeException("Expected failure");
}
public Integer get() {
return count.get();
}
}

View file

@ -0,0 +1,71 @@
package akka.transactor.test;
import static org.junit.Assert.*;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import akka.actor.TypedActor;
import akka.transactor.typed.Coordination;
import akka.stm.Atomic;
import java.util.ArrayList;
import java.util.List;
public class TypedCoordinatedIncrementTest {
List<TypedCounter> counters;
TypedCounter failer;
int numCounters = 5;
@Before public void initialise() {
counters = new ArrayList<TypedCounter>();
for (int i = 1; i <= numCounters; i++) {
TypedCounter counter = (TypedCounter) TypedActor.newInstance(TypedCounter.class, TypedCounterImpl.class);
counters.add(counter);
}
failer = (TypedCounter) TypedActor.newInstance(TypedCounter.class, FailerImpl.class);
}
@Test public void incrementAllCountersWithSuccessfulTransaction() {
Coordination.coordinate(true, new Atomic<Object>() {
public Object atomically() {
for (TypedCounter counter : counters) {
counter.increment();
}
return null;
}
});
for (TypedCounter counter : counters) {
int count = counter.get();
assertEquals(1, count);
}
}
@Test public void incrementNoCountersWithFailingTransaction() {
try {
Coordination.coordinate(true, new Atomic<Object>() {
public Object atomically() {
for (TypedCounter counter : counters) {
counter.increment();
}
failer.increment();
return null;
}
});
} catch (Exception e) {
// ignore
}
for (TypedCounter counter : counters) {
int count = counter.get();
assertEquals(0, count);
}
}
@After public void stop() {
for (TypedCounter counter : counters) {
TypedActor.stop(counter);
}
TypedActor.stop(failer);
}
}

View file

@ -0,0 +1,8 @@
package akka.transactor.test;
import akka.transactor.typed.Coordinated;
public interface TypedCounter {
@Coordinated public void increment();
public Integer get();
}

View file

@ -0,0 +1,16 @@
package akka.transactor.test;
import akka.actor.TypedActor;
import akka.stm.Ref;
public class TypedCounterImpl extends TypedActor implements TypedCounter {
private Ref<Integer> count = new Ref<Integer>(0);
public void increment() {
count.set(count.get() + 1);
}
public Integer get() {
return count.get();
}
}

View file

@ -0,0 +1,8 @@
package akka.transactor.test
import org.scalatest.junit.JUnitWrapperSuite
class JavaCoordinatedIncrementSpec extends JUnitWrapperSuite(
"akka.transactor.test.TypedCoordinatedIncrementTest",
Thread.currentThread.getContextClassLoader
)

View file

@ -0,0 +1,72 @@
package akka.transactor.test
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.TypedActor
import akka.stm.Ref
import akka.transactor.typed.Coordinated
import akka.transactor.typed.Coordination._
object TypedCoordinatedIncrement {
trait Counter {
@Coordinated def increment: Unit
def get: Int
}
class CounterImpl extends TypedActor with Counter {
val ref = Ref(0)
def increment = ref alter (_ + 1)
def get = ref.get
}
class FailerImpl extends TypedActor with Counter {
val ref = Ref(0)
def increment = throw new RuntimeException("Expected failure")
def get = ref.get
}
}
class TypedCoordinatedIncrementSpec extends WordSpec with MustMatchers {
import TypedCoordinatedIncrement._
val numCounters = 5
def createActors = {
def createCounter(i: Int) = TypedActor.newInstance(classOf[Counter], classOf[CounterImpl])
val counters = (1 to numCounters) map createCounter
val failer = TypedActor.newInstance(classOf[Counter], classOf[FailerImpl])
(counters, failer)
}
"Coordinated typed actor increment" should {
"increment all counters by one with successful transactions" in {
val (counters, failer) = createActors
coordinate {
counters foreach (_.increment)
}
for (counter <- counters) {
counter.get must be === 1
}
counters foreach (TypedActor.stop)
TypedActor.stop(failer)
}
"increment no counters with a failing transaction" in {
val (counters, failer) = createActors
try {
coordinate {
counters foreach (_.increment)
failer.increment
}
} catch {
case _ => ()
}
for (counter <- counters) {
counter.get must be === 0
}
counters foreach (TypedActor.stop)
TypedActor.stop(failer)
}
}
}

View file

@ -290,7 +290,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val akka_actor = project("akka-actor", "akka-actor", new AkkaActorProject(_))
lazy val akka_stm = project("akka-stm", "akka-stm", new AkkaStmProject(_), akka_actor)
lazy val akka_typed_actor = project("akka-typed-actor", "akka-typed-actor", new AkkaTypedActorProject(_), akka_actor)
lazy val akka_typed_actor = project("akka-typed-actor", "akka-typed-actor", new AkkaTypedActorProject(_), akka_stm)
lazy val akka_remote = project("akka-remote", "akka-remote", new AkkaRemoteProject(_), akka_typed_actor)
lazy val akka_amqp = project("akka-amqp", "akka-amqp", new AkkaAMQPProject(_), akka_remote)
lazy val akka_http = project("akka-http", "akka-http", new AkkaHttpProject(_), akka_remote, akka_camel)