Merge branch 'master' into wip-1406-more-migration-docs-patriknw

Conflicts:
	akka-docs/project/migration-guide-1.3.x-2.0.x.rst
This commit is contained in:
Patrik Nordwall 2012-01-20 08:33:21 +01:00
commit 2ec28c8a4b
17 changed files with 705 additions and 181 deletions

View file

@ -7,15 +7,11 @@ package akka.docs.transactor;
//#class
import akka.actor.*;
import akka.transactor.*;
import scala.concurrent.stm.*;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.Stm;
public class CoordinatedCounter extends UntypedActor {
private Ref<Integer> count = Stm.ref(0);
private void increment(InTxn txn) {
Integer newValue = count.get(txn) + 1;
count.set(newValue, txn);
}
private Ref.View<Integer> count = Stm.newRef(0);
public void onReceive(Object incoming) throws Exception {
if (incoming instanceof Coordinated) {
@ -26,14 +22,14 @@ public class CoordinatedCounter extends UntypedActor {
if (increment.hasFriend()) {
increment.getFriend().tell(coordinated.coordinate(new Increment()));
}
coordinated.atomic(new Atomically() {
public void atomically(InTxn txn) {
increment(txn);
coordinated.atomic(new Runnable() {
public void run() {
Stm.increment(count, 1);
}
});
}
} else if ("GetCount".equals(incoming)) {
getSender().tell(count.single().get());
getSender().tell(count.get());
} else {
unhandled(incoming);
}

View file

@ -6,7 +6,6 @@ package akka.docs.transactor;
import akka.actor.*;
import akka.transactor.*;
import scala.concurrent.stm.*;
public class Coordinator extends UntypedActor {
public void onReceive(Object incoming) throws Exception {
@ -15,8 +14,8 @@ public class Coordinator extends UntypedActor {
Object message = coordinated.getMessage();
if (message instanceof Message) {
//#coordinated-atomic
coordinated.atomic(new Atomically() {
public void atomically(InTxn txn) {
coordinated.atomic(new Runnable() {
public void run() {
// do something in the coordinated transaction ...
}
});

View file

@ -6,21 +6,21 @@ package akka.docs.transactor;
//#class
import akka.transactor.*;
import scala.concurrent.stm.*;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.Stm;
public class Counter extends UntypedTransactor {
Ref<Integer> count = Stm.ref(0);
Ref.View<Integer> count = Stm.newRef(0);
public void atomically(InTxn txn, Object message) {
public void atomically(Object message) {
if (message instanceof Increment) {
Integer newValue = count.get(txn) + 1;
count.set(newValue, txn);
Stm.increment(count, 1);
}
}
@Override public boolean normally(Object message) {
if ("GetCount".equals(message)) {
getSender().tell(count.single().get());
getSender().tell(count.get());
return true;
} else return false;
}

View file

@ -8,10 +8,11 @@ package akka.docs.transactor;
import akka.actor.*;
import akka.transactor.*;
import java.util.Set;
import scala.concurrent.stm.*;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.Stm;
public class FriendlyCounter extends UntypedTransactor {
Ref<Integer> count = Stm.ref(0);
Ref.View<Integer> count = Stm.newRef(0);
@Override public Set<SendTo> coordinate(Object message) {
if (message instanceof Increment) {
@ -22,16 +23,15 @@ public class FriendlyCounter extends UntypedTransactor {
return nobody();
}
public void atomically(InTxn txn, Object message) {
public void atomically(Object message) {
if (message instanceof Increment) {
Integer newValue = count.get(txn) + 1;
count.set(newValue, txn);
Stm.increment(count, 1);
}
}
@Override public boolean normally(Object message) {
if ("GetCount".equals(message)) {
getSender().tell(count.single().get());
getSender().tell(count.get());
return true;
} else return false;
}

View file

@ -102,7 +102,7 @@ be sent.
:language: java
To enter the coordinated transaction use the atomic method of the coordinated
object, passing in an ``akka.transactor.Atomically`` object.
object, passing in a ``java.lang.Runnable``.
.. includecode:: code/akka/docs/transactor/Coordinator.java#coordinated-atomic
:language: java

View file

@ -6,7 +6,9 @@
.. sidebar:: Contents
.. contents:: :local:
.. contents::
:local:
:depth: 3
Actors
======
@ -77,8 +79,11 @@ Last task of the migration would be to create your own ``ActorSystem``.
Unordered Collection of Migration Items
=======================================
Actors
------
Creating and starting actors
----------------------------
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Actors are created by passing in a ``Props`` instance into the actorOf factory method in
a ``ActorRefProvider``, which is the ``ActorSystem`` or ``ActorContext``.
@ -111,7 +116,7 @@ Documentation:
* :ref:`untyped-actors-java`
Stopping actors
---------------
^^^^^^^^^^^^^^^
``ActorRef.stop()`` has been moved. Use ``ActorSystem`` or ``ActorContext`` to stop actors.
@ -144,7 +149,7 @@ Documentation:
* :ref:`untyped-actors-java`
Identifying Actors
------------------
^^^^^^^^^^^^^^^^^^
In v1.3 actors have ``uuid`` and ``id`` field. In v2.0 each actor has a unique logical ``path``.
@ -167,7 +172,7 @@ Documentation:
* :ref:`untyped-actors-java`
Reply to messages
-----------------
^^^^^^^^^^^^^^^^^
``self.channel`` has been replaced with unified reply mechanism using ``sender`` (Scala)
or ``getSender()`` (Java). This works for both tell (!) and ask (?).
@ -189,7 +194,7 @@ Documentation:
* :ref:`untyped-actors-java`
``ActorRef.ask()``
------------------
^^^^^^^^^^^^^^^^^^
The mechanism for collecting an actors reply in a :class:`Future` has been
reworked for better location transparency: it uses an actor under the hood.
@ -206,7 +211,7 @@ Documentation:
* :ref:`untyped-actors-java`
ActorPool
---------
^^^^^^^^^
The ActorPool has been replaced by dynamically resizable routers.
@ -216,7 +221,7 @@ Documentation:
* :ref:`routing-java`
``UntypedActor.getContext()`` (Java API only)
---------------------------------------------
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
``getContext()`` in the Java API for UntypedActor is renamed to
``getSelf()``.
@ -234,7 +239,7 @@ Documentation:
* :ref:`untyped-actors-java`
Configuration
-------------
^^^^^^^^^^^^^
A new, more powerful, configuration utility has been implemented. The format of the
configuration file is very similar to the format in v1.3. In addition it also supports
@ -287,7 +292,7 @@ Documentation:
* :ref:`configuration`
Logging
-------
^^^^^^^
EventHandler API has been replaced by LoggingAdapter, which publish log messages
to the event bus. You can still plugin your own actor as event listener with the
@ -321,7 +326,7 @@ Documentation:
Scheduler
---------
^^^^^^^^^
The functionality of the scheduler is identical, but the API is slightly adjusted.
@ -362,7 +367,7 @@ Documentation:
* :ref:`scheduler-java`
Supervision
-----------
^^^^^^^^^^^
Akka v2.0 implements parental supervision. Actors can only be created by other actors — where the top-level
actor is provided by the library — and each created actor is supervised by its parent.
@ -438,7 +443,7 @@ Documentation:
* :ref:`untyped-actors-java`
Dispatchers
-----------
^^^^^^^^^^^
Dispatchers are defined in configuration instead of in code.
@ -479,7 +484,7 @@ Documentation:
* :ref:`dispatchers-scala`
Spawn
-----
^^^^^
``spawn`` has been removed and can be implemented like this, if needed. Be careful to not
access any shared mutable state closed over by the body.
@ -495,7 +500,7 @@ Documentation:
* :ref:`jmm`
HotSwap
-------
^^^^^^^
In v2.0 ``become`` and ``unbecome`` metods are located in ``ActorContext``, i.e. ``context.become`` and ``context.unbecome``.
@ -506,12 +511,269 @@ in the actor receiving the message.
* :ref:`actors-scala`
* :ref:`untyped-actors-java`
STM
---
In Akka v2.0 `ScalaSTM`_ is used rather than Multiverse.
.. _ScalaSTM: http://nbronson.github.com/scala-stm/
Agent and Transactor have been ported to ScalaSTM. The API's for Agent and
Transactor are basically the same, other than integration with ScalaSTM. See:
* :ref:`agents-scala`
* :ref:`agents-java`
* :ref:`transactors-scala`
* :ref:`transactors-java`
Imports
^^^^^^^
Scala
~~~~~
To use ScalaSTM the import from Scala is::
import scala.concurrent.stm._
Java
~~~~
For Java there is a special helper object with Java-friendly methods::
import scala.concurrent.stm.japi.Stm;
These methods can also be statically imported::
import static scala.concurrent.stm.japi.Stm.*;
Other imports that are needed are in the stm package, particularly ``Ref``::
import scala.concurrent.stm.Ref;
Transactions
^^^^^^^^^^^^
Scala
~~~~~
Both v1.3 and v2.0 provide an ``atomic`` block, however, the ScalaSTM ``atomic``
is a function from ``InTxn`` to return type.
v1.3::
atomic {
// do something in transaction
}
v2.0::
atomic { implicit txn =>
// do something in transaction
}
Note that in ScalaSTM the ``InTxn`` in the atomic function is usually marked as
implicit as transactional references require an implicit ``InTxn`` on all
methods. That is, the transaction is statically required and it is a
compile-time warning to use a reference without a transaction. There is also a
``Ref.View`` for operations without requiring an ``InTxn`` statically. See below
for more information.
Java
~~~~
In the ScalaSTM Java API helpers there are atomic methods which accept
``java.lang.Runnable`` and ``java.util.concurrent.Callable``.
v1.3::
new Atomic() {
public Object atomically() {
// in transaction
return null;
}
}.execute();
SomeObject result = new Atomic<SomeObject>() {
public SomeObject atomically() {
// in transaction
return ...;
}
}.execute();
v2.0::
import static scala.concurrent.stm.japi.Stm.atomic;
import java.util.concurrent.Callable;
atomic(new Runnable() {
public void run() {
// in transaction
}
});
SomeObject result = atomic(new Callable<SomeObject>() {
public SomeObject call() {
// in transaction
return ...;
}
});
Ref
^^^
Scala
~~~~~
Other than the import, creating a Ref is basically identical between Akka STM in
v1.3 and ScalaSTM used in v2.0.
v1.3::
val ref = Ref(0)
v2.0::
val ref = Ref(0)
The API for Ref is similar. For example:
v1.3::
ref.get // get current value
ref() // same as get
ref.set(1) // set to new value, return old value
ref() = 1 // same as set
ref.swap(2) // same as set
ref alter { _ + 1 } // apply a function, return new value
v2.0::
ref.get // get current value
ref() // same as get
ref.set(1) // set to new value, return nothing
ref() = 1 // same as set
ref.swap(2) // set and return old value
ref transform { _ + 1 } // apply function, return nothing
ref transformIfDefined { case 1 => 2 } // apply partial function if defined
Ref.View
^^^^^^^^
In v1.3 using a ``Ref`` method outside of a transaction would automatically
create a single-operation transaction. In v2.0 (in ScalaSTM) there is a
``Ref.View`` which provides methods without requiring a current
transaction.
Scala
~~~~~
The ``Ref.View`` can be accessed with the ``single`` method::
ref.single() // returns current value
ref.single() = 1 // set new value
// with atomic this would be:
atomic { implicit t => ref() }
atomic { implicit t => ref() = 1 }
Java
~~~~
As ``Ref.View`` in ScalaSTM does not require implicit transactions, this is more
easily used from Java. ``Ref`` could be used, but requires explicit threading of
transactions. There are helper methods in ``japi.Stm`` for creating ``Ref.View``
references.
v1.3::
Ref<Integer> ref = new Ref<Integer>(0);
v2.0::
Ref.View<Integer> ref = Stm.newRef(0);
The ``set`` and ``get`` methods work the same way for both versions.
v1.3::
ref.get(); // get current value
ref.set(1); // set new value
v2.0::
ref.get(); // get current value
ref.set(1); // set new value
There are also ``transform``, ``getAndTransform``, and ``transformAndGet``
methods in ``japi.Stm`` which accept ``scala.runtime.AbstractFunction1``.
There are ``increment`` helper methods for ``Ref.View<Integer>`` and
``Ref.View<Long>`` references.
Transaction lifecycle callbacks
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Scala
~~~~~
It is also possible to hook into the transaction lifecycle in ScalaSTM. See the
ScalaSTM documentation for the full range of possibilities.
v1.3::
atomic {
deferred {
// executes when transaction commits
}
compensating {
// executes when transaction aborts
}
}
v2.0::
atomic { implicit txn =>
txn.afterCommit { txnStatus =>
// executes when transaction commits
}
txn.afterRollback { txnStatus =>
// executes when transaction rolls back
}
}
Java
~~~~
Rather than using the ``deferred`` and ``compensating`` methods in
``akka.stm.StmUtils``, use the ``afterCommit`` and ``afterRollback`` methods in
``scala.concurrent.stm.japi.Stm``, which behave in the same way and accept
``Runnable``.
Transactional Datastructures
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
In ScalaSTM see ``TMap``, ``TSet``, and ``TArray`` for transactional
datastructures.
There are helper methods for creating these from Java in ``japi.Stm``:
``newTMap``, ``newTSet``, and ``newTArray``. These datastructures implement the
``scala.collection`` interfaces and can also be used from Java with Scala's
``JavaConversions``. There are helper methods that apply the conversions,
returning ``java.util`` ``Map``, ``Set``, and ``List``: ``newMap``, ``newSet``,
and ``newList``.
More to be written
------------------
* Futures
* STM
* TypedActors
* Routing
* Remoting
* ...?

View file

@ -9,8 +9,7 @@ import akka.remote._
import RemoteProtocol._
import akka.util._
import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture }
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.channel.socket.nio.{ NioServerSocketChannelFactory, NioClientSocketChannelFactory }
import org.jboss.netty.bootstrap.{ ServerBootstrap, ClientBootstrap }
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
@ -121,6 +120,7 @@ class PassiveRemoteClient(val currentChannel: Channel,
class ActiveRemoteClient private[akka] (
remoteSupport: NettyRemoteSupport,
remoteAddress: RemoteNettyAddress,
localAddress: RemoteSystemAddress[ParsedTransportAddress],
val loader: Option[ClassLoader] = None)
extends RemoteClient(remoteSupport, remoteAddress) {
@ -132,7 +132,11 @@ class ActiveRemoteClient private[akka] (
@volatile
private var bootstrap: ClientBootstrap = _
@volatile
private[remote] var connection: ChannelFuture = _
private var connection: ChannelFuture = _
@volatile
private[remote] var openChannels: DefaultChannelGroup = _
@volatile
private var executionHandler: ExecutionHandler = _
@volatile
private var reconnectionTimeWindowStart = 0L
@ -141,10 +145,6 @@ class ActiveRemoteClient private[akka] (
def currentChannel = connection.getChannel
private val senderRemoteAddress = remoteSupport.remote.remoteAddress
@volatile
private var executionHandler: ExecutionHandler = _
/**
* Connect to remote server.
*/
@ -154,9 +154,9 @@ class ActiveRemoteClient private[akka] (
val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT)
if (SecureCookie.nonEmpty) handshake.setCookie(SecureCookie.get)
handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder
.setSystem(senderRemoteAddress.system)
.setHostname(senderRemoteAddress.transport.host)
.setPort(senderRemoteAddress.transport.port)
.setSystem(localAddress.system)
.setHostname(localAddress.transport.host)
.setPort(localAddress.transport.port)
.build)
connection.getChannel.write(remoteSupport.createControlEnvelope(handshake.build))
}
@ -164,7 +164,7 @@ class ActiveRemoteClient private[akka] (
def attemptReconnect(): Boolean = {
log.debug("Remote client reconnecting to [{}]", remoteAddress)
connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port))
connection.awaitUninterruptibly.getChannel // Wait until the connection attempt succeeds or fails.
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress))
@ -176,11 +176,11 @@ class ActiveRemoteClient private[akka] (
}
runSwitch switchOn {
openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName)
executionHandler = new ExecutionHandler(remoteSupport.executor)
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(remoteSupport.threadFactory),
Executors.newCachedThreadPool(remoteSupport.threadFactory)))
bootstrap = new ClientBootstrap(remoteSupport.clientChannelFactory)
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, executionHandler, remoteAddress, this))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
@ -188,7 +188,8 @@ class ActiveRemoteClient private[akka] (
log.debug("Starting remote client connection to [{}]", remoteAddress)
connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port))
connection.awaitUninterruptibly.getChannel // Wait until the connection attempt succeeds or fails.
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress))
@ -202,6 +203,7 @@ class ActiveRemoteClient private[akka] (
case true true
case false if reconnectIfAlreadyConnected
connection.getChannel.close()
openChannels.remove(connection.getChannel)
log.debug("Remote client reconnecting to [{}]", remoteAddress)
attemptReconnect()
@ -219,13 +221,11 @@ class ActiveRemoteClient private[akka] (
if ((connection ne null) && (connection.getChannel ne null))
connection.getChannel.close()
} finally {
connection = null
executionHandler = null
//Do not do this: executionHandler.releaseExternalResources(), since it's shutting down the shared threadpool
try {
bootstrap.releaseExternalResources()
if (openChannels ne null) openChannels.close.awaitUninterruptibly()
} finally {
bootstrap = null
connection = null
executionHandler = null
}
}
@ -324,7 +324,10 @@ class ActiveRemoteClientHandler(
if (client.isWithinReconnectionTimeWindow) {
timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) =
if (client.isRunning) client.connect(reconnectIfAlreadyConnected = true)
if (client.isRunning) {
client.openChannels.remove(event.getChannel)
client.connect(reconnectIfAlreadyConnected = true)
}
}, client.remoteSupport.clientSettings.ReconnectDelay.toMillis, TimeUnit.MILLISECONDS)
} else runOnceNow {
client.remoteSupport.shutdownClientConnection(remoteAddress) // spawn in another thread
@ -369,8 +372,10 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
val serverSettings = remote.remoteSettings.serverSettings
val clientSettings = remote.remoteSettings.clientSettings
val threadFactory = new MonitorableThreadFactory("NettyRemoteSupport", remote.remoteSettings.Daemonic)
val timer: HashedWheelTimer = new HashedWheelTimer
val timer: HashedWheelTimer = new HashedWheelTimer(threadFactory)
val executor = new OrderedMemoryAwareThreadPoolExecutor(
serverSettings.ExecutionPoolSize,
serverSettings.MaxChannelMemorySize,
@ -379,6 +384,10 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
serverSettings.ExecutionPoolKeepAlive.unit,
threadFactory)
val clientChannelFactory = new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(threadFactory),
Executors.newCachedThreadPool(threadFactory))
private val remoteClients = new HashMap[RemoteNettyAddress, RemoteClient]
private val clientsLock = new ReentrantReadWriteLock
@ -411,7 +420,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
//Recheck for addition, race between upgrades
case Some(client) client //If already populated by other writer
case None //Populate map
val client = new ActiveRemoteClient(this, recipientAddress, loader)
val client = new ActiveRemoteClient(this, recipientAddress, remote.remoteAddress, loader)
client.connect()
remoteClients += recipientAddress -> client
client
@ -479,26 +488,20 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
/**
* Server section
*/
private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None)
@volatile
private var currentServer: NettyRemoteServer = _
def name = currentServer.get match {
case Some(server) server.name
case None remote.remoteAddress.toString
def name = currentServer match {
case null remote.remoteAddress.toString
case server server.name
}
private val _isRunning = new Switch(false)
def isRunning = _isRunning.isOn
def start(loader: Option[ClassLoader] = None): Unit = {
_isRunning switchOn {
try {
currentServer.set(Some(new NettyRemoteServer(this, loader, address)))
} catch {
case e: Exception notifyListeners(RemoteServerError(e, this))
}
}
}
def start(loader: Option[ClassLoader] = None): Unit =
_isRunning switchOn { currentServer = new NettyRemoteServer(this, loader, address) }
/**
* Common section
@ -512,9 +515,19 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
} finally {
clientsLock.writeLock().unlock()
try {
currentServer.getAndSet(None) foreach { _.shutdown() }
val s = currentServer
currentServer = null
s.shutdown()
} finally {
try { timer.stop() } finally { executor.shutdown() }
try {
timer.stop()
} finally {
try {
clientChannelFactory.releaseExternalResources()
} finally {
executor.shutdown()
}
}
}
}
}

View file

@ -1,67 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import scala.concurrent.stm._
/**
* Java API.
*
* For creating Java-friendly coordinated atomic blocks.
*
* @see [[akka.transactor.Coordinated]]
*/
trait Atomically {
def atomically(txn: InTxn): Unit
}
/**
* Java API.
*
* For creating completion handlers.
*/
trait CompletionHandler {
def handle(status: Txn.Status): Unit
}
/**
* Java API.
*
* To ease some of the pain of using Scala STM from Java until
* the proper Java API is created.
*/
object Stm {
/**
* Create an STM Ref with an initial value.
*/
def ref[A](initialValue: A): Ref[A] = Ref(initialValue)
/**
* Add a CompletionHandler to run after the current transaction
* has committed.
*/
def afterCommit(handler: CompletionHandler): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterCommit(status handler.handle(status))(txn.get)
}
/**
* Add a CompletionHandler to run after the current transaction
* has rolled back.
*/
def afterRollback(handler: CompletionHandler): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterRollback(status handler.handle(status))(txn.get)
}
/**
* Add a CompletionHandler to run after the current transaction
* has committed or rolled back.
*/
def afterCompletion(handler: CompletionHandler): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterCompletion(status handler.handle(status))(txn.get)
}
}

View file

@ -6,7 +6,8 @@ package akka.transactor
import akka.AkkaException
import akka.util.Timeout
import scala.concurrent.stm._
import scala.concurrent.stm.{ CommitBarrier, InTxn }
import java.util.concurrent.Callable
/**
* Akka-specific exception for coordinated transactions.
@ -125,7 +126,7 @@ class Coordinated(val message: Any, member: CommitBarrier.Member) {
*
* @throws CoordinatedTransactionException if the coordinated transaction fails.
*/
def atomic[T](body: InTxn T): T = {
def atomic[A](body: InTxn A): A = {
member.atomic(body) match {
case Right(result) result
case Left(CommitBarrier.MemberUncaughtExceptionCause(x))
@ -136,13 +137,22 @@ class Coordinated(val message: Any, member: CommitBarrier.Member) {
}
/**
* Java API: coordinated atomic method that accepts an [[akka.transactor.Atomically]].
* Java API: coordinated atomic method that accepts a `java.lang.Runnable`.
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified when creating the Coordinated.
*
* @throws CoordinatedTransactionException if the coordinated transaction fails.
*/
def atomic(atomically: Atomically): Unit = atomic(txn atomically.atomically(txn))
def atomic(runnable: Runnable): Unit = atomic { _ runnable.run }
/**
* Java API: coordinated atomic method that accepts a `java.util.concurrent.Callable`.
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified when creating the Coordinated.
*
* @throws CoordinatedTransactionException if the coordinated transaction fails.
*/
def atomic[A](callable: Callable[A]): A = atomic { _ callable.call }
/**
* An empty coordinated atomic block. Can be used to complete the number of members involved

View file

@ -25,7 +25,7 @@ abstract class UntypedTransactor extends UntypedActor {
sendTo.actor.tell(coordinated(sendTo.message.getOrElse(message)))
}
before(message)
coordinated.atomic { txn atomically(txn, message) }
coordinated.atomic { txn atomically(message) }
after(message)
}
case message {
@ -84,7 +84,7 @@ abstract class UntypedTransactor extends UntypedActor {
* The Receive block to run inside the coordinated transaction.
*/
@throws(classOf[Exception])
def atomically(txn: InTxn, message: Any) {}
def atomically(message: Any)
/**
* A Receive block that runs after the coordinated transaction.

View file

@ -0,0 +1,147 @@
/* scala-stm - (c) 2009-2011, Stanford University, PPL */
package scala.concurrent.stm.japi
import java.util.concurrent.Callable
import java.util.{ List JList, Map JMap, Set JSet }
import scala.collection.JavaConversions
import scala.concurrent.stm
import scala.concurrent.stm._
import scala.runtime.AbstractFunction1
/**
* Java-friendly API for ScalaSTM.
* These methods can also be statically imported.
*/
object Stm {
/**
* Create a Ref with an initial value. Return a `Ref.View`, which does not
* require implicit transactions.
* @param initialValue the initial value for the newly created `Ref.View`
* @return a new `Ref.View`
*/
def newRef[A](initialValue: A): Ref.View[A] = Ref(initialValue).single
/**
* Create an empty TMap. Return a `TMap.View`, which does not require
* implicit transactions. See newMap for included java conversion.
* @return a new, empty `TMap.View`
*/
def newTMap[A, B](): TMap.View[A, B] = TMap.empty[A, B].single
/**
* Create an empty TMap. Return a `java.util.Map` view of this TMap.
* @return a new, empty `TMap.View` wrapped as a `java.util.Map`.
*/
def newMap[A, B](): JMap[A, B] = JavaConversions.mutableMapAsJavaMap(newTMap[A, B])
/**
* Create an empty TSet. Return a `TSet.View`, which does not require
* implicit transactions. See newSet for included java conversion.
* @return a new, empty `TSet.View`
*/
def newTSet[A](): TSet.View[A] = TSet.empty[A].single
/**
* Create an empty TSet. Return a `java.util.Set` view of this TSet.
* @return a new, empty `TSet.View` wrapped as a `java.util.Set`.
*/
def newSet[A](): JSet[A] = JavaConversions.mutableSetAsJavaSet(newTSet[A])
/**
* Create a TArray containing `length` elements. Return a `TArray.View`,
* which does not require implicit transactions. See newList for included
* java conversion.
* @param length the length of the `TArray.View` to be created
* @return a new `TArray.View` containing `length` elements (initially null)
*/
def newTArray[A <: AnyRef](length: Int): TArray.View[A] = TArray.ofDim[A](length)(ClassManifest.classType(AnyRef.getClass)).single
/**
* Create an empty TArray. Return a `java.util.List` view of this Array.
* @param length the length of the `TArray.View` to be created
* @return a new, empty `TArray.View` wrapped as a `java.util.List`.
*/
def newList[A <: AnyRef](length: Int): JList[A] = JavaConversions.mutableSeqAsJavaList(newTArray[A](length))
/**
* Atomic block that takes a `Runnable`.
* @param runnable the `Runnable` to run within a transaction
*/
def atomic(runnable: Runnable): Unit = stm.atomic { txn runnable.run }
/**
* Atomic block that takes a `Callable`.
* @param callable the `Callable` to run within a transaction
* @return the value returned by the `Callable`
*/
def atomic[A](callable: Callable[A]): A = stm.atomic { txn callable.call }
/**
* Transform the value stored by `ref` by applying the function `f`.
* @param ref the `Ref.View` to be transformed
* @param f the function to be applied
*/
def transform[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): Unit = ref.transform(f)
/**
* Transform the value stored by `ref` by applying the function `f` and
* return the old value.
* @param ref the `Ref.View` to be transformed
* @param f the function to be applied
* @return the old value of `ref`
*/
def getAndTransform[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): A = ref.getAndTransform(f)
/**
* Transform the value stored by `ref` by applying the function `f` and
* return the new value.
* @param ref the `Ref.View` to be transformed
* @param f the function to be applied
* @return the new value of `ref`
*/
def transformAndGet[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): A = ref.transformAndGet(f)
/**
* Increment the `java.lang.Integer` value of a `Ref.View`.
* @param ref the `Ref.View<Integer>` to be incremented
* @param delta the amount to increment
*/
def increment(ref: Ref.View[java.lang.Integer], delta: Int): Unit = ref.transform { v v.intValue + delta }
/**
* Increment the `java.lang.Long` value of a `Ref.View`.
* @param ref the `Ref.View<Long>` to be incremented
* @param delta the amount to increment
*/
def increment(ref: Ref.View[java.lang.Long], delta: Long): Unit = ref.transform { v v.longValue + delta }
/**
* Add a task to run after the current transaction has committed.
* @param task the `Runnable` task to run after transaction commit
*/
def afterCommit(task: Runnable): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterCommit(status task.run)(txn.get)
}
/**
* Add a task to run after the current transaction has rolled back.
* @param task the `Runnable` task to run after transaction rollback
*/
def afterRollback(task: Runnable): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterRollback(status task.run)(txn.get)
}
/**
* Add a task to run after the current transaction has either rolled back
* or committed.
* @param task the `Runnable` task to run after transaction completion
*/
def afterCompletion(task: Runnable): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterCompletion(status task.run)(txn.get)
}
}

View file

@ -7,24 +7,20 @@ package akka.transactor;
import akka.actor.ActorRef;
import akka.actor.Actors;
import akka.actor.UntypedActor;
import scala.concurrent.stm.*;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.Stm;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class UntypedCoordinatedCounter extends UntypedActor {
private String name;
private Ref<Integer> count = Stm.ref(0);
private Ref.View<Integer> count = Stm.newRef(0);
public UntypedCoordinatedCounter(String name) {
this.name = name;
}
private void increment(InTxn txn) {
Integer newValue = count.get(txn) + 1;
count.set(newValue, txn);
}
public void onReceive(Object incoming) throws Exception {
if (incoming instanceof Coordinated) {
Coordinated coordinated = (Coordinated) incoming;
@ -33,8 +29,8 @@ public class UntypedCoordinatedCounter extends UntypedActor {
Increment increment = (Increment) message;
List<ActorRef> friends = increment.getFriends();
final CountDownLatch latch = increment.getLatch();
final CompletionHandler countDown = new CompletionHandler() {
public void handle(Txn.Status status) {
final Runnable countDown = new Runnable() {
public void run() {
latch.countDown();
}
};
@ -42,15 +38,15 @@ public class UntypedCoordinatedCounter extends UntypedActor {
Increment coordMessage = new Increment(friends.subList(1, friends.size()), latch);
friends.get(0).tell(coordinated.coordinate(coordMessage));
}
coordinated.atomic(new Atomically() {
public void atomically(InTxn txn) {
increment(txn);
coordinated.atomic(new Runnable() {
public void run() {
Stm.increment(count, 1);
Stm.afterCompletion(countDown);
}
});
}
} else if ("GetCount".equals(incoming)) {
getSender().tell(count.single().get());
getSender().tell(count.get());
}
}
}

View file

@ -7,7 +7,8 @@ package akka.transactor;
import akka.actor.ActorRef;
import akka.transactor.UntypedTransactor;
import akka.transactor.SendTo;
import scala.concurrent.stm.*;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.Stm;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -15,17 +16,12 @@ import java.util.concurrent.TimeUnit;
public class UntypedCounter extends UntypedTransactor {
private String name;
private Ref<Integer> count = Stm.ref(0);
private Ref.View<Integer> count = Stm.newRef(0);
public UntypedCounter(String name) {
this.name = name;
}
private void increment(InTxn txn) {
Integer newValue = count.get(txn) + 1;
count.set(newValue, txn);
}
@Override public Set<SendTo> coordinate(Object message) {
if (message instanceof Increment) {
Increment increment = (Increment) message;
@ -41,12 +37,12 @@ public class UntypedCounter extends UntypedTransactor {
}
}
public void atomically(InTxn txn, Object message) {
public void atomically(Object message) {
if (message instanceof Increment) {
increment(txn);
Stm.increment(count, 1);
final Increment increment = (Increment) message;
CompletionHandler countDown = new CompletionHandler() {
public void handle(Txn.Status status) {
Runnable countDown = new Runnable() {
public void run() {
increment.getLatch().countDown();
}
};
@ -56,7 +52,7 @@ public class UntypedCounter extends UntypedTransactor {
@Override public boolean normally(Object message) {
if ("GetCount".equals(message)) {
getSender().tell(count.single().get());
getSender().tell(count.get());
return true;
} else return false;
}

View file

@ -7,7 +7,7 @@ package akka.transactor;
import scala.concurrent.stm.InTxn;
public class UntypedFailer extends UntypedTransactor {
public void atomically(InTxn txn, Object message) throws Exception {
public void atomically(Object message) throws Exception {
throw new ExpectedFailureException();
}
}

View file

@ -0,0 +1,156 @@
/* scala-stm - (c) 2009-2011, Stanford University, PPL */
package scala.concurrent.stm;
import static org.junit.Assert.*;
import org.junit.Test;
import scala.concurrent.stm.japi.Stm;
import static scala.concurrent.stm.japi.Stm.*;
import scala.runtime.AbstractFunction1;
import java.util.concurrent.Callable;
import java.util.Map;
import java.util.Set;
import java.util.List;
public class JavaAPITests {
@Test
public void createIntegerRef() {
Ref.View<Integer> ref = newRef(0);
int unboxed = ref.get();
assertEquals(0, unboxed);
}
@Test
public void atomicWithRunnable() {
final Ref.View<Integer> ref = newRef(0);
atomic(new Runnable() {
public void run() {
ref.set(10);
}
});
int value = ref.get();
assertEquals(10, value);
}
@Test
public void atomicWithCallable() {
final Ref.View<Integer> ref = newRef(0);
int oldValue = atomic(new Callable<Integer>() {
public Integer call() {
return ref.swap(10);
}
});
assertEquals(0, oldValue);
int newValue = ref.get();
assertEquals(10, newValue);
}
@Test(expected = TestException.class)
public void failingTransaction() {
final Ref.View<Integer> ref = newRef(0);
try {
atomic(new Runnable() {
public void run() {
ref.set(10);
throw new TestException();
}
});
} catch (TestException e) {
int value = ref.get();
assertEquals(0, value);
throw e;
}
}
@Test
public void transformInteger() {
Ref.View<Integer> ref = newRef(0);
transform(ref, new AbstractFunction1<Integer, Integer>() {
public Integer apply(Integer i) {
return i + 10;
}
});
int value = ref.get();
assertEquals(10, value);
}
@Test
public void incrementInteger() {
Ref.View<Integer> ref = newRef(0);
increment(ref, 10);
int value = ref.get();
assertEquals(10, value);
}
@Test
public void incrementLong() {
Ref.View<Long> ref = newRef(0L);
increment(ref, 10L);
long value = ref.get();
assertEquals(10L, value);
}
@Test
public void createAndUseTMap() {
Map<Integer, String> map = newMap();
map.put(1, "one");
map.put(2, "two");
assertEquals("one", map.get(1));
assertEquals("two", map.get(2));
assertTrue(map.containsKey(2));
map.remove(2);
assertFalse(map.containsKey(2));
}
@Test(expected = TestException.class)
public void failingTMapTransaction() {
final Map<Integer, String> map = newMap();
try {
atomic(new Runnable() {
public void run() {
map.put(1, "one");
map.put(2, "two");
assertTrue(map.containsKey(1));
assertTrue(map.containsKey(2));
throw new TestException();
}
});
} catch (TestException e) {
assertFalse(map.containsKey(1));
assertFalse(map.containsKey(2));
throw e;
}
}
@Test
public void createAndUseTSet() {
Set<String> set = newSet();
set.add("one");
set.add("two");
assertTrue(set.contains("one"));
assertTrue(set.contains("two"));
assertEquals(2, set.size());
set.add("one");
assertEquals(2, set.size());
set.remove("two");
assertFalse(set.contains("two"));
assertEquals(1, set.size());
}
@Test
public void createAndUseTArray() {
List<String> list = newList(3);
assertEquals(null, list.get(0));
assertEquals(null, list.get(1));
assertEquals(null, list.get(2));
list.set(0, "zero");
list.set(1, "one");
list.set(2, "two");
assertEquals("zero", list.get(0));
assertEquals("one", list.get(1));
assertEquals("two", list.get(2));
}
}

View file

@ -0,0 +1,9 @@
/* scala-stm - (c) 2009-2011, Stanford University, PPL */
package scala.concurrent.stm;
public class TestException extends RuntimeException {
public TestException() {
super("Expected failure");
}
}

View file

@ -0,0 +1,7 @@
/* scala-stm - (c) 2009-2011, Stanford University, PPL */
package scala.concurrent.stm
import org.scalatest.junit.JUnitWrapperSuite
class JavaAPISuite extends JUnitWrapperSuite("scala.concurrent.stm.JavaAPITests", Thread.currentThread.getContextClassLoader)