diff --git a/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java b/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java index 4ee4ca4cfc..cf93a048c7 100644 --- a/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java +++ b/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java @@ -7,15 +7,11 @@ package akka.docs.transactor; //#class import akka.actor.*; import akka.transactor.*; -import scala.concurrent.stm.*; +import scala.concurrent.stm.Ref; +import scala.concurrent.stm.japi.Stm; public class CoordinatedCounter extends UntypedActor { - private Ref count = Stm.ref(0); - - private void increment(InTxn txn) { - Integer newValue = count.get(txn) + 1; - count.set(newValue, txn); - } + private Ref.View count = Stm.newRef(0); public void onReceive(Object incoming) throws Exception { if (incoming instanceof Coordinated) { @@ -26,14 +22,14 @@ public class CoordinatedCounter extends UntypedActor { if (increment.hasFriend()) { increment.getFriend().tell(coordinated.coordinate(new Increment())); } - coordinated.atomic(new Atomically() { - public void atomically(InTxn txn) { - increment(txn); + coordinated.atomic(new Runnable() { + public void run() { + Stm.increment(count, 1); } }); } } else if ("GetCount".equals(incoming)) { - getSender().tell(count.single().get()); + getSender().tell(count.get()); } else { unhandled(incoming); } diff --git a/akka-docs/java/code/akka/docs/transactor/Coordinator.java b/akka-docs/java/code/akka/docs/transactor/Coordinator.java index 8fa925824b..f1f04761cd 100644 --- a/akka-docs/java/code/akka/docs/transactor/Coordinator.java +++ b/akka-docs/java/code/akka/docs/transactor/Coordinator.java @@ -6,7 +6,6 @@ package akka.docs.transactor; import akka.actor.*; import akka.transactor.*; -import scala.concurrent.stm.*; public class Coordinator extends UntypedActor { public void onReceive(Object incoming) throws Exception { @@ -15,8 +14,8 @@ public class Coordinator extends UntypedActor { Object message = coordinated.getMessage(); if (message instanceof Message) { //#coordinated-atomic - coordinated.atomic(new Atomically() { - public void atomically(InTxn txn) { + coordinated.atomic(new Runnable() { + public void run() { // do something in the coordinated transaction ... } }); diff --git a/akka-docs/java/code/akka/docs/transactor/Counter.java b/akka-docs/java/code/akka/docs/transactor/Counter.java index 4e6f3d8a16..0160a34048 100644 --- a/akka-docs/java/code/akka/docs/transactor/Counter.java +++ b/akka-docs/java/code/akka/docs/transactor/Counter.java @@ -6,21 +6,21 @@ package akka.docs.transactor; //#class import akka.transactor.*; -import scala.concurrent.stm.*; +import scala.concurrent.stm.Ref; +import scala.concurrent.stm.japi.Stm; public class Counter extends UntypedTransactor { - Ref count = Stm.ref(0); + Ref.View count = Stm.newRef(0); - public void atomically(InTxn txn, Object message) { + public void atomically(Object message) { if (message instanceof Increment) { - Integer newValue = count.get(txn) + 1; - count.set(newValue, txn); + Stm.increment(count, 1); } } @Override public boolean normally(Object message) { if ("GetCount".equals(message)) { - getSender().tell(count.single().get()); + getSender().tell(count.get()); return true; } else return false; } diff --git a/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java b/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java index f643b5d892..14a01f859e 100644 --- a/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java +++ b/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java @@ -8,10 +8,11 @@ package akka.docs.transactor; import akka.actor.*; import akka.transactor.*; import java.util.Set; -import scala.concurrent.stm.*; +import scala.concurrent.stm.Ref; +import scala.concurrent.stm.japi.Stm; public class FriendlyCounter extends UntypedTransactor { - Ref count = Stm.ref(0); + Ref.View count = Stm.newRef(0); @Override public Set coordinate(Object message) { if (message instanceof Increment) { @@ -22,16 +23,15 @@ public class FriendlyCounter extends UntypedTransactor { return nobody(); } - public void atomically(InTxn txn, Object message) { + public void atomically(Object message) { if (message instanceof Increment) { - Integer newValue = count.get(txn) + 1; - count.set(newValue, txn); + Stm.increment(count, 1); } } @Override public boolean normally(Object message) { if ("GetCount".equals(message)) { - getSender().tell(count.single().get()); + getSender().tell(count.get()); return true; } else return false; } diff --git a/akka-docs/java/transactors.rst b/akka-docs/java/transactors.rst index f7471412a9..9dd69664b6 100644 --- a/akka-docs/java/transactors.rst +++ b/akka-docs/java/transactors.rst @@ -102,7 +102,7 @@ be sent. :language: java To enter the coordinated transaction use the atomic method of the coordinated -object, passing in an ``akka.transactor.Atomically`` object. +object, passing in a ``java.lang.Runnable``. .. includecode:: code/akka/docs/transactor/Coordinator.java#coordinated-atomic :language: java diff --git a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst index 219136ac95..751535feba 100644 --- a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst +++ b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst @@ -6,7 +6,9 @@ .. sidebar:: Contents - .. contents:: :local: + .. contents:: + :local: + :depth: 3 Actors ====== @@ -77,8 +79,11 @@ Last task of the migration would be to create your own ``ActorSystem``. Unordered Collection of Migration Items ======================================= +Actors +------ + Creating and starting actors ----------------------------- +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Actors are created by passing in a ``Props`` instance into the actorOf factory method in a ``ActorRefProvider``, which is the ``ActorSystem`` or ``ActorContext``. @@ -111,7 +116,7 @@ Documentation: * :ref:`untyped-actors-java` Stopping actors ---------------- +^^^^^^^^^^^^^^^ ``ActorRef.stop()`` has been moved. Use ``ActorSystem`` or ``ActorContext`` to stop actors. @@ -144,7 +149,7 @@ Documentation: * :ref:`untyped-actors-java` Identifying Actors ------------------- +^^^^^^^^^^^^^^^^^^ In v1.3 actors have ``uuid`` and ``id`` field. In v2.0 each actor has a unique logical ``path``. @@ -167,7 +172,7 @@ Documentation: * :ref:`untyped-actors-java` Reply to messages ------------------ +^^^^^^^^^^^^^^^^^ ``self.channel`` has been replaced with unified reply mechanism using ``sender`` (Scala) or ``getSender()`` (Java). This works for both tell (!) and ask (?). @@ -189,7 +194,7 @@ Documentation: * :ref:`untyped-actors-java` ``ActorRef.ask()`` ------------------- +^^^^^^^^^^^^^^^^^^ The mechanism for collecting an actor’s reply in a :class:`Future` has been reworked for better location transparency: it uses an actor under the hood. @@ -206,7 +211,7 @@ Documentation: * :ref:`untyped-actors-java` ActorPool ---------- +^^^^^^^^^ The ActorPool has been replaced by dynamically resizable routers. @@ -216,7 +221,7 @@ Documentation: * :ref:`routing-java` ``UntypedActor.getContext()`` (Java API only) ---------------------------------------------- +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ``getContext()`` in the Java API for UntypedActor is renamed to ``getSelf()``. @@ -234,7 +239,7 @@ Documentation: * :ref:`untyped-actors-java` Configuration -------------- +^^^^^^^^^^^^^ A new, more powerful, configuration utility has been implemented. The format of the configuration file is very similar to the format in v1.3. In addition it also supports @@ -287,7 +292,7 @@ Documentation: * :ref:`configuration` Logging -------- +^^^^^^^ EventHandler API has been replaced by LoggingAdapter, which publish log messages to the event bus. You can still plugin your own actor as event listener with the @@ -321,7 +326,7 @@ Documentation: Scheduler ---------- +^^^^^^^^^ The functionality of the scheduler is identical, but the API is slightly adjusted. @@ -362,7 +367,7 @@ Documentation: * :ref:`scheduler-java` Supervision ------------ +^^^^^^^^^^^ Akka v2.0 implements parental supervision. Actors can only be created by other actors — where the top-level actor is provided by the library — and each created actor is supervised by its parent. @@ -438,7 +443,7 @@ Documentation: * :ref:`untyped-actors-java` Dispatchers ------------ +^^^^^^^^^^^ Dispatchers are defined in configuration instead of in code. @@ -479,7 +484,7 @@ Documentation: * :ref:`dispatchers-scala` Spawn ------ +^^^^^ ``spawn`` has been removed and can be implemented like this, if needed. Be careful to not access any shared mutable state closed over by the body. @@ -495,7 +500,7 @@ Documentation: * :ref:`jmm` HotSwap -------- +^^^^^^^ In v2.0 ``become`` and ``unbecome`` metods are located in ``ActorContext``, i.e. ``context.become`` and ``context.unbecome``. @@ -506,12 +511,269 @@ in the actor receiving the message. * :ref:`actors-scala` * :ref:`untyped-actors-java` +STM +--- + +In Akka v2.0 `ScalaSTM`_ is used rather than Multiverse. + +.. _ScalaSTM: http://nbronson.github.com/scala-stm/ + +Agent and Transactor have been ported to ScalaSTM. The API's for Agent and +Transactor are basically the same, other than integration with ScalaSTM. See: + + * :ref:`agents-scala` + * :ref:`agents-java` + * :ref:`transactors-scala` + * :ref:`transactors-java` + +Imports +^^^^^^^ + +Scala +~~~~~ + +To use ScalaSTM the import from Scala is:: + + import scala.concurrent.stm._ + +Java +~~~~ + +For Java there is a special helper object with Java-friendly methods:: + + import scala.concurrent.stm.japi.Stm; + +These methods can also be statically imported:: + + import static scala.concurrent.stm.japi.Stm.*; + +Other imports that are needed are in the stm package, particularly ``Ref``:: + + import scala.concurrent.stm.Ref; + +Transactions +^^^^^^^^^^^^ + +Scala +~~~~~ + +Both v1.3 and v2.0 provide an ``atomic`` block, however, the ScalaSTM ``atomic`` +is a function from ``InTxn`` to return type. + +v1.3:: + + atomic { + // do something in transaction + } + +v2.0:: + + atomic { implicit txn => + // do something in transaction + } + +Note that in ScalaSTM the ``InTxn`` in the atomic function is usually marked as +implicit as transactional references require an implicit ``InTxn`` on all +methods. That is, the transaction is statically required and it is a +compile-time warning to use a reference without a transaction. There is also a +``Ref.View`` for operations without requiring an ``InTxn`` statically. See below +for more information. + +Java +~~~~ + +In the ScalaSTM Java API helpers there are atomic methods which accept +``java.lang.Runnable`` and ``java.util.concurrent.Callable``. + +v1.3:: + + new Atomic() { + public Object atomically() { + // in transaction + return null; + } + }.execute(); + + SomeObject result = new Atomic() { + public SomeObject atomically() { + // in transaction + return ...; + } + }.execute(); + +v2.0:: + + import static scala.concurrent.stm.japi.Stm.atomic; + import java.util.concurrent.Callable; + + atomic(new Runnable() { + public void run() { + // in transaction + } + }); + + SomeObject result = atomic(new Callable() { + public SomeObject call() { + // in transaction + return ...; + } + }); + +Ref +^^^ + +Scala +~~~~~ + +Other than the import, creating a Ref is basically identical between Akka STM in +v1.3 and ScalaSTM used in v2.0. + +v1.3:: + + val ref = Ref(0) + +v2.0:: + + val ref = Ref(0) + +The API for Ref is similar. For example: + +v1.3:: + + ref.get // get current value + ref() // same as get + + ref.set(1) // set to new value, return old value + ref() = 1 // same as set + ref.swap(2) // same as set + + ref alter { _ + 1 } // apply a function, return new value + +v2.0:: + + ref.get // get current value + ref() // same as get + + ref.set(1) // set to new value, return nothing + ref() = 1 // same as set + ref.swap(2) // set and return old value + + ref transform { _ + 1 } // apply function, return nothing + + ref transformIfDefined { case 1 => 2 } // apply partial function if defined + +Ref.View +^^^^^^^^ + +In v1.3 using a ``Ref`` method outside of a transaction would automatically +create a single-operation transaction. In v2.0 (in ScalaSTM) there is a +``Ref.View`` which provides methods without requiring a current +transaction. + +Scala +~~~~~ + +The ``Ref.View`` can be accessed with the ``single`` method:: + + ref.single() // returns current value + ref.single() = 1 // set new value + + // with atomic this would be: + + atomic { implicit t => ref() } + atomic { implicit t => ref() = 1 } + +Java +~~~~ + +As ``Ref.View`` in ScalaSTM does not require implicit transactions, this is more +easily used from Java. ``Ref`` could be used, but requires explicit threading of +transactions. There are helper methods in ``japi.Stm`` for creating ``Ref.View`` +references. + +v1.3:: + + Ref ref = new Ref(0); + +v2.0:: + + Ref.View ref = Stm.newRef(0); + +The ``set`` and ``get`` methods work the same way for both versions. + +v1.3:: + + ref.get(); // get current value + ref.set(1); // set new value + +v2.0:: + + ref.get(); // get current value + ref.set(1); // set new value + +There are also ``transform``, ``getAndTransform``, and ``transformAndGet`` +methods in ``japi.Stm`` which accept ``scala.runtime.AbstractFunction1``. + +There are ``increment`` helper methods for ``Ref.View`` and +``Ref.View`` references. + +Transaction lifecycle callbacks +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Scala +~~~~~ + +It is also possible to hook into the transaction lifecycle in ScalaSTM. See the +ScalaSTM documentation for the full range of possibilities. + +v1.3:: + + atomic { + deferred { + // executes when transaction commits + } + compensating { + // executes when transaction aborts + } + } + +v2.0:: + + atomic { implicit txn => + txn.afterCommit { txnStatus => + // executes when transaction commits + } + txn.afterRollback { txnStatus => + // executes when transaction rolls back + } + } + +Java +~~~~ + +Rather than using the ``deferred`` and ``compensating`` methods in +``akka.stm.StmUtils``, use the ``afterCommit`` and ``afterRollback`` methods in +``scala.concurrent.stm.japi.Stm``, which behave in the same way and accept +``Runnable``. + +Transactional Datastructures +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +In ScalaSTM see ``TMap``, ``TSet``, and ``TArray`` for transactional +datastructures. + +There are helper methods for creating these from Java in ``japi.Stm``: +``newTMap``, ``newTSet``, and ``newTArray``. These datastructures implement the +``scala.collection`` interfaces and can also be used from Java with Scala's +``JavaConversions``. There are helper methods that apply the conversions, +returning ``java.util`` ``Map``, ``Set``, and ``List``: ``newMap``, ``newSet``, +and ``newList``. + + More to be written ------------------ * Futures -* STM * TypedActors * Routing * Remoting -* ...? \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index bfb30bc940..f7d6b1d8b3 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -9,8 +9,7 @@ import akka.remote._ import RemoteProtocol._ import akka.util._ import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture } -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory +import org.jboss.netty.channel.socket.nio.{ NioServerSocketChannelFactory, NioClientSocketChannelFactory } import org.jboss.netty.bootstrap.{ ServerBootstrap, ClientBootstrap } import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } @@ -121,6 +120,7 @@ class PassiveRemoteClient(val currentChannel: Channel, class ActiveRemoteClient private[akka] ( remoteSupport: NettyRemoteSupport, remoteAddress: RemoteNettyAddress, + localAddress: RemoteSystemAddress[ParsedTransportAddress], val loader: Option[ClassLoader] = None) extends RemoteClient(remoteSupport, remoteAddress) { @@ -132,7 +132,11 @@ class ActiveRemoteClient private[akka] ( @volatile private var bootstrap: ClientBootstrap = _ @volatile - private[remote] var connection: ChannelFuture = _ + private var connection: ChannelFuture = _ + @volatile + private[remote] var openChannels: DefaultChannelGroup = _ + @volatile + private var executionHandler: ExecutionHandler = _ @volatile private var reconnectionTimeWindowStart = 0L @@ -141,10 +145,6 @@ class ActiveRemoteClient private[akka] ( def currentChannel = connection.getChannel - private val senderRemoteAddress = remoteSupport.remote.remoteAddress - @volatile - private var executionHandler: ExecutionHandler = _ - /** * Connect to remote server. */ @@ -154,9 +154,9 @@ class ActiveRemoteClient private[akka] ( val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT) if (SecureCookie.nonEmpty) handshake.setCookie(SecureCookie.get) handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder - .setSystem(senderRemoteAddress.system) - .setHostname(senderRemoteAddress.transport.host) - .setPort(senderRemoteAddress.transport.port) + .setSystem(localAddress.system) + .setHostname(localAddress.transport.host) + .setPort(localAddress.transport.port) .build) connection.getChannel.write(remoteSupport.createControlEnvelope(handshake.build)) } @@ -164,7 +164,7 @@ class ActiveRemoteClient private[akka] ( def attemptReconnect(): Boolean = { log.debug("Remote client reconnecting to [{}]", remoteAddress) connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port)) - connection.awaitUninterruptibly.getChannel // Wait until the connection attempt succeeds or fails. + openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress)) @@ -176,11 +176,11 @@ class ActiveRemoteClient private[akka] ( } runSwitch switchOn { + openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName) + executionHandler = new ExecutionHandler(remoteSupport.executor) - bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(remoteSupport.threadFactory), - Executors.newCachedThreadPool(remoteSupport.threadFactory))) + bootstrap = new ClientBootstrap(remoteSupport.clientChannelFactory) bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, executionHandler, remoteAddress, this)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) @@ -188,7 +188,8 @@ class ActiveRemoteClient private[akka] ( log.debug("Starting remote client connection to [{}]", remoteAddress) connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port)) - connection.awaitUninterruptibly.getChannel // Wait until the connection attempt succeeds or fails. + + openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress)) @@ -202,6 +203,7 @@ class ActiveRemoteClient private[akka] ( case true ⇒ true case false if reconnectIfAlreadyConnected ⇒ connection.getChannel.close() + openChannels.remove(connection.getChannel) log.debug("Remote client reconnecting to [{}]", remoteAddress) attemptReconnect() @@ -219,13 +221,11 @@ class ActiveRemoteClient private[akka] ( if ((connection ne null) && (connection.getChannel ne null)) connection.getChannel.close() } finally { - connection = null - executionHandler = null - //Do not do this: executionHandler.releaseExternalResources(), since it's shutting down the shared threadpool try { - bootstrap.releaseExternalResources() + if (openChannels ne null) openChannels.close.awaitUninterruptibly() } finally { - bootstrap = null + connection = null + executionHandler = null } } @@ -324,7 +324,10 @@ class ActiveRemoteClientHandler( if (client.isWithinReconnectionTimeWindow) { timer.newTimeout(new TimerTask() { def run(timeout: Timeout) = - if (client.isRunning) client.connect(reconnectIfAlreadyConnected = true) + if (client.isRunning) { + client.openChannels.remove(event.getChannel) + client.connect(reconnectIfAlreadyConnected = true) + } }, client.remoteSupport.clientSettings.ReconnectDelay.toMillis, TimeUnit.MILLISECONDS) } else runOnceNow { client.remoteSupport.shutdownClientConnection(remoteAddress) // spawn in another thread @@ -369,8 +372,10 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre val serverSettings = remote.remoteSettings.serverSettings val clientSettings = remote.remoteSettings.clientSettings + val threadFactory = new MonitorableThreadFactory("NettyRemoteSupport", remote.remoteSettings.Daemonic) - val timer: HashedWheelTimer = new HashedWheelTimer + val timer: HashedWheelTimer = new HashedWheelTimer(threadFactory) + val executor = new OrderedMemoryAwareThreadPoolExecutor( serverSettings.ExecutionPoolSize, serverSettings.MaxChannelMemorySize, @@ -379,6 +384,10 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre serverSettings.ExecutionPoolKeepAlive.unit, threadFactory) + val clientChannelFactory = new NioClientSocketChannelFactory( + Executors.newCachedThreadPool(threadFactory), + Executors.newCachedThreadPool(threadFactory)) + private val remoteClients = new HashMap[RemoteNettyAddress, RemoteClient] private val clientsLock = new ReentrantReadWriteLock @@ -411,7 +420,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre //Recheck for addition, race between upgrades case Some(client) ⇒ client //If already populated by other writer case None ⇒ //Populate map - val client = new ActiveRemoteClient(this, recipientAddress, loader) + val client = new ActiveRemoteClient(this, recipientAddress, remote.remoteAddress, loader) client.connect() remoteClients += recipientAddress -> client client @@ -479,26 +488,20 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre /** * Server section */ - private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None) + @volatile + private var currentServer: NettyRemoteServer = _ - def name = currentServer.get match { - case Some(server) ⇒ server.name - case None ⇒ remote.remoteAddress.toString + def name = currentServer match { + case null ⇒ remote.remoteAddress.toString + case server ⇒ server.name } private val _isRunning = new Switch(false) def isRunning = _isRunning.isOn - def start(loader: Option[ClassLoader] = None): Unit = { - _isRunning switchOn { - try { - currentServer.set(Some(new NettyRemoteServer(this, loader, address))) - } catch { - case e: Exception ⇒ notifyListeners(RemoteServerError(e, this)) - } - } - } + def start(loader: Option[ClassLoader] = None): Unit = + _isRunning switchOn { currentServer = new NettyRemoteServer(this, loader, address) } /** * Common section @@ -512,9 +515,19 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre } finally { clientsLock.writeLock().unlock() try { - currentServer.getAndSet(None) foreach { _.shutdown() } + val s = currentServer + currentServer = null + s.shutdown() } finally { - try { timer.stop() } finally { executor.shutdown() } + try { + timer.stop() + } finally { + try { + clientChannelFactory.releaseExternalResources() + } finally { + executor.shutdown() + } + } } } } diff --git a/akka-transactor/src/main/scala/akka/transactor/Atomically.scala b/akka-transactor/src/main/scala/akka/transactor/Atomically.scala deleted file mode 100644 index b568fa18a0..0000000000 --- a/akka-transactor/src/main/scala/akka/transactor/Atomically.scala +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.transactor - -import scala.concurrent.stm._ - -/** - * Java API. - * - * For creating Java-friendly coordinated atomic blocks. - * - * @see [[akka.transactor.Coordinated]] - */ -trait Atomically { - def atomically(txn: InTxn): Unit -} - -/** - * Java API. - * - * For creating completion handlers. - */ -trait CompletionHandler { - def handle(status: Txn.Status): Unit -} - -/** - * Java API. - * - * To ease some of the pain of using Scala STM from Java until - * the proper Java API is created. - */ -object Stm { - /** - * Create an STM Ref with an initial value. - */ - def ref[A](initialValue: A): Ref[A] = Ref(initialValue) - - /** - * Add a CompletionHandler to run after the current transaction - * has committed. - */ - def afterCommit(handler: CompletionHandler): Unit = { - val txn = Txn.findCurrent - if (txn.isDefined) Txn.afterCommit(status ⇒ handler.handle(status))(txn.get) - } - - /** - * Add a CompletionHandler to run after the current transaction - * has rolled back. - */ - def afterRollback(handler: CompletionHandler): Unit = { - val txn = Txn.findCurrent - if (txn.isDefined) Txn.afterRollback(status ⇒ handler.handle(status))(txn.get) - } - - /** - * Add a CompletionHandler to run after the current transaction - * has committed or rolled back. - */ - def afterCompletion(handler: CompletionHandler): Unit = { - val txn = Txn.findCurrent - if (txn.isDefined) Txn.afterCompletion(status ⇒ handler.handle(status))(txn.get) - } -} diff --git a/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala b/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala index bcc716e3c4..761f5a50e6 100644 --- a/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala +++ b/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala @@ -6,7 +6,8 @@ package akka.transactor import akka.AkkaException import akka.util.Timeout -import scala.concurrent.stm._ +import scala.concurrent.stm.{ CommitBarrier, InTxn } +import java.util.concurrent.Callable /** * Akka-specific exception for coordinated transactions. @@ -125,7 +126,7 @@ class Coordinated(val message: Any, member: CommitBarrier.Member) { * * @throws CoordinatedTransactionException if the coordinated transaction fails. */ - def atomic[T](body: InTxn ⇒ T): T = { + def atomic[A](body: InTxn ⇒ A): A = { member.atomic(body) match { case Right(result) ⇒ result case Left(CommitBarrier.MemberUncaughtExceptionCause(x)) ⇒ @@ -136,13 +137,22 @@ class Coordinated(val message: Any, member: CommitBarrier.Member) { } /** - * Java API: coordinated atomic method that accepts an [[akka.transactor.Atomically]]. + * Java API: coordinated atomic method that accepts a `java.lang.Runnable`. * Delimits the coordinated transaction. The transaction will wait for all other transactions * in this coordination before committing. The timeout is specified when creating the Coordinated. * * @throws CoordinatedTransactionException if the coordinated transaction fails. */ - def atomic(atomically: Atomically): Unit = atomic(txn ⇒ atomically.atomically(txn)) + def atomic(runnable: Runnable): Unit = atomic { _ ⇒ runnable.run } + + /** + * Java API: coordinated atomic method that accepts a `java.util.concurrent.Callable`. + * Delimits the coordinated transaction. The transaction will wait for all other transactions + * in this coordination before committing. The timeout is specified when creating the Coordinated. + * + * @throws CoordinatedTransactionException if the coordinated transaction fails. + */ + def atomic[A](callable: Callable[A]): A = atomic { _ ⇒ callable.call } /** * An empty coordinated atomic block. Can be used to complete the number of members involved diff --git a/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala b/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala index 679696f487..ce77959a4f 100644 --- a/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala +++ b/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala @@ -25,7 +25,7 @@ abstract class UntypedTransactor extends UntypedActor { sendTo.actor.tell(coordinated(sendTo.message.getOrElse(message))) } before(message) - coordinated.atomic { txn ⇒ atomically(txn, message) } + coordinated.atomic { txn ⇒ atomically(message) } after(message) } case message ⇒ { @@ -84,7 +84,7 @@ abstract class UntypedTransactor extends UntypedActor { * The Receive block to run inside the coordinated transaction. */ @throws(classOf[Exception]) - def atomically(txn: InTxn, message: Any) {} + def atomically(message: Any) /** * A Receive block that runs after the coordinated transaction. diff --git a/akka-transactor/src/main/scala/scala/concurrent/stm/japi/Stm.scala b/akka-transactor/src/main/scala/scala/concurrent/stm/japi/Stm.scala new file mode 100644 index 0000000000..d9ed5a8330 --- /dev/null +++ b/akka-transactor/src/main/scala/scala/concurrent/stm/japi/Stm.scala @@ -0,0 +1,147 @@ +/* scala-stm - (c) 2009-2011, Stanford University, PPL */ + +package scala.concurrent.stm.japi + +import java.util.concurrent.Callable +import java.util.{ List ⇒ JList, Map ⇒ JMap, Set ⇒ JSet } +import scala.collection.JavaConversions +import scala.concurrent.stm +import scala.concurrent.stm._ +import scala.runtime.AbstractFunction1 + +/** + * Java-friendly API for ScalaSTM. + * These methods can also be statically imported. + */ +object Stm { + + /** + * Create a Ref with an initial value. Return a `Ref.View`, which does not + * require implicit transactions. + * @param initialValue the initial value for the newly created `Ref.View` + * @return a new `Ref.View` + */ + def newRef[A](initialValue: A): Ref.View[A] = Ref(initialValue).single + + /** + * Create an empty TMap. Return a `TMap.View`, which does not require + * implicit transactions. See newMap for included java conversion. + * @return a new, empty `TMap.View` + */ + def newTMap[A, B](): TMap.View[A, B] = TMap.empty[A, B].single + + /** + * Create an empty TMap. Return a `java.util.Map` view of this TMap. + * @return a new, empty `TMap.View` wrapped as a `java.util.Map`. + */ + def newMap[A, B](): JMap[A, B] = JavaConversions.mutableMapAsJavaMap(newTMap[A, B]) + + /** + * Create an empty TSet. Return a `TSet.View`, which does not require + * implicit transactions. See newSet for included java conversion. + * @return a new, empty `TSet.View` + */ + def newTSet[A](): TSet.View[A] = TSet.empty[A].single + + /** + * Create an empty TSet. Return a `java.util.Set` view of this TSet. + * @return a new, empty `TSet.View` wrapped as a `java.util.Set`. + */ + def newSet[A](): JSet[A] = JavaConversions.mutableSetAsJavaSet(newTSet[A]) + + /** + * Create a TArray containing `length` elements. Return a `TArray.View`, + * which does not require implicit transactions. See newList for included + * java conversion. + * @param length the length of the `TArray.View` to be created + * @return a new `TArray.View` containing `length` elements (initially null) + */ + def newTArray[A <: AnyRef](length: Int): TArray.View[A] = TArray.ofDim[A](length)(ClassManifest.classType(AnyRef.getClass)).single + + /** + * Create an empty TArray. Return a `java.util.List` view of this Array. + * @param length the length of the `TArray.View` to be created + * @return a new, empty `TArray.View` wrapped as a `java.util.List`. + */ + def newList[A <: AnyRef](length: Int): JList[A] = JavaConversions.mutableSeqAsJavaList(newTArray[A](length)) + + /** + * Atomic block that takes a `Runnable`. + * @param runnable the `Runnable` to run within a transaction + */ + def atomic(runnable: Runnable): Unit = stm.atomic { txn ⇒ runnable.run } + + /** + * Atomic block that takes a `Callable`. + * @param callable the `Callable` to run within a transaction + * @return the value returned by the `Callable` + */ + def atomic[A](callable: Callable[A]): A = stm.atomic { txn ⇒ callable.call } + + /** + * Transform the value stored by `ref` by applying the function `f`. + * @param ref the `Ref.View` to be transformed + * @param f the function to be applied + */ + def transform[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): Unit = ref.transform(f) + + /** + * Transform the value stored by `ref` by applying the function `f` and + * return the old value. + * @param ref the `Ref.View` to be transformed + * @param f the function to be applied + * @return the old value of `ref` + */ + def getAndTransform[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): A = ref.getAndTransform(f) + + /** + * Transform the value stored by `ref` by applying the function `f` and + * return the new value. + * @param ref the `Ref.View` to be transformed + * @param f the function to be applied + * @return the new value of `ref` + */ + def transformAndGet[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): A = ref.transformAndGet(f) + + /** + * Increment the `java.lang.Integer` value of a `Ref.View`. + * @param ref the `Ref.View` to be incremented + * @param delta the amount to increment + */ + def increment(ref: Ref.View[java.lang.Integer], delta: Int): Unit = ref.transform { v ⇒ v.intValue + delta } + + /** + * Increment the `java.lang.Long` value of a `Ref.View`. + * @param ref the `Ref.View` to be incremented + * @param delta the amount to increment + */ + def increment(ref: Ref.View[java.lang.Long], delta: Long): Unit = ref.transform { v ⇒ v.longValue + delta } + + /** + * Add a task to run after the current transaction has committed. + * @param task the `Runnable` task to run after transaction commit + */ + def afterCommit(task: Runnable): Unit = { + val txn = Txn.findCurrent + if (txn.isDefined) Txn.afterCommit(status ⇒ task.run)(txn.get) + } + + /** + * Add a task to run after the current transaction has rolled back. + * @param task the `Runnable` task to run after transaction rollback + */ + def afterRollback(task: Runnable): Unit = { + val txn = Txn.findCurrent + if (txn.isDefined) Txn.afterRollback(status ⇒ task.run)(txn.get) + } + + /** + * Add a task to run after the current transaction has either rolled back + * or committed. + * @param task the `Runnable` task to run after transaction completion + */ + def afterCompletion(task: Runnable): Unit = { + val txn = Txn.findCurrent + if (txn.isDefined) Txn.afterCompletion(status ⇒ task.run)(txn.get) + } +} diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java index 5d59691b9e..7d169b1548 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java @@ -7,24 +7,20 @@ package akka.transactor; import akka.actor.ActorRef; import akka.actor.Actors; import akka.actor.UntypedActor; -import scala.concurrent.stm.*; +import scala.concurrent.stm.Ref; +import scala.concurrent.stm.japi.Stm; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class UntypedCoordinatedCounter extends UntypedActor { private String name; - private Ref count = Stm.ref(0); + private Ref.View count = Stm.newRef(0); public UntypedCoordinatedCounter(String name) { this.name = name; } - private void increment(InTxn txn) { - Integer newValue = count.get(txn) + 1; - count.set(newValue, txn); - } - public void onReceive(Object incoming) throws Exception { if (incoming instanceof Coordinated) { Coordinated coordinated = (Coordinated) incoming; @@ -33,8 +29,8 @@ public class UntypedCoordinatedCounter extends UntypedActor { Increment increment = (Increment) message; List friends = increment.getFriends(); final CountDownLatch latch = increment.getLatch(); - final CompletionHandler countDown = new CompletionHandler() { - public void handle(Txn.Status status) { + final Runnable countDown = new Runnable() { + public void run() { latch.countDown(); } }; @@ -42,15 +38,15 @@ public class UntypedCoordinatedCounter extends UntypedActor { Increment coordMessage = new Increment(friends.subList(1, friends.size()), latch); friends.get(0).tell(coordinated.coordinate(coordMessage)); } - coordinated.atomic(new Atomically() { - public void atomically(InTxn txn) { - increment(txn); + coordinated.atomic(new Runnable() { + public void run() { + Stm.increment(count, 1); Stm.afterCompletion(countDown); } }); } } else if ("GetCount".equals(incoming)) { - getSender().tell(count.single().get()); + getSender().tell(count.get()); } } } diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java b/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java index bc3524845f..446b79a747 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java @@ -7,7 +7,8 @@ package akka.transactor; import akka.actor.ActorRef; import akka.transactor.UntypedTransactor; import akka.transactor.SendTo; -import scala.concurrent.stm.*; +import scala.concurrent.stm.Ref; +import scala.concurrent.stm.japi.Stm; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -15,17 +16,12 @@ import java.util.concurrent.TimeUnit; public class UntypedCounter extends UntypedTransactor { private String name; - private Ref count = Stm.ref(0); + private Ref.View count = Stm.newRef(0); public UntypedCounter(String name) { this.name = name; } - private void increment(InTxn txn) { - Integer newValue = count.get(txn) + 1; - count.set(newValue, txn); - } - @Override public Set coordinate(Object message) { if (message instanceof Increment) { Increment increment = (Increment) message; @@ -41,12 +37,12 @@ public class UntypedCounter extends UntypedTransactor { } } - public void atomically(InTxn txn, Object message) { + public void atomically(Object message) { if (message instanceof Increment) { - increment(txn); + Stm.increment(count, 1); final Increment increment = (Increment) message; - CompletionHandler countDown = new CompletionHandler() { - public void handle(Txn.Status status) { + Runnable countDown = new Runnable() { + public void run() { increment.getLatch().countDown(); } }; @@ -56,7 +52,7 @@ public class UntypedCounter extends UntypedTransactor { @Override public boolean normally(Object message) { if ("GetCount".equals(message)) { - getSender().tell(count.single().get()); + getSender().tell(count.get()); return true; } else return false; } diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedFailer.java b/akka-transactor/src/test/java/akka/transactor/UntypedFailer.java index 1e9308b2c1..2bc1c556d8 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedFailer.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedFailer.java @@ -7,7 +7,7 @@ package akka.transactor; import scala.concurrent.stm.InTxn; public class UntypedFailer extends UntypedTransactor { - public void atomically(InTxn txn, Object message) throws Exception { + public void atomically(Object message) throws Exception { throw new ExpectedFailureException(); } } diff --git a/akka-transactor/src/test/java/scala/concurrent/stm/JavaAPITests.java b/akka-transactor/src/test/java/scala/concurrent/stm/JavaAPITests.java new file mode 100644 index 0000000000..63fb6abb74 --- /dev/null +++ b/akka-transactor/src/test/java/scala/concurrent/stm/JavaAPITests.java @@ -0,0 +1,156 @@ +/* scala-stm - (c) 2009-2011, Stanford University, PPL */ + +package scala.concurrent.stm; + +import static org.junit.Assert.*; +import org.junit.Test; + +import scala.concurrent.stm.japi.Stm; +import static scala.concurrent.stm.japi.Stm.*; + +import scala.runtime.AbstractFunction1; +import java.util.concurrent.Callable; + +import java.util.Map; +import java.util.Set; +import java.util.List; + +public class JavaAPITests { + @Test + public void createIntegerRef() { + Ref.View ref = newRef(0); + int unboxed = ref.get(); + assertEquals(0, unboxed); + } + + @Test + public void atomicWithRunnable() { + final Ref.View ref = newRef(0); + atomic(new Runnable() { + public void run() { + ref.set(10); + } + }); + int value = ref.get(); + assertEquals(10, value); + } + + @Test + public void atomicWithCallable() { + final Ref.View ref = newRef(0); + int oldValue = atomic(new Callable() { + public Integer call() { + return ref.swap(10); + } + }); + assertEquals(0, oldValue); + int newValue = ref.get(); + assertEquals(10, newValue); + } + + @Test(expected = TestException.class) + public void failingTransaction() { + final Ref.View ref = newRef(0); + try { + atomic(new Runnable() { + public void run() { + ref.set(10); + throw new TestException(); + } + }); + } catch (TestException e) { + int value = ref.get(); + assertEquals(0, value); + throw e; + } + } + + @Test + public void transformInteger() { + Ref.View ref = newRef(0); + transform(ref, new AbstractFunction1() { + public Integer apply(Integer i) { + return i + 10; + } + }); + int value = ref.get(); + assertEquals(10, value); + } + + @Test + public void incrementInteger() { + Ref.View ref = newRef(0); + increment(ref, 10); + int value = ref.get(); + assertEquals(10, value); + } + + @Test + public void incrementLong() { + Ref.View ref = newRef(0L); + increment(ref, 10L); + long value = ref.get(); + assertEquals(10L, value); + } + + @Test + public void createAndUseTMap() { + Map map = newMap(); + map.put(1, "one"); + map.put(2, "two"); + assertEquals("one", map.get(1)); + assertEquals("two", map.get(2)); + assertTrue(map.containsKey(2)); + map.remove(2); + assertFalse(map.containsKey(2)); + } + + @Test(expected = TestException.class) + public void failingTMapTransaction() { + final Map map = newMap(); + try { + atomic(new Runnable() { + public void run() { + map.put(1, "one"); + map.put(2, "two"); + assertTrue(map.containsKey(1)); + assertTrue(map.containsKey(2)); + throw new TestException(); + } + }); + } catch (TestException e) { + assertFalse(map.containsKey(1)); + assertFalse(map.containsKey(2)); + throw e; + } + } + + @Test + public void createAndUseTSet() { + Set set = newSet(); + set.add("one"); + set.add("two"); + assertTrue(set.contains("one")); + assertTrue(set.contains("two")); + assertEquals(2, set.size()); + set.add("one"); + assertEquals(2, set.size()); + set.remove("two"); + assertFalse(set.contains("two")); + assertEquals(1, set.size()); + } + + @Test + public void createAndUseTArray() { + List list = newList(3); + assertEquals(null, list.get(0)); + assertEquals(null, list.get(1)); + assertEquals(null, list.get(2)); + list.set(0, "zero"); + list.set(1, "one"); + list.set(2, "two"); + assertEquals("zero", list.get(0)); + assertEquals("one", list.get(1)); + assertEquals("two", list.get(2)); + } +} diff --git a/akka-transactor/src/test/java/scala/concurrent/stm/TestException.java b/akka-transactor/src/test/java/scala/concurrent/stm/TestException.java new file mode 100644 index 0000000000..cc810761d4 --- /dev/null +++ b/akka-transactor/src/test/java/scala/concurrent/stm/TestException.java @@ -0,0 +1,9 @@ +/* scala-stm - (c) 2009-2011, Stanford University, PPL */ + +package scala.concurrent.stm; + +public class TestException extends RuntimeException { + public TestException() { + super("Expected failure"); + } +} diff --git a/akka-transactor/src/test/scala/scala/concurrent/stm/JavaAPISuite.scala b/akka-transactor/src/test/scala/scala/concurrent/stm/JavaAPISuite.scala new file mode 100644 index 0000000000..3d0c48e90f --- /dev/null +++ b/akka-transactor/src/test/scala/scala/concurrent/stm/JavaAPISuite.scala @@ -0,0 +1,7 @@ +/* scala-stm - (c) 2009-2011, Stanford University, PPL */ + +package scala.concurrent.stm + +import org.scalatest.junit.JUnitWrapperSuite + +class JavaAPISuite extends JUnitWrapperSuite("scala.concurrent.stm.JavaAPITests", Thread.currentThread.getContextClassLoader)