diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index c4ab911d86..ce954e41de 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -13,7 +13,7 @@ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.{TransactionManagement, TransactionSetAbortedException} import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, MessageSerializer, RemoteRequestProtocolIdFactory} -import se.scalablesolutions.akka.serialization.Serializer +import se.scalablesolutions.akka.serialization.{Serializer, BinaryString} import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReentrantGuard} import RemoteActorSerialization._ @@ -306,7 +306,7 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef] if (isTypedActor) throw e else None } - if (future.exception.isDefined) throw future.exception.get._2 + if (future.exception.isDefined) throw future.exception.get else future.result } else throw new ActorInitializationException( "Actor has not been started, you need to invoke 'actor.start' before using it") @@ -1179,7 +1179,7 @@ class LocalActorRef private[akka]( } } - senderFuture.foreach(_.completeWithException(this, reason)) + senderFuture.foreach(_.completeWithException(reason)) clearTransaction if (topLevelTransaction) clearTransactionSet @@ -1253,6 +1253,15 @@ class LocalActorRef private[akka]( } else message } +/** + * System messages for RemoteActorRef. + * + * @author Jonas Bonér + */ +object RemoteActorSystemMessage { + val Stop = BinaryString("RemoteActorRef:stop") +} + /** * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node. * This reference is network-aware (remembers its origin) and immutable. @@ -1263,6 +1272,7 @@ private[akka] case class RemoteActorRef private[akka] ( uuuid: String, val className: String, val hostname: String, val port: Int, _timeout: Long, loader: Option[ClassLoader]) // uuid: String, className: String, hostname: String, port: Int, timeOut: Long, isOnRemoteHost: Boolean) extends ActorRef { extends ActorRef { + _uuid = uuuid timeout = _timeout @@ -1291,6 +1301,7 @@ private[akka] case class RemoteActorRef private[akka] ( def stop: Unit = { _isRunning = false _isShutDown = true + postMessageToMailbox(RemoteActorSystemMessage.Stop, None) } /** diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index 1e097f9034..84a7f487df 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -127,17 +127,13 @@ object ActorRegistry extends ListenerManagement { if (id eq null) throw new IllegalActorStateException("Actor.id is null " + actor) val set = actorsById get id - if(set ne null) - set add actor + if (set ne null) set add actor else { val newSet = new ConcurrentSkipListSet[ActorRef] newSet add actor - val oldSet = actorsById.putIfAbsent(id,newSet) - - //Parry for two simultaneous putIfAbsent(id,newSet) - if(oldSet ne null) - oldSet add actor + // Parry for two simultaneous putIfAbsent(id,newSet) + if (oldSet ne null) oldSet add actor } // UUID @@ -154,8 +150,7 @@ object ActorRegistry extends ListenerManagement { actorsByUUID remove actor.uuid val set = actorsById get actor.id - if (set ne null) - set remove actor + if (set ne null) set remove actor //FIXME: safely remove set if empty, leaks memory diff --git a/akka-core/src/main/scala/actor/TypedActor.scala b/akka-core/src/main/scala/actor/TypedActor.scala index 9c68a7dc01..c171a75211 100644 --- a/akka-core/src/main/scala/actor/TypedActor.scala +++ b/akka-core/src/main/scala/actor/TypedActor.scala @@ -629,10 +629,8 @@ private[akka] sealed class TypedActorAspect { } private def getResultOrThrowException[T](future: Future[T]): Option[T] = - if (future.exception.isDefined) { - val (_, cause) = future.exception.get - throw cause - } else future.result + if (future.exception.isDefined) throw future.exception.get + else future.result private def isVoid(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE diff --git a/akka-core/src/main/scala/actor/UntypedActor.scala b/akka-core/src/main/scala/actor/UntypedActor.scala index 474c75e6de..2228db2de4 100644 --- a/akka-core/src/main/scala/actor/UntypedActor.scala +++ b/akka-core/src/main/scala/actor/UntypedActor.scala @@ -83,6 +83,15 @@ abstract class UntypedTransactor extends UntypedActor { self.makeTransactionRequired } +/** + * Factory closure for an UntypedActor, to be used with 'UntypedActor.actorOf(factory)'. + * + * @author Jonas Bonér + */ +trait UntypedActorFactory { + def create: UntypedActor +} + /** * Extend this abstract class to create a remote UntypedActor. * @@ -119,7 +128,7 @@ object UntypedActor { *
* ActorRef actor = UntypedActor.actorOf(MyUntypedActor.class);
* actor.start();
- * actor.sendOneWay(message, context)
+ * actor.sendOneWay(message, context);
* actor.stop();
*
* You can create and start the actor in one statement like this:
@@ -137,22 +146,23 @@ object UntypedActor {
* NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the
* UntypedActor instance directly, but only through its 'UntypedActorRef' wrapper reference.
*
- * Creates an ActorRef out of the Actor. Allows you to pass in the instance for the Actor. Only
- * use this method when you need to pass in constructor arguments into the 'UntypedActor'.
+ * Creates an UntypedActorRef out of the Actor. Allows you to pass in the instance for the UntypedActor.
+ * Only use this method when you need to pass in constructor arguments into the 'UntypedActor'.
*
+ * You use it by implementing the UntypedActorFactory interface.
* Example in Java:
*
- * ActorRef actor = UntypedActor.actorOf(new MyUntypedActor("service:name", 5));
+ * UntypedActorRef actor = UntypedActor.actorOf(new UntypedActorFactory() {
+ * public UntypedActor create() {
+ * return new MyUntypedActor("service:name", 5);
+ * }
+ * });
* actor.start();
- * actor.sendOneWay(message, context)
+ * actor.sendOneWay(message, context);
* actor.stop();
*
- * You can create and start the actor in one statement like this:
- * - * ActorRef actor = UntypedActor.actorOf(MyUntypedActor.class).start(); - **/ - def actorOf(actorInstance: UntypedActor): UntypedActorRef = UntypedActorRef.wrap(new LocalActorRef(() => actorInstance)) + def actorOf(factory: UntypedActorFactory) = UntypedActorRef.wrap(new LocalActorRef(() => factory.create)) } /** diff --git a/akka-core/src/main/scala/config/SupervisionConfig.scala b/akka-core/src/main/scala/config/SupervisionConfig.scala index cb0829704d..071fdf0c12 100644 --- a/akka-core/src/main/scala/config/SupervisionConfig.scala +++ b/akka-core/src/main/scala/config/SupervisionConfig.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.config -import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import se.scalablesolutions.akka.actor.{ActorRef, UntypedActorRef} import se.scalablesolutions.akka.dispatch.MessageDispatcher sealed abstract class FaultHandlingStrategy @@ -25,11 +25,15 @@ object ScalaConfig { case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server { + def this(actorRef: UntypedActorRef, lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) = + this(actorRef.actorRef, lifeCycle, _remoteAddress) val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress) } object Supervise { def apply(actorRef: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorRef, lifeCycle, remoteAddress) def apply(actorRef: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, null) + def apply(actorRef: UntypedActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorRef, lifeCycle, remoteAddress) + def apply(actorRef: UntypedActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, null) def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.remoteAddress)) } @@ -215,6 +219,9 @@ object JavaConfig { def newSupervised(actorRef: ActorRef) = se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorRef, lifeCycle.transform) + + def newSupervised(actorRef: UntypedActorRef) = + se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorRef, lifeCycle.transform) } } diff --git a/akka-core/src/main/scala/dispatch/Future.scala b/akka-core/src/main/scala/dispatch/Future.scala index d1b4f9572b..c365df3c3c 100644 --- a/akka-core/src/main/scala/dispatch/Future.scala +++ b/akka-core/src/main/scala/dispatch/Future.scala @@ -25,7 +25,7 @@ object Futures { try { promise completeWithResult body } catch { - case e => promise completeWithException (None, e) + case e => promise completeWithException e } promise } @@ -77,12 +77,12 @@ sealed trait Future[T] { def isExpired: Boolean def timeoutInNanos: Long def result: Option[T] - def exception: Option[Tuple2[AnyRef, Throwable]] + def exception: Option[Throwable] } trait CompletableFuture[T] extends Future[T] { def completeWithResult(result: T) - def completeWithException(toBlame: AnyRef, exception: Throwable) + def completeWithException(exception: Throwable) } // Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. @@ -96,7 +96,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { private val _signal = _lock.newCondition private var _completed: Boolean = _ private var _result: Option[T] = None - private var _exception: Option[Tuple2[AnyRef, Throwable]] = None + private var _exception: Option[Throwable] = None def await = try { _lock.lock @@ -147,7 +147,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { _lock.unlock } - def exception: Option[Tuple2[AnyRef, Throwable]] = try { + def exception: Option[Throwable] = try { _lock.lock _exception } finally { @@ -165,11 +165,11 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { _lock.unlock } - def completeWithException(toBlame: AnyRef, exception: Throwable) = try { + def completeWithException(exception: Throwable) = try { _lock.lock if (!_completed) { _completed = true - _exception = Some((toBlame, exception)) + _exception = Some(exception) } } finally { _signal.signalAll diff --git a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala index 55755250e2..6f8da64073 100644 --- a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -34,6 +34,7 @@ trait ThreadPoolBuilder { def buildThreadPool(): Unit = synchronized { ensureNotActive inProcessOfBuilding = false + threadPoolBuilder.allowCoreThreadTimeOut(true) if (boundedExecutorBound > 0) { val boundedExecutor = new BoundedExecutorDecorator(threadPoolBuilder, boundedExecutorBound) boundedExecutorBound = -1 diff --git a/akka-core/src/main/scala/remote/MessageSerializer.scala b/akka-core/src/main/scala/remote/MessageSerializer.scala index 24269c7f8e..8ef6f5d590 100644 --- a/akka-core/src/main/scala/remote/MessageSerializer.scala +++ b/akka-core/src/main/scala/remote/MessageSerializer.scala @@ -25,7 +25,6 @@ object MessageSerializer extends Logging { } def deserialize(messageProtocol: MessageProtocol): Any = { - log.debug("scheme = " + messageProtocol.getSerializationScheme) messageProtocol.getSerializationScheme match { case SerializationSchemeType.JAVA => unbox(SERIALIZER_JAVA.fromBinary(messageProtocol.getMessage.toByteArray, None)) diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index e17c7f57ef..fefb1521ed 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -44,9 +44,16 @@ object RemoteRequestProtocolIdFactory { * Life-cycle events for RemoteClient. */ sealed trait RemoteClientLifeCycleEvent -case class RemoteClientError(@BeanProperty val cause: Throwable, @BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent -case class RemoteClientDisconnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent -case class RemoteClientConnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent +case class RemoteClientError( + @BeanProperty val cause: Throwable, + @BeanProperty val host: String, + @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent +case class RemoteClientDisconnected( + @BeanProperty val host: String, + @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent +case class RemoteClientConnected( + @BeanProperty val host: String, + @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent class RemoteClientException private[akka](message: String) extends RuntimeException(message) @@ -259,23 +266,23 @@ class RemoteClientPipelineFactory( remoteAddress: SocketAddress, timer: HashedWheelTimer, client: RemoteClient) extends ChannelPipelineFactory { - def getPipeline: ChannelPipeline = { - def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*) + def getPipeline: ChannelPipeline = { + def join(ch: ChannelHandler*) = Array[ChannelHandler](ch: _*) val engine = RemoteServerSslContext.client.createSSLEngine() engine.setEnabledCipherSuites(engine.getSupportedCipherSuites) //TODO is this sensible? engine.setUseClientMode(true) - val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join() + val ssl = if (RemoteServer.SECURE) join(new SslHandler(engine)) else join() val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt) val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(RemoteReplyProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val(enc,dec) = RemoteServer.COMPRESSION_SCHEME match { - case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)),join(new ZlibDecoder)) - case _ => (join(),join()) + val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match { + case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) + case _ => (join(), join()) } val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) @@ -326,7 +333,7 @@ class RemoteClientHandler( "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader)) } - future.completeWithException(null, parseException(reply, client.loader)) + future.completeWithException(parseException(reply, client.loader)) } futures.remove(reply.getId) } else { diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 89c0a6e437..ec33bd8600 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -309,10 +309,10 @@ object RemoteServerSslContext { //val algorithm = Option(Security.getProperty("ssl.KeyManagerFactory.algorithm")).getOrElse("SunX509") //val store = KeyStore.getInstance("JKS") val s = SSLContext.getInstance(protocol) - s.init(null,null,null) + s.init(null, null, null) val c = SSLContext.getInstance(protocol) - c.init(null,null,null) - (c,s) + c.init(null, null, null) + (c, s) } } @@ -429,25 +429,29 @@ class RemoteServerHandler( if (request.hasSender) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader)) else None - if (request.getIsOneWay) actorRef.!(message)(sender) - else { - try { - val resultOrNone = (actorRef.!!(message)(sender)).as[AnyRef] - val result = if (resultOrNone.isDefined) resultOrNone.get else null + message match { // first match on system messages + case RemoteActorSystemMessage.Stop => actorRef.stop + case _ => // then match on user defined messages + if (request.getIsOneWay) actorRef.!(message)(sender) + else { + try { + val resultOrNone = (actorRef.!!(message)(sender)).as[AnyRef] + val result = if (resultOrNone.isDefined) resultOrNone.get else null - log.debug("Returning result from actor invocation [%s]", result) - val replyBuilder = RemoteReplyProtocol.newBuilder - .setId(request.getId) - .setMessage(MessageSerializer.serialize(result)) - .setIsSuccessful(true) - .setIsActor(true) + log.debug("Returning result from actor invocation [%s]", result) + val replyBuilder = RemoteReplyProtocol.newBuilder + .setId(request.getId) + .setMessage(MessageSerializer.serialize(result)) + .setIsSuccessful(true) + .setIsActor(true) - if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) - channel.write(replyBuilder.build) + if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) + channel.write(replyBuilder.build) - } catch { - case e: Throwable => channel.write(createErrorReplyMessage(e, request, true)) - } + } catch { + case e: Throwable => channel.write(createErrorReplyMessage(e, request, true)) + } + } } } diff --git a/akka-core/src/main/scala/stm/Ref.scala b/akka-core/src/main/scala/stm/Ref.scala index 2ca6802b15..7d99c673a6 100644 --- a/akka-core/src/main/scala/stm/Ref.scala +++ b/akka-core/src/main/scala/stm/Ref.scala @@ -43,8 +43,9 @@ class Ref[T](initialOpt: Option[T] = None) def alter(f: T => T): T = { ensureNotNull - set(f(this.get)) - this.get + val value = f(this.get) + set(value) + value } def getOption: Option[T] = Option(this.get) diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOne.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOne.java new file mode 100644 index 0000000000..03df632582 --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOne.java @@ -0,0 +1,6 @@ +package se.scalablesolutions.akka.actor; + +public interface RemoteTypedActorOne { + public String requestReply(String s) throws Exception; + public void oneWay() throws Exception; +} \ No newline at end of file diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java new file mode 100644 index 0000000000..0744652181 --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java @@ -0,0 +1,29 @@ +package se.scalablesolutions.akka.actor.remote; + +import se.scalablesolutions.akka.actor.*; + +import java.util.concurrent.CountDownLatch; + +public class RemoteTypedActorOneImpl extends TypedActor implements RemoteTypedActorOne { + + public static CountDownLatch latch = new CountDownLatch(1); + + public String requestReply(String s) throws Exception { + if (s.equals("ping")) { + RemoteTypedActorLog.messageLog().put("ping"); + return "pong"; + } else if (s.equals("die")) { + throw new RuntimeException("Expected exception; to test fault-tolerance"); + } else return null; + } + + public void oneWay() throws Exception { + RemoteTypedActorLog.oneWayLog().put("oneway"); + } + + @Override + public void preRestart(Throwable e) { + try { RemoteTypedActorLog.messageLog().put(e.getMessage()); } catch(Exception ex) {} + latch.countDown(); + } +} \ No newline at end of file diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorTwo.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorTwo.java new file mode 100644 index 0000000000..58f294c6cd --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorTwo.java @@ -0,0 +1,6 @@ +package se.scalablesolutions.akka.actor; + +public interface RemoteTypedActorTwo { + public String requestReply(String s) throws Exception; + public void oneWay() throws Exception; +} \ No newline at end of file diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorTwoImpl.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorTwoImpl.java new file mode 100644 index 0000000000..36bb055ef8 --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorTwoImpl.java @@ -0,0 +1,29 @@ +package se.scalablesolutions.akka.actor.remote; + +import se.scalablesolutions.akka.actor.*; + +import java.util.concurrent.CountDownLatch; + +public class RemoteTypedActorTwoImpl extends TypedActor implements RemoteTypedActorTwo { + + public static CountDownLatch latch = new CountDownLatch(1); + + public String requestReply(String s) throws Exception { + if (s.equals("ping")) { + RemoteTypedActorLog.messageLog().put("ping"); + return "pong"; + } else if (s.equals("die")) { + throw new RuntimeException("Expected exception; to test fault-tolerance"); + } else return null; + } + + public void oneWay() throws Exception { + RemoteTypedActorLog.oneWayLog().put("oneway"); + } + + @Override + public void preRestart(Throwable e) { + try { RemoteTypedActorLog.messageLog().put(e.getMessage()); } catch(Exception ex) {} + latch.countDown(); + } +} \ No newline at end of file diff --git a/akka-core/src/test/scala/actor/untyped-actor/UntypedActorFireForgetRequestReplySpec.scala b/akka-core/src/test/scala/actor/untyped-actor/UntypedActorFireForgetRequestReplySpec.scala index 89a05eca9c..bb883142d6 100644 --- a/akka-core/src/test/scala/actor/untyped-actor/UntypedActorFireForgetRequestReplySpec.scala +++ b/akka-core/src/test/scala/actor/untyped-actor/UntypedActorFireForgetRequestReplySpec.scala @@ -11,6 +11,22 @@ import Actor._ class UntypedActorFireForgetRequestReplySpec extends WordSpec with MustMatchers { "An UntypedActor" should { + "be able to be created using the UntypedActor.actorOf(UntypedActorFactory) factory method" in { + UntypedActorTestState.finished = new CyclicBarrier(2); + UntypedActorTestState.log = "NIL"; + val replyActor = UntypedActor.actorOf(new UntypedActorFactory() { + def create: UntypedActor = new ReplyUntypedActor + }).start + val senderActor = UntypedActor.actorOf(new UntypedActorFactory() { + def create: UntypedActor = new SenderUntypedActor + }).start + senderActor.sendOneWay(replyActor) + senderActor.sendOneWay("ReplyToSendOneWayUsingReply") + try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) } + catch { case e: TimeoutException => fail("Never got the message") } + UntypedActorTestState.log must be ("Reply") + } + "reply to message sent with 'sendOneWay' using 'reply'" in { UntypedActorTestState.finished = new CyclicBarrier(2); UntypedActorTestState.log = "NIL"; diff --git a/akka-core/src/test/scala/actor/untyped-actor/UntypedTransactorSpec.scala b/akka-core/src/test/scala/actor/untyped-actor/UntypedTransactorSpec.scala new file mode 100644 index 0000000000..3ec707119c --- /dev/null +++ b/akka-core/src/test/scala/actor/untyped-actor/UntypedTransactorSpec.scala @@ -0,0 +1,239 @@ +package se.scalablesolutions.akka.actor + +import java.util.concurrent.{TimeUnit, CountDownLatch} +import org.scalatest.junit.JUnitSuite +import org.junit.Test + +import se.scalablesolutions.akka.stm.{Ref, TransactionalMap, TransactionalVector} +import UntypedActor._ + +object UntypedTransactorSpec { + case class GetMapState(key: String) + case object GetVectorState + case object GetVectorSize + case object GetRefState + + case class SetMapState(key: String, value: String) + case class SetVectorState(key: String) + case class SetRefState(key: String) + case class Success(key: String, value: String) + case class Failure(key: String, value: String, failer: UntypedActorRef) + + case class SetMapStateOneWay(key: String, value: String) + case class SetVectorStateOneWay(key: String) + case class SetRefStateOneWay(key: String) + case class SuccessOneWay(key: String, value: String) + case class FailureOneWay(key: String, value: String, failer: UntypedActorRef) + + case object GetNotifier +} +import UntypedTransactorSpec._ + +class StatefulUntypedTransactor(expectedInvocationCount: Int) extends UntypedTransactor { + def this() = this(0) + getContext.setTimeout(5000) + + val notifier = new CountDownLatch(expectedInvocationCount) + + private val mapState = TransactionalMap[String, String]() + private val vectorState = TransactionalVector[String]() + private val refState = Ref[String]() + + def onReceive(message: Any) = message match { + case GetNotifier => + getContext.replyUnsafe(notifier) + case GetMapState(key) => + getContext.replyUnsafe(mapState.get(key).get) + notifier.countDown + case GetVectorSize => + getContext.replyUnsafe(vectorState.length.asInstanceOf[AnyRef]) + notifier.countDown + case GetRefState => + getContext.replyUnsafe(refState.get) + notifier.countDown + case SetMapState(key, msg) => + mapState.put(key, msg) + getContext.replyUnsafe(msg) + notifier.countDown + case SetVectorState(msg) => + vectorState.add(msg) + getContext.replyUnsafe(msg) + notifier.countDown + case SetRefState(msg) => + refState.swap(msg) + getContext.replyUnsafe(msg) + notifier.countDown + case Success(key, msg) => + mapState.put(key, msg) + vectorState.add(msg) + refState.swap(msg) + getContext.replyUnsafe(msg) + notifier.countDown + case Failure(key, msg, failer) => + mapState.put(key, msg) + vectorState.add(msg) + refState.swap(msg) + failer.sendRequestReply("Failure") + getContext.replyUnsafe(msg) + notifier.countDown + case SetMapStateOneWay(key, msg) => + mapState.put(key, msg) + notifier.countDown + case SetVectorStateOneWay(msg) => + vectorState.add(msg) + notifier.countDown + case SetRefStateOneWay(msg) => + refState.swap(msg) + notifier.countDown + case SuccessOneWay(key, msg) => + mapState.put(key, msg) + vectorState.add(msg) + refState.swap(msg) + notifier.countDown + case FailureOneWay(key, msg, failer) => + mapState.put(key, msg) + vectorState.add(msg) + refState.swap(msg) + notifier.countDown + failer.sendOneWay("Failure",getContext) + } +} + +class StatefulUntypedTransactorExpectingTwoInvocations extends StatefulUntypedTransactor(2) + +@serializable +class FailerUntypedTransactor extends UntypedTransactor { + + def onReceive(message: Any) = message match { + case "Failure" => + throw new RuntimeException("Expected exception; to test fault-tolerance") + } +} + +class UntypedTransactorSpec extends JUnitSuite { + + @Test + def shouldOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start + stateful sendOneWay SetMapStateOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state + stateful sendOneWay SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch] + assert(notifier.await(1, TimeUnit.SECONDS)) + assert("new state" === (stateful sendRequestReply GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"))) + } + + @Test + def shouldMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf(classOf[StatefulUntypedTransactor]).start + stateful sendRequestReply SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state + stateful sendRequestReply Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + assert("new state" === (stateful sendRequestReply GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"))) + } + + @Test + def shouldOneWayMapShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start + val failer = actorOf(classOf[FailerUntypedTransactor]).start + stateful sendOneWay SetMapStateOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state + stateful sendOneWay FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch] + assert(notifier.await(5, TimeUnit.SECONDS)) + assert("init" === (stateful sendRequestReply GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure"))) // check that state is == init state + } + + @Test + def shouldMapShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf(classOf[StatefulUntypedTransactor]).start + stateful sendRequestReply SetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state + val failer = actorOf(classOf[FailerUntypedTransactor]).start + try { + stateful sendRequestReply Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + fail("should have thrown an exception") + } catch {case e: RuntimeException => {}} + assert("init" === (stateful sendRequestReply GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure"))) // check that state is == init state + } + + @Test + def shouldOneWayVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start + stateful sendOneWay SetVectorStateOneWay("init") // set init state + stateful sendOneWay SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch] + assert(notifier.await(1, TimeUnit.SECONDS)) + assert(2 === (stateful sendRequestReply GetVectorSize)) + } + + @Test + def shouldVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf(classOf[StatefulUntypedTransactor]).start + stateful sendRequestReply SetVectorState("init") // set init state + stateful sendRequestReply Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + assert(2 === (stateful sendRequestReply GetVectorSize)) + } + + @Test + def shouldOneWayVectorShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start + stateful sendOneWay SetVectorStateOneWay("init") // set init state + Thread.sleep(1000) + val failer = actorOf(classOf[FailerUntypedTransactor]).start + stateful sendOneWay FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch] + assert(notifier.await(1, TimeUnit.SECONDS)) + assert(1 === (stateful sendRequestReply GetVectorSize)) + } + + @Test + def shouldVectorShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf(classOf[StatefulUntypedTransactor]).start + stateful sendRequestReply SetVectorState("init") // set init state + val failer = actorOf(classOf[FailerUntypedTransactor]).start + try { + stateful sendRequestReply Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + fail("should have thrown an exception") + } catch {case e: RuntimeException => {}} + assert(1 === (stateful sendRequestReply GetVectorSize)) + } + + @Test + def shouldOneWayRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start + stateful sendOneWay SetRefStateOneWay("init") // set init state + stateful sendOneWay SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch] + assert(notifier.await(1, TimeUnit.SECONDS)) + assert("new state" === (stateful sendRequestReply GetRefState)) + } + + @Test + def shouldRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf(classOf[StatefulUntypedTransactor]).start + stateful sendRequestReply SetRefState("init") // set init state + stateful sendRequestReply Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + assert("new state" === (stateful sendRequestReply GetRefState)) + } + + @Test + def shouldOneWayRefShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start + stateful sendOneWay SetRefStateOneWay("init") // set init state + Thread.sleep(1000) + val failer = actorOf(classOf[FailerUntypedTransactor]).start + stateful sendOneWay FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch] + assert(notifier.await(1, TimeUnit.SECONDS)) + assert("init" === (stateful sendRequestReply (GetRefState, 1000000))) // check that state is == init state + } + + @Test + def shouldRefShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf(classOf[StatefulUntypedTransactor]).start + stateful sendRequestReply SetRefState("init") // set init state + val failer = actorOf(classOf[FailerUntypedTransactor]).start + try { + stateful sendRequestReply Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + fail("should have thrown an exception") + } catch {case e: RuntimeException => {}} + assert("init" === (stateful sendRequestReply GetRefState)) // check that state is == init state + } +} \ No newline at end of file diff --git a/akka-core/src/test/scala/dispatch/FutureSpec.scala b/akka-core/src/test/scala/dispatch/FutureSpec.scala index 39fc563007..f740763fdf 100644 --- a/akka-core/src/test/scala/dispatch/FutureSpec.scala +++ b/akka-core/src/test/scala/dispatch/FutureSpec.scala @@ -36,7 +36,7 @@ class FutureSpec extends JUnitSuite { val future = actor !!! "Failure" future.await assert(future.exception.isDefined) - assert("Expected exception; to test fault-tolerance" === future.exception.get._2.getMessage) + assert("Expected exception; to test fault-tolerance" === future.exception.get.getMessage) actor.stop } diff --git a/akka-core/src/test/scala/remote/RemoteTypedActorSpec.scala b/akka-core/src/test/scala/remote/RemoteTypedActorSpec.scala new file mode 100644 index 0000000000..c49882f61d --- /dev/null +++ b/akka-core/src/test/scala/remote/RemoteTypedActorSpec.scala @@ -0,0 +1,128 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB