diff --git a/akka-stm/src/main/scala/transactor/Coordinated.scala b/akka-stm/src/main/scala/transactor/Coordinated.scala index 97fcaffb2c..8725130a0f 100644 --- a/akka-stm/src/main/scala/transactor/Coordinated.scala +++ b/akka-stm/src/main/scala/transactor/Coordinated.scala @@ -98,6 +98,12 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) { new Coordinated(msg, barrier) } + /** + * Create a new Coordinated object but *do not* increment the number of parties by one. + * Only use this method if you know this is what you need. + */ + def noIncrement(msg: Any) = new Coordinated(msg, barrier) + /** * Java API: get the message for this Coordinated. */ @@ -143,4 +149,10 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) { * in this coordination before committing. The timeout is specified by the transaction factory. */ def atomic[T](jatomic: Atomic[T]): T = atomic(jatomic.factory)(jatomic.atomically) + + /** + * An empty coordinated atomic block. Can be used to complete the number of parties involved + * and wait for all transactions to complete. + */ + def await() = atomic(Coordinated.DefaultFactory) {} } diff --git a/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala b/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala index 7f09b5d9bc..76202c7b90 100644 --- a/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala +++ b/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala @@ -77,7 +77,7 @@ class CoordinatedIncrementSpec extends WordSpec with MustMatchers { "increment no counters with a failing transaction" in { val (counters, failer) = createActors - val failLatch = new CountDownLatch(numCounters + 1) + val failLatch = new CountDownLatch(numCounters) counters(0) ! Coordinated(Increment(counters.tail :+ failer, failLatch)) failLatch.await(timeout.length, timeout.unit) for (counter <- counters) { @@ -88,5 +88,3 @@ class CoordinatedIncrementSpec extends WordSpec with MustMatchers { } } } - - diff --git a/akka-typed-actor/src/main/java/akka/transactor/typed/Coordinated.java b/akka-typed-actor/src/main/java/akka/transactor/typed/Coordinated.java new file mode 100644 index 0000000000..d64fb443b2 --- /dev/null +++ b/akka-typed-actor/src/main/java/akka/transactor/typed/Coordinated.java @@ -0,0 +1,7 @@ +package akka.transactor.typed; + +import java.lang.annotation.*; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface Coordinated { } diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index 8ca5f7094f..c3a10141d8 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -9,6 +9,8 @@ import akka.dispatch.{MessageDispatcher, Future, CompletableFuture, Dispatchers} import akka.config.Supervision._ import akka.util._ import ReflectiveAccess._ +import akka.transactor.Coordinated +import akka.transactor.typed.{Coordination, Coordinated => CoordinatedAnnotation} import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint} import org.codehaus.aspectwerkz.proxy.Proxy @@ -181,6 +183,11 @@ abstract class TypedActor extends Actor with Proxyable { if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint) if (TypedActor.isOneWay(joinPoint)) joinPoint.proceed else self.reply(joinPoint.proceed) + case coordinated @ Coordinated(joinPoint: JoinPoint) => + SenderContextInfo.senderActorRef.value = self + SenderContextInfo.senderProxy.value = proxy + if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint) + coordinated atomic { joinPoint.proceed } case Link(proxy) => self.link(proxy) case Unlink(proxy) => self.unlink(proxy) case unexpected => throw new IllegalActorStateException( @@ -702,6 +709,12 @@ object TypedActor extends Logging { private[akka] def isOneWay(methodRtti: MethodRtti): Boolean = methodRtti.getMethod.getReturnType == java.lang.Void.TYPE + private[akka] def isCoordinated(joinPoint: JoinPoint): Boolean = + isCoordinated(joinPoint.getRtti.asInstanceOf[MethodRtti]) + + private[akka] def isCoordinated(methodRtti: MethodRtti): Boolean = + methodRtti.getMethod.isAnnotationPresent(classOf[CoordinatedAnnotation]) + private[akka] def returnsFuture_?(methodRtti: MethodRtti): Boolean = classOf[Future[_]].isAssignableFrom(methodRtti.getMethod.getReturnType) @@ -783,12 +796,26 @@ private[akka] abstract class ActorAspect { val isOneWay = TypedActor.isOneWay(methodRtti) val senderActorRef = Some(SenderContextInfo.senderActorRef.value) val senderProxy = Some(SenderContextInfo.senderProxy.value) + val isCoordinated = TypedActor.isCoordinated(methodRtti) typedActor.context._sender = senderProxy if (!actorRef.isRunning && !isStopped) { isStopped = true joinPoint.proceed + } else if (isOneWay && isCoordinated) { + val coordinatedOpt = Option(Coordination.coordinated.value) + val coordinated = coordinatedOpt.map( coord => + if (Coordination.firstParty.value) { // already included in coordination + Coordination.firstParty.value = false + coord.noIncrement(joinPoint) + } else { + coord(joinPoint) + }).getOrElse(Coordinated(joinPoint)) + + actorRef.!(coordinated)(senderActorRef) + null.asInstanceOf[AnyRef] + } else if (isOneWay) { actorRef.!(joinPoint)(senderActorRef) null.asInstanceOf[AnyRef] diff --git a/akka-typed-actor/src/main/scala/transactor/typed/Coordination.scala b/akka-typed-actor/src/main/scala/transactor/typed/Coordination.scala new file mode 100644 index 0000000000..a15393c484 --- /dev/null +++ b/akka-typed-actor/src/main/scala/transactor/typed/Coordination.scala @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.transactor.typed + +import akka.transactor.{Coordinated => CoordinatedObject} +import akka.stm.Atomic + +import scala.util.DynamicVariable + +/** + * Coordinating transactions between typed actors. + * + * Example ''(in Scala)'' + * + * {{{ + * trait Counter { + * @Coordinated def increment: Unit + * def get: Int + * } + * + * class CounterImpl extends TypedActor with Counter { + * val ref = Ref(0) + * def increment = ref alter (_ + 1) + * def get = ref.get + * } + * + * + * val counter1 = TypedActor.newInstance(classOf[Counter], classOf[CounterImpl]) + * val counter2 = TypedActor.newInstance(classOf[Counter], classOf[CounterImpl]) + * + * coordinate { + * counter1.increment + * counter2.increment + * } + * }}} + */ +object Coordination { + private[akka] val coordinated = new DynamicVariable[CoordinatedObject](null) + private[akka] val firstParty = new DynamicVariable[Boolean](false) + + /** + * For creating a coordination between typed actors that use + * the [[akka.transactor.typed.Coordinated]] annotation. + * Coordinated transactions will wait for all other transactions in the coordination + * before committing. The timeout is specified by the default transaction factory. + * It's possible to specify whether or not this `coordinate` block waits for all of + * the transactions to complete - the default is that it does. + */ + def coordinate[U](wait: Boolean = true)(body: => U): Unit = { + firstParty.value = !wait + coordinated.withValue(CoordinatedObject()) { + body + if (wait) coordinated.value.await + } + firstParty.value = false + } + + /** + * For creating a coordination between typed actors that use + * the [[akka.transactor.typed.Coordinated]] annotation. + * Coordinated transactions will wait for all other transactions in the coordination + * before committing. The timeout is specified by the default transaction factory. + */ + def coordinate[U](body: => U): Unit = coordinate(true)(body) + + /** + * Java API: coordinate that accepts an [[akka.stm.Atomic]]. + * For creating a coordination between typed actors that use + * the [[akka.transactor.typed.Coordinated]] annotation. + * Coordinated transactions will wait for all other transactions in the coordination + * before committing. The timeout is specified by the default transaction factory. + * Use the `wait` parameter to specify whether or not this `coordinate` block + * waits for all of the transactions to complete. + */ + def coordinate[U](wait: Boolean, jatomic: Atomic[U]): Unit = coordinate(wait)(jatomic.atomically) +} diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/Bar.java b/akka-typed-actor/src/test/java/akka/actor/Bar.java similarity index 100% rename from akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/Bar.java rename to akka-typed-actor/src/test/java/akka/actor/Bar.java diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/BarImpl.java b/akka-typed-actor/src/test/java/akka/actor/BarImpl.java similarity index 100% rename from akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/BarImpl.java rename to akka-typed-actor/src/test/java/akka/actor/BarImpl.java diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/Ext.java b/akka-typed-actor/src/test/java/akka/actor/Ext.java similarity index 100% rename from akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/Ext.java rename to akka-typed-actor/src/test/java/akka/actor/Ext.java diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/ExtImpl.java b/akka-typed-actor/src/test/java/akka/actor/ExtImpl.java similarity index 100% rename from akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/ExtImpl.java rename to akka-typed-actor/src/test/java/akka/actor/ExtImpl.java diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/Foo.java b/akka-typed-actor/src/test/java/akka/actor/Foo.java similarity index 100% rename from akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/Foo.java rename to akka-typed-actor/src/test/java/akka/actor/Foo.java diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/FooImpl.java b/akka-typed-actor/src/test/java/akka/actor/FooImpl.java similarity index 100% rename from akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/FooImpl.java rename to akka-typed-actor/src/test/java/akka/actor/FooImpl.java diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java b/akka-typed-actor/src/test/java/akka/actor/SamplePojo.java similarity index 100% rename from akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java rename to akka-typed-actor/src/test/java/akka/actor/SamplePojo.java diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java b/akka-typed-actor/src/test/java/akka/actor/SamplePojoImpl.java similarity index 100% rename from akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java rename to akka-typed-actor/src/test/java/akka/actor/SamplePojoImpl.java diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojo.java b/akka-typed-actor/src/test/java/akka/actor/SimpleJavaPojo.java similarity index 100% rename from akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojo.java rename to akka-typed-actor/src/test/java/akka/actor/SimpleJavaPojo.java diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojoCaller.java b/akka-typed-actor/src/test/java/akka/actor/SimpleJavaPojoCaller.java similarity index 100% rename from akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojoCaller.java rename to akka-typed-actor/src/test/java/akka/actor/SimpleJavaPojoCaller.java diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojoCallerImpl.java b/akka-typed-actor/src/test/java/akka/actor/SimpleJavaPojoCallerImpl.java similarity index 100% rename from akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojoCallerImpl.java rename to akka-typed-actor/src/test/java/akka/actor/SimpleJavaPojoCallerImpl.java diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojoImpl.java b/akka-typed-actor/src/test/java/akka/actor/SimpleJavaPojoImpl.java similarity index 100% rename from akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojoImpl.java rename to akka-typed-actor/src/test/java/akka/actor/SimpleJavaPojoImpl.java diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/TypedActorFailer.java b/akka-typed-actor/src/test/java/akka/actor/TypedActorFailer.java similarity index 100% rename from akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/TypedActorFailer.java rename to akka-typed-actor/src/test/java/akka/actor/TypedActorFailer.java diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/TypedActorFailerImpl.java b/akka-typed-actor/src/test/java/akka/actor/TypedActorFailerImpl.java similarity index 100% rename from akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/TypedActorFailerImpl.java rename to akka-typed-actor/src/test/java/akka/actor/TypedActorFailerImpl.java diff --git a/akka-typed-actor/src/test/java/akka/transactor/test/FailerImpl.java b/akka-typed-actor/src/test/java/akka/transactor/test/FailerImpl.java new file mode 100644 index 0000000000..018418f1f6 --- /dev/null +++ b/akka-typed-actor/src/test/java/akka/transactor/test/FailerImpl.java @@ -0,0 +1,16 @@ +package akka.transactor.test; + +import akka.actor.TypedActor; +import akka.stm.Ref; + +public class FailerImpl extends TypedActor implements TypedCounter { + private Ref count = new Ref(0); + + public void increment() { + throw new RuntimeException("Expected failure"); + } + + public Integer get() { + return count.get(); + } +} \ No newline at end of file diff --git a/akka-typed-actor/src/test/java/akka/transactor/test/TypedCoordinatedIncrementTest.java b/akka-typed-actor/src/test/java/akka/transactor/test/TypedCoordinatedIncrementTest.java new file mode 100644 index 0000000000..13f6d1146e --- /dev/null +++ b/akka-typed-actor/src/test/java/akka/transactor/test/TypedCoordinatedIncrementTest.java @@ -0,0 +1,71 @@ +package akka.transactor.test; + +import static org.junit.Assert.*; +import org.junit.Test; +import org.junit.Before; +import org.junit.After; + +import akka.actor.TypedActor; +import akka.transactor.typed.Coordination; +import akka.stm.Atomic; + +import java.util.ArrayList; +import java.util.List; + +public class TypedCoordinatedIncrementTest { + List counters; + TypedCounter failer; + + int numCounters = 5; + + @Before public void initialise() { + counters = new ArrayList(); + for (int i = 1; i <= numCounters; i++) { + TypedCounter counter = (TypedCounter) TypedActor.newInstance(TypedCounter.class, TypedCounterImpl.class); + counters.add(counter); + } + failer = (TypedCounter) TypedActor.newInstance(TypedCounter.class, FailerImpl.class); + } + + @Test public void incrementAllCountersWithSuccessfulTransaction() { + Coordination.coordinate(true, new Atomic() { + public Object atomically() { + for (TypedCounter counter : counters) { + counter.increment(); + } + return null; + } + }); + for (TypedCounter counter : counters) { + int count = counter.get(); + assertEquals(1, count); + } + } + + @Test public void incrementNoCountersWithFailingTransaction() { + try { + Coordination.coordinate(true, new Atomic() { + public Object atomically() { + for (TypedCounter counter : counters) { + counter.increment(); + } + failer.increment(); + return null; + } + }); + } catch (Exception e) { + // ignore + } + for (TypedCounter counter : counters) { + int count = counter.get(); + assertEquals(0, count); + } + } + + @After public void stop() { + for (TypedCounter counter : counters) { + TypedActor.stop(counter); + } + TypedActor.stop(failer); + } +} diff --git a/akka-typed-actor/src/test/java/akka/transactor/test/TypedCounter.java b/akka-typed-actor/src/test/java/akka/transactor/test/TypedCounter.java new file mode 100644 index 0000000000..f9fd4c0bb6 --- /dev/null +++ b/akka-typed-actor/src/test/java/akka/transactor/test/TypedCounter.java @@ -0,0 +1,8 @@ +package akka.transactor.test; + +import akka.transactor.typed.Coordinated; + +public interface TypedCounter { + @Coordinated public void increment(); + public Integer get(); +} diff --git a/akka-typed-actor/src/test/java/akka/transactor/test/TypedCounterImpl.java b/akka-typed-actor/src/test/java/akka/transactor/test/TypedCounterImpl.java new file mode 100644 index 0000000000..173bbcbc20 --- /dev/null +++ b/akka-typed-actor/src/test/java/akka/transactor/test/TypedCounterImpl.java @@ -0,0 +1,16 @@ +package akka.transactor.test; + +import akka.actor.TypedActor; +import akka.stm.Ref; + +public class TypedCounterImpl extends TypedActor implements TypedCounter { + private Ref count = new Ref(0); + + public void increment() { + count.set(count.get() + 1); + } + + public Integer get() { + return count.get(); + } +} diff --git a/akka-typed-actor/src/test/scala/transactor/JavaCoordinatedIncrementSpec.scala b/akka-typed-actor/src/test/scala/transactor/JavaCoordinatedIncrementSpec.scala new file mode 100644 index 0000000000..fb0d1dc1f4 --- /dev/null +++ b/akka-typed-actor/src/test/scala/transactor/JavaCoordinatedIncrementSpec.scala @@ -0,0 +1,8 @@ +package akka.transactor.test + +import org.scalatest.junit.JUnitWrapperSuite + +class JavaCoordinatedIncrementSpec extends JUnitWrapperSuite( + "akka.transactor.test.TypedCoordinatedIncrementTest", + Thread.currentThread.getContextClassLoader +) diff --git a/akka-typed-actor/src/test/scala/transactor/TypedCoordinatedIncrementSpec.scala b/akka-typed-actor/src/test/scala/transactor/TypedCoordinatedIncrementSpec.scala new file mode 100644 index 0000000000..d5a9bb3769 --- /dev/null +++ b/akka-typed-actor/src/test/scala/transactor/TypedCoordinatedIncrementSpec.scala @@ -0,0 +1,72 @@ +package akka.transactor.test + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers + +import akka.actor.TypedActor +import akka.stm.Ref +import akka.transactor.typed.Coordinated +import akka.transactor.typed.Coordination._ + +object TypedCoordinatedIncrement { + trait Counter { + @Coordinated def increment: Unit + def get: Int + } + + class CounterImpl extends TypedActor with Counter { + val ref = Ref(0) + def increment = ref alter (_ + 1) + def get = ref.get + } + + class FailerImpl extends TypedActor with Counter { + val ref = Ref(0) + def increment = throw new RuntimeException("Expected failure") + def get = ref.get + } +} + +class TypedCoordinatedIncrementSpec extends WordSpec with MustMatchers { + import TypedCoordinatedIncrement._ + + val numCounters = 5 + + def createActors = { + def createCounter(i: Int) = TypedActor.newInstance(classOf[Counter], classOf[CounterImpl]) + val counters = (1 to numCounters) map createCounter + val failer = TypedActor.newInstance(classOf[Counter], classOf[FailerImpl]) + (counters, failer) + } + + "Coordinated typed actor increment" should { + "increment all counters by one with successful transactions" in { + val (counters, failer) = createActors + coordinate { + counters foreach (_.increment) + } + for (counter <- counters) { + counter.get must be === 1 + } + counters foreach (TypedActor.stop) + TypedActor.stop(failer) + } + + "increment no counters with a failing transaction" in { + val (counters, failer) = createActors + try { + coordinate { + counters foreach (_.increment) + failer.increment + } + } catch { + case _ => () + } + for (counter <- counters) { + counter.get must be === 0 + } + counters foreach (TypedActor.stop) + TypedActor.stop(failer) + } + } +} diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index cf76facc7c..3f1fa28e4a 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -290,7 +290,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val akka_actor = project("akka-actor", "akka-actor", new AkkaActorProject(_)) lazy val akka_stm = project("akka-stm", "akka-stm", new AkkaStmProject(_), akka_actor) - lazy val akka_typed_actor = project("akka-typed-actor", "akka-typed-actor", new AkkaTypedActorProject(_), akka_actor) + lazy val akka_typed_actor = project("akka-typed-actor", "akka-typed-actor", new AkkaTypedActorProject(_), akka_stm) lazy val akka_remote = project("akka-remote", "akka-remote", new AkkaRemoteProject(_), akka_typed_actor) lazy val akka_amqp = project("akka-amqp", "akka-amqp", new AkkaAMQPProject(_), akka_remote) lazy val akka_http = project("akka-http", "akka-http", new AkkaHttpProject(_), akka_remote, akka_camel)