Merge commit 'remotes/origin/master' into 320-krasserm, resolve conflicts and compile errors.

This commit is contained in:
Martin Krasser 2010-07-17 07:04:10 +02:00
commit 3056c8b7e1
49 changed files with 1806 additions and 555 deletions

1
.gitignore vendored
View file

@ -32,6 +32,7 @@ tm.out
*.iws
*.ipr
*.iml
run-codefellow
.project
.settings
.classpath

View file

@ -10,8 +10,6 @@ import se.scalablesolutions.akka.config.OneForOneStrategy
import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory}
import java.lang.IllegalArgumentException
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.serialization.Serializer
/**
* AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors.
*
@ -20,7 +18,6 @@ import se.scalablesolutions.akka.serialization.Serializer
* @author Irmo Manie
*/
object AMQP {
case class ConnectionParameters(
host: String = ConnectionFactory.DEFAULT_HOST,
port: Int = ConnectionFactory.DEFAULT_AMQP_PORT,
@ -57,7 +54,6 @@ object AMQP {
queueExclusive: Boolean = false,
selfAcknowledging: Boolean = true,
channelParameters: Option[ChannelParameters] = None) {
if (queueDurable && queueName.isEmpty) {
throw new IllegalArgumentException("A queue name is required when requesting a durable queue.")
}
@ -84,27 +80,25 @@ object AMQP {
consumer
}
def newRpcClient(connection: ActorRef,
def newRpcClient[O,I](connection: ActorRef,
exchangeParameters: ExchangeParameters,
routingKey: String,
inSerializer: Serializer,
outSerializer: Serializer,
serializer: RpcClientSerializer[O,I],
channelParameters: Option[ChannelParameters] = None): ActorRef = {
val rpcActor: ActorRef = actorOf(new RpcClientActor(exchangeParameters, routingKey, inSerializer, outSerializer, channelParameters))
val rpcActor: ActorRef = actorOf(new RpcClientActor[O,I](exchangeParameters, routingKey, serializer, channelParameters))
connection.startLink(rpcActor)
rpcActor ! Start
rpcActor
}
def newRpcServer(connection: ActorRef,
exchangeParameters: ExchangeParameters,
routingKey: String,
inSerializer: Serializer,
outSerializer: Serializer,
requestHandler: PartialFunction[AnyRef, AnyRef],
channelParameters: Option[ChannelParameters] = None) = {
def newRpcServer[I,O](connection: ActorRef,
exchangeParameters: ExchangeParameters,
routingKey: String,
serializer: RpcServerSerializer[I,O],
requestHandler: PartialFunction[I, O],
channelParameters: Option[ChannelParameters] = None) = {
val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters))
val rpcServer = actorOf(new RpcServerActor(producer, inSerializer, outSerializer, requestHandler))
val rpcServer = actorOf(new RpcServerActor[I,O](producer, serializer, requestHandler))
val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer
, channelParameters = channelParameters
, selfAcknowledging = false))
@ -133,4 +127,17 @@ object AMQP {
connectionActor
}
}
trait FromBinary[T] {
def fromBinary(bytes: Array[Byte]): T
}
trait ToBinary[T] {
def toBinary(t: T): Array[Byte]
}
case class RpcClientSerializer[O,I](toBinary: ToBinary[O], fromBinary: FromBinary[I])
case class RpcServerSerializer[I,O](fromBinary: FromBinary[I], toBinary: ToBinary[O])
}

View file

@ -8,10 +8,10 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
import Actor._
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.amqp.AMQP._
import se.scalablesolutions.akka.serialization.Serializer
import java.lang.Class
import java.lang.String
object ExampleSession {
def main(args: Array[String]) = {
println("==== DIRECT ===")
direct
@ -97,6 +97,7 @@ object ExampleSession {
}
def callback = {
val channelCountdown = new CountDownLatch(2)
val connectionCallback = actor {
@ -129,21 +130,36 @@ object ExampleSession {
}
def rpc = {
val connection = AMQP.newConnection()
val exchangeParameters = ExchangeParameters("my_rpc_exchange", ExchangeType.Topic)
val stringSerializer = new Serializer {
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes)
def toBinary(obj: AnyRef) = obj.asInstanceOf[String].getBytes
/** Server */
val serverFromBinary = new FromBinary[String] {
def fromBinary(bytes: Array[Byte]) = new String(bytes)
}
val serverToBinary = new ToBinary[Int] {
def toBinary(t: Int) = Array(t.toByte)
}
val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary)
val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.in.key", stringSerializer, stringSerializer, {
case "rpc_request" => "rpc_response"
val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, {
case "rpc_request" => 3
case _ => error("unknown request")
})
val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.in.key", stringSerializer, stringSerializer)
/** Client */
val clientToBinary = new ToBinary[String] {
def toBinary(t: String) = t.getBytes
}
val clientFromBinary = new FromBinary[Int] {
def fromBinary(bytes: Array[Byte]) = bytes.head.toInt
}
val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary)
val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer)
val response = (rpcClient !! "rpc_request")
log.info("Response: " + response)

View file

@ -6,12 +6,13 @@ package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameters}
import com.rabbitmq.client.{Channel, RpcClient}
class RpcClientActor(exchangeParameters: ExchangeParameters,
import com.rabbitmq.client.{Channel, RpcClient}
import se.scalablesolutions.akka.amqp.AMQP.{RpcClientSerializer, ChannelParameters, ExchangeParameters}
class RpcClientActor[I,O](exchangeParameters: ExchangeParameters,
routingKey: String,
inSerializer: Serializer,
outSerializer: Serializer,
serializer: RpcClientSerializer[I,O],
channelParameters: Option[ChannelParameters] = None) extends FaultTolerantChannelActor(exchangeParameters, channelParameters) {
import exchangeParameters._
@ -21,29 +22,22 @@ class RpcClientActor(exchangeParameters: ExchangeParameters,
log.info("%s started", this)
def specificMessageHandler = {
case payload: AnyRef => {
case payload: I => {
rpcClient match {
case Some(client) =>
val response: Array[Byte] = client.primitiveCall(inSerializer.toBinary(payload))
self.reply(outSerializer.fromBinary(response, None))
val response: Array[Byte] = client.primitiveCall(serializer.toBinary.toBinary(payload))
self.reply(serializer.fromBinary.fromBinary(response))
case None => error("%s has no client to send messages with".format(this))
}
}
}
protected def setupChannel(ch: Channel) = {
rpcClient = Some(new RpcClient(ch, exchangeName, routingKey))
}
protected def setupChannel(ch: Channel) = rpcClient = Some(new RpcClient(ch, exchangeName, routingKey))
override def preRestart(reason: Throwable) = {
rpcClient = None
super.preRestart(reason)
}
override def toString(): String =
"AMQP.RpcClient[exchange=" +exchangeName +
", routingKey=" + routingKey+ "]"
override def toString = "AMQP.RpcClient[exchange=" +exchangeName + ", routingKey=" + routingKey+ "]"
}

View file

@ -6,9 +6,9 @@ package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.actor.{ActorRef, Actor}
import com.rabbitmq.client.AMQP.BasicProperties
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer
class RpcServerActor(producer: ActorRef, inSerializer: Serializer, outSerializer: Serializer, requestHandler: PartialFunction[AnyRef, AnyRef]) extends Actor {
class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: PartialFunction[I, O]) extends Actor {
log.info("%s started", this)
@ -16,8 +16,8 @@ class RpcServerActor(producer: ActorRef, inSerializer: Serializer, outSerializer
case Delivery(payload, _, tag, props, sender) => {
log.debug("%s handling delivery with tag %d", this, tag)
val request = inSerializer.fromBinary(payload, None)
val response: Array[Byte] = outSerializer.toBinary(requestHandler(request))
val request = serializer.fromBinary.fromBinary(payload)
val response: Array[Byte] = serializer.toBinary.toBinary(requestHandler(request))
log.debug("%s sending reply to %s", this, props.getReplyTo)
val replyProps = new BasicProperties

View file

@ -11,8 +11,8 @@ import se.scalablesolutions.akka.amqp._
import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters}
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.amqp.AMQP._
class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging {
@ -29,23 +29,34 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging
}
val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic)
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val stringSerializer = new Serializer {
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes)
def toBinary(obj: AnyRef) = obj.asInstanceOf[String].getBytes
}
val channelParameters = ChannelParameters(channelCallback
= Some(channelCallback))
val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.routing", stringSerializer, stringSerializer, {
case "some_payload" => "some_result"
case _ => error("Unhandled message")
val serverFromBinary = new FromBinary[String] {
def fromBinary(bytes: Array[Byte]) = new String(bytes)
}
val serverToBinary = new ToBinary[Int] {
def toBinary(t: Int) = Array(t.toByte)
}
val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary)
val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, {
case "some_payload" => 3
case _ => error("unknown request")
}, channelParameters = Some(channelParameters))
val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.routing", stringSerializer, stringSerializer
, channelParameters = Some(channelParameters))
val clientToBinary = new ToBinary[String] {
def toBinary(t: String) = t.getBytes
}
val clientFromBinary = new FromBinary[Int] {
def fromBinary(bytes: Array[Byte]) = bytes.head.toInt
}
val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary)
val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer,
channelParameters = Some(channelParameters))
countDown.await(2, TimeUnit.SECONDS) must be (true)
val response = rpcClient !! "some_payload"
response must be (Some("some_result"))
response must be (Some(3))
} finally {
connection.stop
}

View file

@ -22,6 +22,7 @@ import se.scalablesolutions.akka.stm.TransactionConfig
import scala.reflect.BeanProperty
import CamelMessageConversion.toExchangeAdapter
import java.lang.Throwable
/**
* Camel component for sending messages to and receiving replies from actors.
@ -250,8 +251,8 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
def supervisor: Option[ActorRef] = unsupported
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](message: Any, timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]) = unsupported
protected[akka] def mailbox: Deque[MessageInvocation] = unsupported
protected[akka] def restart(reason: Throwable): Unit = unsupported
protected[akka] def restartLinkedActors(reason: Throwable): Unit = unsupported
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
protected[akka] def linkedActors: JavaMap[String, ActorRef] = unsupported
protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported

View file

@ -667,29 +667,36 @@ public final class RemoteProtocol {
public boolean hasTimeout() { return hasTimeout; }
public long getTimeout() { return timeout_; }
// optional .LifeCycleProtocol lifeCycle = 9;
public static final int LIFECYCLE_FIELD_NUMBER = 9;
// optional uint64 receiveTimeout = 9;
public static final int RECEIVETIMEOUT_FIELD_NUMBER = 9;
private boolean hasReceiveTimeout;
private long receiveTimeout_ = 0L;
public boolean hasReceiveTimeout() { return hasReceiveTimeout; }
public long getReceiveTimeout() { return receiveTimeout_; }
// optional .LifeCycleProtocol lifeCycle = 10;
public static final int LIFECYCLE_FIELD_NUMBER = 10;
private boolean hasLifeCycle;
private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol lifeCycle_;
public boolean hasLifeCycle() { return hasLifeCycle; }
public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol getLifeCycle() { return lifeCycle_; }
// optional .RemoteActorRefProtocol supervisor = 10;
public static final int SUPERVISOR_FIELD_NUMBER = 10;
// optional .RemoteActorRefProtocol supervisor = 11;
public static final int SUPERVISOR_FIELD_NUMBER = 11;
private boolean hasSupervisor;
private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol supervisor_;
public boolean hasSupervisor() { return hasSupervisor; }
public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol getSupervisor() { return supervisor_; }
// optional bytes hotswapStack = 11;
public static final int HOTSWAPSTACK_FIELD_NUMBER = 11;
// optional bytes hotswapStack = 12;
public static final int HOTSWAPSTACK_FIELD_NUMBER = 12;
private boolean hasHotswapStack;
private com.google.protobuf.ByteString hotswapStack_ = com.google.protobuf.ByteString.EMPTY;
public boolean hasHotswapStack() { return hasHotswapStack; }
public com.google.protobuf.ByteString getHotswapStack() { return hotswapStack_; }
// repeated .RemoteRequestProtocol messages = 12;
public static final int MESSAGES_FIELD_NUMBER = 12;
// repeated .RemoteRequestProtocol messages = 13;
public static final int MESSAGES_FIELD_NUMBER = 13;
private java.util.List<se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol> messages_ =
java.util.Collections.emptyList();
public java.util.List<se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol> getMessagesList() {
@ -750,17 +757,20 @@ public final class RemoteProtocol {
if (hasTimeout()) {
output.writeUInt64(8, getTimeout());
}
if (hasReceiveTimeout()) {
output.writeUInt64(9, getReceiveTimeout());
}
if (hasLifeCycle()) {
output.writeMessage(9, getLifeCycle());
output.writeMessage(10, getLifeCycle());
}
if (hasSupervisor()) {
output.writeMessage(10, getSupervisor());
output.writeMessage(11, getSupervisor());
}
if (hasHotswapStack()) {
output.writeBytes(11, getHotswapStack());
output.writeBytes(12, getHotswapStack());
}
for (se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol element : getMessagesList()) {
output.writeMessage(12, element);
output.writeMessage(13, element);
}
getUnknownFields().writeTo(output);
}
@ -803,21 +813,25 @@ public final class RemoteProtocol {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(8, getTimeout());
}
if (hasReceiveTimeout()) {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(9, getReceiveTimeout());
}
if (hasLifeCycle()) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(9, getLifeCycle());
.computeMessageSize(10, getLifeCycle());
}
if (hasSupervisor()) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(10, getSupervisor());
.computeMessageSize(11, getSupervisor());
}
if (hasHotswapStack()) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(11, getHotswapStack());
.computeBytesSize(12, getHotswapStack());
}
for (se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol element : getMessagesList()) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(12, element);
.computeMessageSize(13, element);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
@ -1005,6 +1019,9 @@ public final class RemoteProtocol {
if (other.hasTimeout()) {
setTimeout(other.getTimeout());
}
if (other.hasReceiveTimeout()) {
setReceiveTimeout(other.getReceiveTimeout());
}
if (other.hasLifeCycle()) {
mergeLifeCycle(other.getLifeCycle());
}
@ -1082,7 +1099,11 @@ public final class RemoteProtocol {
setTimeout(input.readUInt64());
break;
}
case 74: {
case 72: {
setReceiveTimeout(input.readUInt64());
break;
}
case 82: {
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.newBuilder();
if (hasLifeCycle()) {
subBuilder.mergeFrom(getLifeCycle());
@ -1091,7 +1112,7 @@ public final class RemoteProtocol {
setLifeCycle(subBuilder.buildPartial());
break;
}
case 82: {
case 90: {
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.newBuilder();
if (hasSupervisor()) {
subBuilder.mergeFrom(getSupervisor());
@ -1100,11 +1121,11 @@ public final class RemoteProtocol {
setSupervisor(subBuilder.buildPartial());
break;
}
case 90: {
case 98: {
setHotswapStack(input.readBytes());
break;
}
case 98: {
case 106: {
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.newBuilder();
input.readMessage(subBuilder, extensionRegistry);
addMessages(subBuilder.buildPartial());
@ -1293,7 +1314,25 @@ public final class RemoteProtocol {
return this;
}
// optional .LifeCycleProtocol lifeCycle = 9;
// optional uint64 receiveTimeout = 9;
public boolean hasReceiveTimeout() {
return result.hasReceiveTimeout();
}
public long getReceiveTimeout() {
return result.getReceiveTimeout();
}
public Builder setReceiveTimeout(long value) {
result.hasReceiveTimeout = true;
result.receiveTimeout_ = value;
return this;
}
public Builder clearReceiveTimeout() {
result.hasReceiveTimeout = false;
result.receiveTimeout_ = 0L;
return this;
}
// optional .LifeCycleProtocol lifeCycle = 10;
public boolean hasLifeCycle() {
return result.hasLifeCycle();
}
@ -1330,7 +1369,7 @@ public final class RemoteProtocol {
return this;
}
// optional .RemoteActorRefProtocol supervisor = 10;
// optional .RemoteActorRefProtocol supervisor = 11;
public boolean hasSupervisor() {
return result.hasSupervisor();
}
@ -1367,7 +1406,7 @@ public final class RemoteProtocol {
return this;
}
// optional bytes hotswapStack = 11;
// optional bytes hotswapStack = 12;
public boolean hasHotswapStack() {
return result.hasHotswapStack();
}
@ -1388,7 +1427,7 @@ public final class RemoteProtocol {
return this;
}
// repeated .RemoteRequestProtocol messages = 12;
// repeated .RemoteRequestProtocol messages = 13;
public java.util.List<se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol> getMessagesList() {
return java.util.Collections.unmodifiableList(result.messages_);
}
@ -4210,40 +4249,40 @@ public final class RemoteProtocol {
"\n\024RemoteProtocol.proto\"v\n\026RemoteActorRef" +
"Protocol\022\014\n\004uuid\030\001 \002(\t\022\026\n\016actorClassname" +
"\030\002 \002(\t\022%\n\013homeAddress\030\003 \002(\0132\020.AddressPro" +
"tocol\022\017\n\007timeout\030\004 \001(\004\"\350\002\n\032SerializedAct" +
"tocol\022\017\n\007timeout\030\004 \001(\004\"\200\003\n\032SerializedAct" +
"orRefProtocol\022\014\n\004uuid\030\001 \002(\t\022\n\n\002id\030\002 \002(\t\022" +
"\026\n\016actorClassname\030\003 \002(\t\022)\n\017originalAddre" +
"ss\030\004 \002(\0132\020.AddressProtocol\022\025\n\ractorInsta" +
"nce\030\005 \001(\014\022\033\n\023serializerClassname\030\006 \001(\t\022\024" +
"\n\014isTransactor\030\007 \001(\010\022\017\n\007timeout\030\010 \001(\004\022%\n" +
"\tlifeCycle\030\t \001(\0132\022.LifeCycleProtocol\022+\n\n",
"supervisor\030\n \001(\0132\027.RemoteActorRefProtoco" +
"l\022\024\n\014hotswapStack\030\013 \001(\014\022(\n\010messages\030\014 \003(" +
"\0132\026.RemoteRequestProtocol\"r\n\017MessageProt" +
"ocol\0225\n\023serializationScheme\030\001 \002(\0162\030.Seri" +
"alizationSchemeType\022\017\n\007message\030\002 \002(\014\022\027\n\017" +
"messageManifest\030\003 \001(\014\"\374\001\n\025RemoteRequestP" +
"rotocol\022\n\n\002id\030\001 \002(\004\022!\n\007message\030\002 \002(\0132\020.M" +
"essageProtocol\022\016\n\006method\030\003 \001(\t\022\016\n\006target" +
"\030\004 \002(\t\022\014\n\004uuid\030\005 \002(\t\022\017\n\007timeout\030\006 \002(\004\022\026\n" +
"\016supervisorUuid\030\007 \001(\t\022\017\n\007isActor\030\010 \002(\010\022\020",
"\n\010isOneWay\030\t \002(\010\022\021\n\tisEscaped\030\n \002(\010\022\'\n\006s" +
"ender\030\013 \001(\0132\027.RemoteActorRefProtocol\"\252\001\n" +
"\023RemoteReplyProtocol\022\n\n\002id\030\001 \002(\004\022!\n\007mess" +
"age\030\002 \001(\0132\020.MessageProtocol\022%\n\texception" +
"\030\003 \001(\0132\022.ExceptionProtocol\022\026\n\016supervisor" +
"Uuid\030\004 \001(\t\022\017\n\007isActor\030\005 \002(\010\022\024\n\014isSuccess" +
"ful\030\006 \002(\010\"_\n\021LifeCycleProtocol\022!\n\tlifeCy" +
"cle\030\001 \002(\0162\016.LifeCycleType\022\022\n\npreRestart\030" +
"\002 \001(\t\022\023\n\013postRestart\030\003 \001(\t\"1\n\017AddressPro" +
"tocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n",
"\021ExceptionProtocol\022\021\n\tclassname\030\001 \002(\t\022\017\n" +
"\007message\030\002 \002(\t*]\n\027SerializationSchemeTyp" +
"e\022\010\n\004JAVA\020\001\022\013\n\007SBINARY\020\002\022\016\n\nSCALA_JSON\020\003" +
"\022\r\n\tJAVA_JSON\020\004\022\014\n\010PROTOBUF\020\005*-\n\rLifeCyc" +
"leType\022\r\n\tPERMANENT\020\001\022\r\n\tTEMPORARY\020\002B-\n)" +
"se.scalablesolutions.akka.remote.protoco" +
"lH\001"
"\n\014isTransactor\030\007 \001(\010\022\017\n\007timeout\030\010 \001(\004\022\026\n" +
"\016receiveTimeout\030\t \001(\004\022%\n\tlifeCycle\030\n \001(\013",
"2\022.LifeCycleProtocol\022+\n\nsupervisor\030\013 \001(\013" +
"2\027.RemoteActorRefProtocol\022\024\n\014hotswapStac" +
"k\030\014 \001(\014\022(\n\010messages\030\r \003(\0132\026.RemoteReques" +
"tProtocol\"r\n\017MessageProtocol\0225\n\023serializ" +
"ationScheme\030\001 \002(\0162\030.SerializationSchemeT" +
"ype\022\017\n\007message\030\002 \002(\014\022\027\n\017messageManifest\030" +
"\003 \001(\014\"\374\001\n\025RemoteRequestProtocol\022\n\n\002id\030\001 " +
"\002(\004\022!\n\007message\030\002 \002(\0132\020.MessageProtocol\022\016" +
"\n\006method\030\003 \001(\t\022\016\n\006target\030\004 \002(\t\022\014\n\004uuid\030\005" +
" \002(\t\022\017\n\007timeout\030\006 \002(\004\022\026\n\016supervisorUuid\030",
"\007 \001(\t\022\017\n\007isActor\030\010 \002(\010\022\020\n\010isOneWay\030\t \002(\010" +
"\022\021\n\tisEscaped\030\n \002(\010\022\'\n\006sender\030\013 \001(\0132\027.Re" +
"moteActorRefProtocol\"\252\001\n\023RemoteReplyProt" +
"ocol\022\n\n\002id\030\001 \002(\004\022!\n\007message\030\002 \001(\0132\020.Mess" +
"ageProtocol\022%\n\texception\030\003 \001(\0132\022.Excepti" +
"onProtocol\022\026\n\016supervisorUuid\030\004 \001(\t\022\017\n\007is" +
"Actor\030\005 \002(\010\022\024\n\014isSuccessful\030\006 \002(\010\"_\n\021Lif" +
"eCycleProtocol\022!\n\tlifeCycle\030\001 \002(\0162\016.Life" +
"CycleType\022\022\n\npreRestart\030\002 \001(\t\022\023\n\013postRes" +
"tart\030\003 \001(\t\"1\n\017AddressProtocol\022\020\n\010hostnam",
"e\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021ExceptionProtoc" +
"ol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007message\030\002 \002(\t*]" +
"\n\027SerializationSchemeType\022\010\n\004JAVA\020\001\022\013\n\007S" +
"BINARY\020\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022" +
"\014\n\010PROTOBUF\020\005*-\n\rLifeCycleType\022\r\n\tPERMAN" +
"ENT\020\001\022\r\n\tTEMPORARY\020\002B-\n)se.scalablesolut" +
"ions.akka.remote.protocolH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -4263,7 +4302,7 @@ public final class RemoteProtocol {
internal_static_SerializedActorRefProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_SerializedActorRefProtocol_descriptor,
new java.lang.String[] { "Uuid", "Id", "ActorClassname", "OriginalAddress", "ActorInstance", "SerializerClassname", "IsTransactor", "Timeout", "LifeCycle", "Supervisor", "HotswapStack", "Messages", },
new java.lang.String[] { "Uuid", "Id", "ActorClassname", "OriginalAddress", "ActorInstance", "SerializerClassname", "IsTransactor", "Timeout", "ReceiveTimeout", "LifeCycle", "Supervisor", "HotswapStack", "Messages", },
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.class,
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.Builder.class);
internal_static_MessageProtocol_descriptor =

View file

@ -36,10 +36,11 @@ message SerializedActorRefProtocol {
optional string serializerClassname = 6;
optional bool isTransactor = 7;
optional uint64 timeout = 8;
optional LifeCycleProtocol lifeCycle = 9;
optional RemoteActorRefProtocol supervisor = 10;
optional bytes hotswapStack = 11;
repeated RemoteRequestProtocol messages = 12;
optional uint64 receiveTimeout = 9;
optional LifeCycleProtocol lifeCycle = 10;
optional RemoteActorRefProtocol supervisor = 11;
optional bytes hotswapStack = 12;
repeated RemoteRequestProtocol messages = 13;
}
/**

View file

@ -474,9 +474,9 @@ object ActiveObject extends Logging {
val parent = clazz.getSuperclass
if (parent != null) injectActiveObjectContext0(activeObject, parent)
else {
log.trace(
"Can't set 'ActiveObjectContext' for ActiveObject [%s] since no field of this type could be found.",
activeObject.getClass.getName)
log.ifTrace("Can't set 'ActiveObjectContext' for ActiveObject [" +
activeObject.getClass.getName +
"] since no field of this type could be found.")
None
}
}
@ -486,7 +486,6 @@ object ActiveObject extends Logging {
private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor =
Supervisor(SupervisorConfig(restartStrategy, components))
}
private[akka] object AspectInitRegistry extends ListenerManagement {
@ -634,11 +633,12 @@ private[akka] sealed class ActiveObjectAspect {
joinPoint: JoinPoint, isOneWay: Boolean, isVoid: Boolean, sender: AnyRef, senderFuture: CompletableFuture[Any]) {
override def toString: String = synchronized {
"Invocation [joinPoint: " + joinPoint.toString +
", isOneWay: " + isOneWay +
", isVoid: " + isVoid +
", sender: " + sender +
", senderFuture: " + senderFuture +
"Invocation [" +
"\n\t\tmethod = " + joinPoint.getRtti.asInstanceOf[MethodRtti].getMethod.getName + " @ " + joinPoint.getTarget.getClass.getName +
"\n\t\tisOneWay = " + isOneWay +
"\n\t\tisVoid = " + isVoid +
"\n\t\tsender = " + sender +
"\n\t\tsenderFuture = " + senderFuture +
"]"
}
@ -687,8 +687,6 @@ private[akka] class Dispatcher(transactionalRequired: Boolean,
private var context: Option[ActiveObjectContext] = None
private var targetClass:Class[_] = _
def this(transactionalRequired: Boolean) = this(transactionalRequired,None)
private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef, ctx: Option[ActiveObjectContext]) = {
@ -701,6 +699,8 @@ private[akka] class Dispatcher(transactionalRequired: Boolean,
context = ctx
val methods = targetInstance.getClass.getDeclaredMethods.toList
if (self.lifeCycle.isEmpty) self.lifeCycle = Some(LifeCycle(Permanent))
// See if we have any config define restart callbacks
restartCallbacks match {
case None => {}
@ -758,14 +758,14 @@ private[akka] class Dispatcher(transactionalRequired: Boolean,
}
def receive = {
case Invocation(joinPoint, isOneWay, _, sender, senderFuture) =>
case invocation @ Invocation(joinPoint, isOneWay, _, sender, senderFuture) =>
ActiveObject.log.ifTrace("Invoking active object with message:\n" + invocation)
context.foreach { ctx =>
if (sender ne null) ctx._sender = sender
if (senderFuture ne null) ctx._senderFuture = senderFuture
}
ActiveObjectContext.sender.value = joinPoint.getThis // set next sender
self.senderFuture.foreach(ActiveObjectContext.senderFuture.value = _)
if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint)
if (isOneWay) joinPoint.proceed
else self.reply(joinPoint.proceed)
@ -773,61 +773,53 @@ private[akka] class Dispatcher(transactionalRequired: Boolean,
// Jan Kronquist: started work on issue 121
case Link(target) => self.link(target)
case Unlink(target) => self.unlink(target)
case unexpected =>
throw new IllegalActorStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]")
case unexpected => throw new IllegalActorStateException(
"Unexpected message [" + unexpected + "] sent to [" + this + "]")
}
override def preRestart(reason: Throwable) {
try {
// Since preRestart is called we know that this dispatcher
// is about to be restarted. Put the instance in a thread
// local so the new dispatcher can be initialized with the contents of the
// old.
//FIXME - This should be considered as a workaround.
crashedActorTl.set(this)
if (preRestart.isDefined) preRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
// Since preRestart is called we know that this dispatcher
// is about to be restarted. Put the instance in a thread
// local so the new dispatcher can be initialized with the
// contents of the old.
//FIXME - This should be considered as a workaround.
crashedActorTl.set(this)
preRestart.foreach(_.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*))
} catch { case e: InvocationTargetException => throw e.getCause }
}
override def postRestart(reason: Throwable) {
try {
if (postRestart.isDefined) {
postRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
}
postRestart.foreach(_.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*))
} catch { case e: InvocationTargetException => throw e.getCause }
}
override def init = {
// Get the crashed dispatcher from thread local and intitialize this actor with the
// contents of the old dispatcher
val oldActor = crashedActorTl.get();
if(oldActor != null) {
initialize(oldActor.targetClass,oldActor.target.get,oldActor.context)
crashedActorTl.set(null)
}
// Get the crashed dispatcher from thread local and intitialize this actor with the
// contents of the old dispatcher
val oldActor = crashedActorTl.get();
if (oldActor != null) {
initialize(oldActor.targetClass, oldActor.target.get, oldActor.context)
crashedActorTl.set(null)
}
}
override def shutdown = {
try {
if (zhutdown.isDefined) {
zhutdown.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
}
} catch {
case e: InvocationTargetException => throw e.getCause
} finally {
zhutdown.foreach(_.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*))
} catch { case e: InvocationTargetException => throw e.getCause
} finally {
AspectInitRegistry.unregister(target.get);
}
}
override def initTransactionalState = {
try {
try {
if (initTxState.isDefined && target.isDefined) initTxState.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
} catch { case e: InvocationTargetException => throw e.getCause }
}
private def serializeArguments(joinPoint: JoinPoint) = {
val args = joinPoint.getRtti.asInstanceOf[MethodRtti].getParameterValues
var unserializable = false

View file

@ -46,8 +46,9 @@ case class Exit(dead: ActorRef, killer: Throwable) extends LifeCycleMessage
case class Link(child: ActorRef) extends LifeCycleMessage
case class Unlink(child: ActorRef) extends LifeCycleMessage
case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage
case object Kill extends LifeCycleMessage
case object ReceiveTimeout extends LifeCycleMessage
case class MaximumNumberOfRestartsWithinTimeRangeReached(
victim: ActorRef, maxNrOfRetries: Int, withinTimeRange: Int, lastExceptionCausingRestart: Throwable) extends LifeCycleMessage
// Exceptions for Actors
class ActorStartException private[akka](message: String) extends RuntimeException(message)
@ -284,6 +285,7 @@ object Actor extends Logging {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Actor extends Logging {
/**
* Type alias because traits cannot have companion objects.
*/
@ -300,12 +302,12 @@ trait Actor extends Logging {
Actor.actorRefInCreation.value = None
if (ref.isEmpty) throw new ActorInitializationException(
"ActorRef for instance of actor [" + getClass.getName + "] is not in scope." +
"\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." +
"\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." +
"\n\tEither use:" +
"\n\t\t'val actor = Actor.actorOf[MyActor]', or" +
"\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" +
"\n\t\t'val actor = Actor.actor { case msg => .. } }'")
"\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." +
"\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." +
"\n\tEither use:" +
"\n\t\t'val actor = Actor.actorOf[MyActor]', or" +
"\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" +
"\n\t\t'val actor = Actor.actor { case msg => .. } }'")
else ref
}
@ -426,12 +428,11 @@ trait Actor extends Logging {
private val lifeCycles: Receive = {
case HotSwap(code) => self.hotswap = code; self.checkReceiveTimeout // FIXME : how to reschedule receivetimeout on hotswap?
case Restart(reason) => self.restart(reason)
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Link(child) => self.link(child)
case Unlink(child) => self.unlink(child)
case UnlinkAndStop(child) => self.unlink(child); child.stop
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
case Restart(reason) => throw reason
}
}

View file

@ -10,26 +10,27 @@ import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, F
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.TransactionManagement
import se.scalablesolutions.akka.stm.{TransactionManagement, TransactionSetAbortedException}
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, MessageSerializer, RemoteRequestProtocolIdFactory}
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReentrantGuard}
import RemoteActorSerialization._
import org.multiverse.api.ThreadLocalTransaction._
import org.multiverse.commitbarriers.CountDownCommitBarrier
import jsr166x.{Deque, ConcurrentLinkedDeque}
import org.multiverse.api.exceptions.DeadTransactionException
import java.net.InetSocketAddress
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.{Map => JMap}
import java.lang.reflect.Field
import RemoteActorSerialization._
import jsr166x.{Deque, ConcurrentLinkedDeque}
import com.google.protobuf.ByteString
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
/**
* ActorRef is an immutable and serializable handle to an Actor.
@ -71,9 +72,7 @@ trait ActorRef extends TransactionManagement {
@volatile protected[this] var _isShutDown = false
@volatile protected[akka] var _isBeingRestarted = false
@volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServer.HOSTNAME, RemoteServer.PORT)
@volatile protected[akka] var _timeoutActor: Option[ActorRef] = None
@volatile protected[akka] var startOnCreation = false
@volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
protected[this] val guard = new ReentrantGuard
@ -99,12 +98,12 @@ trait ActorRef extends TransactionManagement {
@volatile var timeout: Long = Actor.TIMEOUT
/**
* User overridable callback/setting.
* <p/>
* Defines the default timeout for an initial receive invocation.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
*/
@volatile var receiveTimeout: Option[Long] = None
* User overridable callback/setting.
* <p/>
* Defines the default timeout for an initial receive invocation.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
*/
@volatile var receiveTimeout: Option[Long] = None
/**
* User overridable callback/setting.
@ -166,12 +165,12 @@ trait ActorRef extends TransactionManagement {
* The default is also that all actors that are created and spawned from within this actor
* is sharing the same dispatcher as its creator.
*/
private[akka] var _dispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
@volatile private[akka] var _dispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
/**
* Holds the hot swapped partial function.
*/
protected[akka] var hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack
@volatile protected[akka] var hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack
/**
* User overridable callback/setting.
@ -184,12 +183,12 @@ trait ActorRef extends TransactionManagement {
/**
* Configuration for TransactionFactory. User overridable.
*/
protected[akka] var _transactionConfig: TransactionConfig = DefaultGlobalTransactionConfig
@volatile protected[akka] var _transactionConfig: TransactionConfig = DefaultGlobalTransactionConfig
/**
* TransactionFactory to be used for atomic when isTransactor. Configuration is overridable.
*/
private[akka] var _transactionFactory: Option[TransactionFactory] = None
@volatile private[akka] var _transactionFactory: Option[TransactionFactory] = None
/**
* This lock ensures thread safety in the dispatching: only one message can
@ -215,12 +214,10 @@ trait ActorRef extends TransactionManagement {
* Is defined if the message was sent from another Actor, else None.
*/
def sender: Option[ActorRef] = {
//Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender }
// Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender }
val msg = currentMessage
if(msg.isEmpty)
None
else
msg.get.sender
if(msg.isEmpty) None
else msg.get.sender
}
/**
@ -228,12 +225,10 @@ trait ActorRef extends TransactionManagement {
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
*/
def senderFuture: Option[CompletableFuture[Any]] = {
//Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture }
// Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture }
val msg = currentMessage
if(msg.isEmpty)
None
else
msg.get.senderFuture
if(msg.isEmpty) None
else msg.get.senderFuture
}
/**
@ -442,7 +437,7 @@ trait ActorRef extends TransactionManagement {
/**
* Starts up the actor and its message queue.
*/
def start: ActorRef
def start(): ActorRef
/**
* Shuts down the actor its dispatcher and message queue.
@ -549,11 +544,11 @@ trait ActorRef extends TransactionManagement {
protected[akka] def mailbox: Deque[MessageInvocation]
protected[akka] def restart(reason: Throwable): Unit
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit
protected[akka] def restartLinkedActors(reason: Throwable): Unit
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit
protected[akka] def registerSupervisorAsRemoteActor: Option[String]
@ -571,23 +566,19 @@ trait ActorRef extends TransactionManagement {
override def toString = "Actor[" + id + ":" + uuid + "]"
protected[akka] def cancelReceiveTimeout = {
_timeoutActor.foreach {
x =>
if (x.isRunning) Scheduler.unschedule(x)
_timeoutActor = None
log.debug("Timeout canceled for %s", this)
}
}
protected [akka] def checkReceiveTimeout = {
protected[akka] def checkReceiveTimeout = {
cancelReceiveTimeout
receiveTimeout.foreach { timeout =>
receiveTimeout.foreach { time =>
log.debug("Scheduling timeout for %s", this)
_timeoutActor = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, timeout, TimeUnit.MILLISECONDS))
_timeoutActor = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, time, TimeUnit.MILLISECONDS))
}
}
protected[akka] def cancelReceiveTimeout = _timeoutActor.foreach { timeoutActor =>
if (timeoutActor.isRunning) Scheduler.unschedule(timeoutActor)
_timeoutActor = None
log.debug("Timeout canceled for %s", this)
}
}
/**
@ -599,8 +590,24 @@ sealed class LocalActorRef private[akka](
private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None))
extends ActorRef {
private var isDeserialized = false
private var loader: Option[ClassLoader] = None
@volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None // only mutable to maintain identity across nodes
@volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[String, ActorRef]] = None
@volatile private[akka] var _supervisor: Option[ActorRef] = None
@volatile private var isInInitialization = false
@volatile private var runActorInitialization = false
@volatile private var isDeserialized = false
@volatile private var loader: Option[ClassLoader] = None
@volatile private var maxNrOfRetriesCount: Int = 0
@volatile private var restartsWithinTimeRangeTimestamp: Long = 0L
protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation]
protected[this] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) }
// Needed to be able to null out the 'val self: ActorRef' member variables to make the Actor
// instance elegible for garbage collection
private val actorSelfFields = findActorSelfField(actor.getClass)
if (runActorInitialization && !isDeserialized) initializeActorInstance
private[akka] def this(clazz: Class[_ <: Actor]) = this(Left(Some(clazz)))
private[akka] def this(factory: () => Actor) = this(Right(Some(factory)))
@ -614,6 +621,7 @@ sealed class LocalActorRef private[akka](
__port: Int,
__isTransactor: Boolean,
__timeout: Long,
__receiveTimeout: Option[Long],
__lifeCycle: Option[LifeCycle],
__supervisor: Option[ActorRef],
__hotswap: Option[PartialFunction[Any, Unit]],
@ -635,6 +643,7 @@ sealed class LocalActorRef private[akka](
homeAddress = (__hostname, __port)
isTransactor = __isTransactor
timeout = __timeout
receiveTimeout = __receiveTimeout
lifeCycle = __lifeCycle
_supervisor = __supervisor
hotswap = __hotswap
@ -643,30 +652,11 @@ sealed class LocalActorRef private[akka](
actorSelfFields._3.set(actor, Some(this))
start
__messages.foreach(message => this ! MessageSerializer.deserialize(message.getMessage))
checkReceiveTimeout
ActorRegistry.register(this)
}
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None
@volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[String, ActorRef]] = None
@volatile private[akka] var _supervisor: Option[ActorRef] = None
protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation]
protected[this] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) }
@volatile private var isInInitialization = false
@volatile private var runActorInitialization = false
// Needed to be able to null out the 'val self: ActorRef' member variables to make the Actor
// instance elegible for garbage collection
private val actorSelfFields = findActorSelfField(actor.getClass)
if (runActorInitialization && !isDeserialized) initializeActorInstance
/**
* Returns the mailbox.
*/
def mailbox: Deque[MessageInvocation] = _mailbox
// ========= PUBLIC FUNCTIONS =========
/**
* Returns the class for the Actor instance that is managed by the ActorRef.
@ -681,7 +671,7 @@ sealed class LocalActorRef private[akka](
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
def dispatcher_=(md: MessageDispatcher): Unit = guard.withGuard {
def dispatcher_=(md: MessageDispatcher): Unit = {
if (!isRunning || isBeingRestarted) _dispatcher = md
else throw new ActorInitializationException(
"Can not swap dispatcher for " + toString + " after it has been started")
@ -690,7 +680,7 @@ sealed class LocalActorRef private[akka](
/**
* Get the dispatcher for this actor.
*/
def dispatcher: MessageDispatcher = guard.withGuard { _dispatcher }
def dispatcher: MessageDispatcher = _dispatcher
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
@ -734,19 +724,19 @@ sealed class LocalActorRef private[akka](
/**
* Get the transaction configuration for this actor.
*/
def transactionConfig: TransactionConfig = guard.withGuard { _transactionConfig }
def transactionConfig: TransactionConfig = _transactionConfig
/**
* Set the contact address for this actor. This is used for replying to messages
* sent asynchronously when no reply channel exists.
*/
def homeAddress_=(address: InetSocketAddress): Unit = guard.withGuard { _homeAddress = address }
def homeAddress_=(address: InetSocketAddress): Unit = _homeAddress = address
/**
* Returns the remote address for the actor, if any, else None.
*/
def remoteAddress: Option[InetSocketAddress] = guard.withGuard { _remoteAddress }
protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = guard.withGuard { _remoteAddress = addr }
def remoteAddress: Option[InetSocketAddress] = _remoteAddress
protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = _remoteAddress = addr
/**
* Starts up the actor and its message queue.
@ -897,6 +887,11 @@ sealed class LocalActorRef private[akka](
}
}
/**
* Returns the mailbox.
*/
def mailbox: Deque[MessageInvocation] = _mailbox
/**
* Returns the mailbox size.
*/
@ -910,7 +905,7 @@ sealed class LocalActorRef private[akka](
/**
* Shuts down and removes all linked actors.
*/
def shutdownLinkedActors(): Unit = guard.withGuard {
def shutdownLinkedActors(): Unit = {
linkedActorsAsList.foreach(_.stop)
linkedActors.clear
}
@ -918,41 +913,11 @@ sealed class LocalActorRef private[akka](
/**
* Returns the supervisor, if there is one.
*/
def supervisor: Option[ActorRef] = guard.withGuard { _supervisor }
def supervisor: Option[ActorRef] = _supervisor
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = guard.withGuard { _supervisor = sup }
// ========= AKKA PROTECTED FUNCTIONS =========
private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = guard.withGuard {
val actorRef = Actor.actorOf(manifest[T].erasure.asInstanceOf[Class[T]].newInstance)
if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) actorRef.dispatcher = dispatcher
actorRef
}
private[this] def newActor: Actor = {
isInInitialization = true
Actor.actorRefInCreation.value = Some(this)
val actor = actorFactory match {
case Left(Some(clazz)) =>
try {
clazz.newInstance
} catch {
case e: InstantiationException => throw new ActorInitializationException(
"Could not instantiate Actor due to:\n" + e +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")
}
case Right(Some(factory)) =>
factory()
case _ =>
throw new ActorInitializationException(
"Can't create Actor, no Actor class or factory function in scope")
}
if (actor eq null) throw new ActorInitializationException(
"Actor instance passed to ActorRef can not be 'null'")
isInInitialization = false
actor
}
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
joinTransaction(message)
@ -992,84 +957,22 @@ sealed class LocalActorRef private[akka](
}
}
private def joinTransaction(message: Any) = if (isTransactionSetInScope) {
import org.multiverse.api.ThreadLocalTransaction
val txSet = getTransactionSetInScope
Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]", txSet, toString, message) // FIXME test to run bench without this trace call
val mtx = ThreadLocalTransaction.getThreadLocalTransaction
if ((mtx eq null) || mtx.getStatus.isDead) txSet.incParties
else txSet.incParties(mtx, 1)
}
/**
* Callback for the dispatcher. This is the ingle entry point to the user Actor implementation.
*/
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = actor.synchronized {
if (isShutdown) {
Actor.log.warning("Actor [%s] is shut down, ignoring message [%s]", toString, messageHandle)
return
}
currentMessage = Option(messageHandle)
try {
dispatch(messageHandle)
} catch {
case e =>
Actor.log.error(e, "Could not invoke actor [%s]", this)
throw e
} finally {
currentMessage = None //TODO: Don't reset this, we might want to resend the message
}
}
private def dispatch[T](messageHandle: MessageInvocation) = {
val message = messageHandle.message //serializeMessage(messageHandle.message)
var topLevelTransaction = false
val txSet: Option[CountDownCommitBarrier] =
if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
else {
topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx
if (isTransactor) {
Actor.log.trace(
"Creating a new transaction set (top-level transaction)\n\tfor actor %s\n\twith message %s",
toString, messageHandle)
Some(createNewTransactionSet)
} else None
}
setTransactionSet(txSet)
try {
cancelReceiveTimeout // FIXME: leave this here?
if (isTransactor) {
val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory)
atomic(txFactory) {
actor.base(message)
setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
}
} else {
actor.base(message)
setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
}
} catch {
case e =>
_isBeingRestarted = true
// abort transaction set
if (isTransactionSetInScope) {
val txSet = getTransactionSetInScope
Actor.log.debug("Aborting transaction set [%s]", txSet)
txSet.abort
}
Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
senderFuture.foreach(_.completeWithException(this, e))
clearTransaction
if (topLevelTransaction) clearTransactionSet
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
} finally {
clearTransaction
if (topLevelTransaction) clearTransactionSet
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard {
if (isShutdown) Actor.log.warning("Actor [%s] is shut down, ignoring message [%s]", toString, messageHandle)
else {
currentMessage = Option(messageHandle)
try {
dispatch(messageHandle)
} catch {
case e =>
Actor.log.error(e, "Could not invoke actor [%s]", this)
throw e
} finally {
currentMessage = None //TODO: Don't reset this, we might want to resend the message
}
}
}
@ -1078,10 +981,10 @@ sealed class LocalActorRef private[akka](
faultHandler match {
// FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy
case Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange)) =>
restartLinkedActors(reason)
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
case Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange)) =>
dead.restart(reason)
dead.restart(reason, maxNrOfRetries, withinTimeRange)
case None =>
throw new IllegalActorStateException(
@ -1094,42 +997,59 @@ sealed class LocalActorRef private[akka](
}
}
protected[akka] def restart(reason: Throwable): Unit = {
val failedActor = actorInstance.get
failedActor.synchronized {
lifeCycle.get match {
case LifeCycle(scope, _, _) => {
scope match {
case Permanent =>
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
restartLinkedActors(reason)
Actor.log.debug("Restarting linked actors for actor [%s].", id)
Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
failedActor.preRestart(reason)
nullOutActorRefReferencesFor(failedActor)
val freshActor = newActor
freshActor.synchronized {
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = {
if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis
maxNrOfRetriesCount += 1
if (maxNrOfRetriesCount > maxNrOfRetries || (System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange) {
val message = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
Actor.log.warning(
"Maximum number of restarts [%s] within time range [%s] reached." +
"\n\tWill *not* restart actor [%s] anymore." +
"\n\tLast exception causing restart was [%s].",
maxNrOfRetries, withinTimeRange, this, reason)
_supervisor.foreach { sup =>
if (sup.isDefinedAt(message)) sup ! message
else Actor.log.warning(
"No message handler defined for system message [MaximumNumberOfRestartsWithinTimeRangeReached]" +
"\n\tCan't send the message to the supervisor [%s].", sup)
}
} else {
_isBeingRestarted = true
val failedActor = actorInstance.get
val lock = guard.lock
guard.withGuard {
lifeCycle.get match {
case LifeCycle(scope, _, _) => {
scope match {
case Permanent =>
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
Actor.log.debug("Restarting linked actors for actor [%s].", id)
Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
failedActor.preRestart(reason)
nullOutActorRefReferencesFor(failedActor)
val freshActor = newActor
freshActor.init
freshActor.initTransactionalState
actorInstance.set(freshActor)
Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
freshActor.postRestart(reason)
}
_isBeingRestarted = false
case Temporary => shutDownTemporaryActor(this)
_isBeingRestarted = false
case Temporary => shutDownTemporaryActor(this)
}
}
}
}
}
}
protected[akka] def restartLinkedActors(reason: Throwable) = guard.withGuard {
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int) = {
linkedActorsAsList.foreach { actorRef =>
if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent))
actorRef.lifeCycle.get match {
case LifeCycle(scope, _, _) => {
scope match {
case Permanent => actorRef.restart(reason)
case Permanent => actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
case Temporary => shutDownTemporaryActor(actorRef)
}
}
@ -1137,20 +1057,6 @@ sealed class LocalActorRef private[akka](
}
}
private def shutDownTemporaryActor(temporaryActor: ActorRef) = {
Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", temporaryActor.id)
temporaryActor.stop
linkedActors.remove(temporaryActor.uuid) // remove the temporary actor
// if last temporary actor is gone, then unlink me from supervisor
if (linkedActors.isEmpty) {
Actor.log.info(
"All linked actors have died permanently (they were all configured as TEMPORARY)" +
"\n\tshutting down and unlinking supervisor actor as well [%s].",
temporaryActor.id)
_supervisor.foreach(_ ! UnlinkAndStop(this))
}
}
protected[akka] def registerSupervisorAsRemoteActor: Option[String] = guard.withGuard {
if (_supervisor.isDefined) {
RemoteClient.clientFor(remoteAddress.get).registerSupervisorForActor(this)
@ -1169,6 +1075,127 @@ sealed class LocalActorRef private[akka](
protected[akka] def linkedActorsAsList: List[ActorRef] =
linkedActors.values.toArray.toList.asInstanceOf[List[ActorRef]]
// ========= PRIVATE FUNCTIONS =========
private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = guard.withGuard {
val actorRef = Actor.actorOf(manifest[T].erasure.asInstanceOf[Class[T]].newInstance)
if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) actorRef.dispatcher = dispatcher
actorRef
}
private[this] def newActor: Actor = {
isInInitialization = true
Actor.actorRefInCreation.value = Some(this)
val actor = actorFactory match {
case Left(Some(clazz)) =>
try {
clazz.newInstance
} catch {
case e: InstantiationException => throw new ActorInitializationException(
"Could not instantiate Actor due to:\n" + e +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")
}
case Right(Some(factory)) =>
factory()
case _ =>
throw new ActorInitializationException(
"Can't create Actor, no Actor class or factory function in scope")
}
if (actor eq null) throw new ActorInitializationException(
"Actor instance passed to ActorRef can not be 'null'")
isInInitialization = false
actor
}
private def joinTransaction(message: Any) = if (isTransactionSetInScope) {
import org.multiverse.api.ThreadLocalTransaction
val oldTxSet = getTransactionSetInScope
val currentTxSet = if (oldTxSet.isAborted || oldTxSet.isCommitted) {
clearTransactionSet
createNewTransactionSet
} else oldTxSet
Actor.log.ifTrace("Joining transaction set [" + currentTxSet + "];\n\tactor " + toString + "\n\twith message [" + message + "]")
val mtx = ThreadLocalTransaction.getThreadLocalTransaction
if ((mtx eq null) || mtx.getStatus.isDead) currentTxSet.incParties
else currentTxSet.incParties(mtx, 1)
}
private def dispatch[T](messageHandle: MessageInvocation) = {
Actor.log.ifTrace("Invoking actor with message:\n" + messageHandle)
val message = messageHandle.message //serializeMessage(messageHandle.message)
var topLevelTransaction = false
val txSet: Option[CountDownCommitBarrier] =
if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
else {
topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx
if (isTransactor) {
Actor.log.ifTrace("Creating a new transaction set (top-level transaction)\n\tfor actor " + toString + "\n\twith message " + messageHandle)
Some(createNewTransactionSet)
} else None
}
setTransactionSet(txSet)
try {
cancelReceiveTimeout // FIXME: leave this here?
if (isTransactor) {
val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory)
atomic(txFactory) {
actor.base(message)
setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
}
} else {
actor.base(message)
setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
}
} catch {
case e: DeadTransactionException =>
handleExceptionInDispatch(
new TransactionSetAbortedException("Transaction set has been aborted by another participant"),
message, topLevelTransaction)
case e =>
handleExceptionInDispatch(e, message, topLevelTransaction)
} finally {
clearTransaction
if (topLevelTransaction) clearTransactionSet
}
}
private def shutDownTemporaryActor(temporaryActor: ActorRef) = {
Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", temporaryActor.id)
temporaryActor.stop
linkedActors.remove(temporaryActor.uuid) // remove the temporary actor
// if last temporary actor is gone, then unlink me from supervisor
if (linkedActors.isEmpty) {
Actor.log.info(
"All linked actors have died permanently (they were all configured as TEMPORARY)" +
"\n\tshutting down and unlinking supervisor actor as well [%s].",
temporaryActor.id)
_supervisor.foreach(_ ! UnlinkAndStop(this))
}
}
private def handleExceptionInDispatch(e: Throwable, message: Any, topLevelTransaction: Boolean) = {
Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
_isBeingRestarted = true
// abort transaction set
if (isTransactionSetInScope) {
val txSet = getTransactionSetInScope
Actor.log.debug("Aborting transaction set [%s]", txSet)
txSet.abort
}
senderFuture.foreach(_.completeWithException(this, e))
clearTransaction
if (topLevelTransaction) clearTransactionSet
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
}
private def nullOutActorRefReferencesFor(actor: Actor) = {
actorSelfFields._1.set(actor, null)
actorSelfFields._2.set(actor, null)
@ -1297,9 +1324,9 @@ private[akka] case class RemoteActorRef private[akka] (
def supervisor: Option[ActorRef] = unsupported
def shutdownLinkedActors: Unit = unsupported
protected[akka] def mailbox: Deque[MessageInvocation] = unsupported
protected[akka] def restart(reason: Throwable): Unit = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
protected[akka] def restartLinkedActors(reason: Throwable): Unit = unsupported
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported
protected[akka] def linkedActors: JMap[String, ActorRef] = unsupported
protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported

View file

@ -77,14 +77,14 @@ object ActorSerialization {
toSerializedActorRefProtocol(a, format).toByteArray
}
private def toSerializedActorRefProtocol[T <: Actor](a: ActorRef, format: Format[T]): SerializedActorRefProtocol = {
private def toSerializedActorRefProtocol[T <: Actor](actorRef: ActorRef, format: Format[T]): SerializedActorRefProtocol = {
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT)
case Temporary => builder.setLifeCycle(LifeCycleType.TEMPORARY)
}
val builder = LifeCycleProtocol.newBuilder
a.lifeCycle match {
actorRef.lifeCycle match {
case Some(LifeCycle(scope, None, _)) =>
setScope(builder, scope)
Some(builder.build)
@ -98,21 +98,22 @@ object ActorSerialization {
}
val originalAddress = AddressProtocol.newBuilder
.setHostname(a.homeAddress.getHostName)
.setPort(a.homeAddress.getPort)
.setHostname(actorRef.homeAddress.getHostName)
.setPort(actorRef.homeAddress.getPort)
.build
val builder = SerializedActorRefProtocol.newBuilder
.setUuid(a.uuid)
.setId(a.id)
.setActorClassname(a.actorClass.getName)
.setUuid(actorRef.uuid)
.setId(actorRef.id)
.setActorClassname(actorRef.actorClass.getName)
.setOriginalAddress(originalAddress)
.setIsTransactor(a.isTransactor)
.setTimeout(a.timeout)
.setIsTransactor(actorRef.isTransactor)
.setTimeout(actorRef.timeout)
builder.setActorInstance(ByteString.copyFrom(format.toBinary(a.actor.asInstanceOf[T])))
actorRef.receiveTimeout.foreach(builder.setReceiveTimeout(_))
builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T])))
lifeCycleProtocol.foreach(builder.setLifeCycle(_))
a.supervisor.foreach(s => builder.setSupervisor(RemoteActorSerialization.toRemoteActorRefProtocol(s)))
actorRef.supervisor.foreach(s => builder.setSupervisor(RemoteActorSerialization.toRemoteActorRefProtocol(s)))
// FIXME: how to serialize the hotswap PartialFunction ??
//hotswap.foreach(builder.setHotswapStack(_))
builder.build
@ -161,6 +162,7 @@ object ActorSerialization {
protocol.getOriginalAddress.getPort,
if (protocol.hasIsTransactor) protocol.getIsTransactor else false,
if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT,
if (protocol.hasReceiveTimeout) Some(protocol.getReceiveTimeout) else None,
lifeCycle,
supervisor,
hotswap,

View file

@ -53,7 +53,7 @@ final class MessageInvocation(val receiver: ActorRef,
"\n\tsender = " + sender +
"\n\tsenderFuture = " + senderFuture +
"\n\ttransactionSet = " + transactionSet +
"\n]"
"]"
}
}

View file

@ -151,11 +151,17 @@ abstract class BasicClusterActor extends ClusterActor with Logging {
case InitClusterActor(s) => {
serializer = s
boot
}
}
/**
* Implement this in a subclass to add node-to-node messaging
* Implement this in a subclass to boot up the cluster implementation
*/
protected def boot: Unit
/**
* Implement this in a subclass to add node-to-node messaging
*/
protected def toOneNode(dest: ADDR_T, msg: Array[Byte]): Unit

View file

@ -17,9 +17,8 @@ class JGroupsClusterActor extends BasicClusterActor {
@volatile private var isActive = false
@volatile private var channel: Option[JChannel] = None
override def init = {
super.init
log info "Initiating JGroups-based cluster actor"
protected def boot = {
log info "Booting JGroups-based cluster"
isActive = true
// Set up the JGroups local endpoint

View file

@ -215,11 +215,15 @@ class RemoteServer extends Logging {
def shutdown = synchronized {
if (_isRunning) {
RemoteServer.unregister(hostname, port)
openChannels.disconnect
openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources
Cluster.deregisterLocalNode(hostname, port)
try {
RemoteServer.unregister(hostname, port)
openChannels.disconnect
openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources
Cluster.deregisterLocalNode(hostname, port)
} catch {
case e: java.nio.channels.ClosedChannelException => log.warning("Could not close remote server channel in a graceful way")
}
}
}

View file

@ -4,7 +4,9 @@
package se.scalablesolutions.akka.stm
import javax.transaction.{TransactionManager, UserTransaction, Transaction => JtaTransaction, SystemException, Status, Synchronization, TransactionSynchronizationRegistry}
import javax.transaction.{TransactionManager, UserTransaction,
Transaction => JtaTransaction, SystemException,
Status, Synchronization, TransactionSynchronizationRegistry}
import javax.naming.{InitialContext, Context, NamingException}
import se.scalablesolutions.akka.config.Config._
@ -16,7 +18,7 @@ import se.scalablesolutions.akka.util.Logging
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object TransactionContainer extends Logging {
val AKKA_JTA_TRANSACTION_SERVICE_CLASS = "se.scalablesolutions.akka.jta.AtomikosTransactionService"
val AKKA_JTA_TRANSACTION_SERVICE_CLASS = "se.scalablesolutions.akka.jta.AtomikosTransactionService"
val DEFAULT_USER_TRANSACTION_NAME = "java:comp/UserTransaction"
val FALLBACK_TRANSACTION_MANAGER_NAMES = "java:comp/TransactionManager" ::
"java:appserver/TransactionManager" ::
@ -119,22 +121,31 @@ class TransactionContainer private (val tm: Either[Option[UserTransaction], Opti
}
}
def begin = tm match {
case Left(Some(userTx)) => userTx.begin
case Right(Some(txMan)) => txMan.begin
case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope")
def begin = {
TransactionContainer.log.ifTrace("Starting JTA transaction")
tm match {
case Left(Some(userTx)) => userTx.begin
case Right(Some(txMan)) => txMan.begin
case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope")
}
}
def commit = tm match {
case Left(Some(userTx)) => userTx.commit
case Right(Some(txMan)) => txMan.commit
case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope")
def commit = {
TransactionContainer.log.ifTrace("Committing JTA transaction")
tm match {
case Left(Some(userTx)) => userTx.commit
case Right(Some(txMan)) => txMan.commit
case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope")
}
}
def rollback = tm match {
case Left(Some(userTx)) => userTx.rollback
case Right(Some(txMan)) => txMan.rollback
case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope")
def rollback = {
TransactionContainer.log.ifTrace("Aborting JTA transaction")
tm match {
case Left(Some(userTx)) => userTx.rollback
case Right(Some(txMan)) => txMan.rollback
case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope")
}
}
def getStatus = tm match {

View file

@ -83,11 +83,12 @@ object Transaction {
if (JTA_AWARE) Some(TransactionContainer())
else None
log.trace("Creating %s", toString)
log.ifTrace("Creating transaction " + toString)
// --- public methods ---------
def begin = synchronized {
log.ifTrace("Starting transaction " + toString)
jta.foreach { txContainer =>
txContainer.begin
txContainer.registerSynchronization(new StmSynchronization(txContainer, this))
@ -95,14 +96,14 @@ object Transaction {
}
def commit = synchronized {
log.trace("Committing transaction %s", toString)
log.ifTrace("Committing transaction " + toString)
persistentStateMap.valuesIterator.foreach(_.commit)
status = TransactionStatus.Completed
jta.foreach(_.commit)
}
def abort = synchronized {
log.trace("Aborting transaction %s", toString)
log.ifTrace("Aborting transaction " + toString)
jta.foreach(_.rollback)
persistentStateMap.valuesIterator.foreach(_.abort)
persistentStateMap.clear

View file

@ -37,8 +37,8 @@ object TransactionConfig {
def traceLevel(level: String) = level.toLowerCase match {
case "coarse" | "course" => Transaction.TraceLevel.Coarse
case "fine" => Transaction.TraceLevel.Fine
case _ => Transaction.TraceLevel.None
case "fine" => Transaction.TraceLevel.Fine
case _ => Transaction.TraceLevel.None
}
/**
@ -126,7 +126,7 @@ object TransactionFactory {
traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL,
hooks: Boolean = TransactionConfig.HOOKS) = {
val config = new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew,
explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks)
explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks)
new TransactionFactory(config)
}
}

View file

@ -7,6 +7,7 @@ package se.scalablesolutions.akka.stm
import se.scalablesolutions.akka.util.Logging
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.TimeUnit
import org.multiverse.api.{StmUtils => MultiverseStmUtils}
import org.multiverse.api.ThreadLocalTransaction._
@ -14,16 +15,20 @@ import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.commitbarriers.CountDownCommitBarrier
import org.multiverse.templates.{TransactionalCallable, OrElseTemplate}
class StmException(msg: String) extends RuntimeException(msg)
class TransactionSetAbortedException(msg: String) extends RuntimeException(msg)
// TODO Should we remove TransactionAwareWrapperException? Not used anywhere yet.
class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
override def toString = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
}
/**
* Internal helper methods and properties for transaction management.
*/
object TransactionManagement extends TransactionManagement {
import se.scalablesolutions.akka.config.Config._
// move to stm.global.fair?
// FIXME move to stm.global.fair?
val FAIR_TRANSACTIONS = config.getBool("akka.stm.fair", true)
private[akka] val transactionSet = new ThreadLocal[Option[CountDownCommitBarrier]]() {
@ -47,6 +52,9 @@ object TransactionManagement extends TransactionManagement {
}
}
/**
* Internal helper methods for transaction management.
*/
trait TransactionManagement {
private[akka] def createNewTransactionSet: CountDownCommitBarrier = {
@ -111,7 +119,9 @@ class LocalStm extends TransactionManagement with Logging {
factory.boilerplate.execute(new TransactionalCallable[T]() {
def call(mtx: MultiverseTransaction): T = {
factory.addHooks
body
val result = body
log.ifTrace("Committing local transaction [" + mtx + "]")
result
}
})
}
@ -145,10 +155,9 @@ class GlobalStm extends TransactionManagement with Logging {
factory.addHooks
val result = body
val txSet = getTransactionSetInScope
log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet)
// FIXME ? txSet.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
try { txSet.joinCommit(mtx) } catch { case e: IllegalStateException => {} }
clearTransaction
log.ifTrace("Committing global transaction [" + mtx + "]\n\tand joining transaction set [" + txSet + "]")
// Need to catch IllegalStateException until we have fix in Multiverse, since it throws it by mistake
try { txSet.tryJoinCommit(mtx, TransactionConfig.TIMEOUT, TimeUnit.MILLISECONDS) } catch { case e: IllegalStateException => {} }
result
}
})
@ -156,6 +165,7 @@ class GlobalStm extends TransactionManagement with Logging {
}
trait StmUtil {
/**
* Schedule a deferred task on the thread local transaction (use within an atomic).
* This is executed when the transaction commits.
@ -178,6 +188,14 @@ trait StmUtil {
/**
* Use either-orElse to combine two blocking transactions.
* Usage:
* <pre>
* either {
* ...
* } orElse {
* ...
* }
* </pre>
*/
def either[T](firstBody: => T) = new {
def orElse(secondBody: => T) = new OrElseTemplate[T] {

View file

@ -10,7 +10,7 @@ import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock}
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ReentrantGuard {
private val lock = new ReentrantLock
val lock = new ReentrantLock
def withGuard[T](body: => T): T = {
lock.lock
@ -20,6 +20,15 @@ class ReentrantGuard {
lock.unlock
}
}
def tryWithGuard[T](body: => T): T = {
while(!lock.tryLock) { Thread.sleep(10) } // wait on the monitor to be unlocked
try {
body
} finally {
lock.unlock
}
}
}
/**

View file

@ -2,35 +2,47 @@ package se.scalablesolutions.akka.actor;
import se.scalablesolutions.akka.actor.annotation.prerestart;
import se.scalablesolutions.akka.actor.annotation.postrestart;
import se.scalablesolutions.akka.actor.ActiveObjectContext;
import se.scalablesolutions.akka.dispatch.CompletableFuture;
public class SimpleJavaPojo {
public boolean pre = false;
public boolean post = false;
private String name;
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
@prerestart
public void pre() {
System.out.println("** pre()");
pre = true;
}
@postrestart
public void post() {
System.out.println("** post()");
post = true;
}
public void throwException() {
throw new RuntimeException();
}
ActiveObjectContext context;
public boolean pre = false;
public boolean post = false;
private String name;
public Object getSender() {
return context.getSender();
}
public CompletableFuture<Object> getSenderFuture() {
return context.getSenderFuture();
}
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
@prerestart
public void pre() {
System.out.println("** pre()");
pre = true;
}
@postrestart
public void post() {
System.out.println("** post()");
post = true;
}
public void throwException() {
throw new RuntimeException();
}
}

View file

@ -0,0 +1,20 @@
package se.scalablesolutions.akka.actor;
import se.scalablesolutions.akka.dispatch.CompletableFuture;
public class SimpleJavaPojoCaller {
SimpleJavaPojo pojo;
public void setPojo(SimpleJavaPojo pojo) {
this.pojo = pojo;
}
public Object getSenderFromSimpleJavaPojo() {
return pojo.getSender();
}
public CompletableFuture<Object> getSenderFutureFromSimpleJavaPojo() {
return pojo.getSenderFuture();
}
}

View file

@ -21,6 +21,7 @@ public class TransactionalActiveObject {
refState = new Ref();
isInitialized = true;
}
System.out.println("==========> init");
}
public String getMapState(String key) {
@ -37,6 +38,7 @@ public class TransactionalActiveObject {
public void setMapState(String key, String msg) {
mapState.put(key, msg);
System.out.println("==========> setMapState");
}
public void setVectorState(String msg) {
@ -72,6 +74,7 @@ public class TransactionalActiveObject {
mapState.put(key, msg);
vectorState.add(msg);
refState.swap(msg);
System.out.println("==========> failure");
nested.failure(key, msg, failer);
return msg;
}

View file

@ -0,0 +1,45 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
import org.scalatest.Spec
import org.scalatest.Assertions
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.dispatch.DefaultCompletableFuture;
@RunWith(classOf[JUnitRunner])
class ActiveObjectContextSpec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
describe("ActiveObjectContext") {
it("context.sender should return the sender Active Object reference") {
val pojo = ActiveObject.newInstance(classOf[SimpleJavaPojo])
val pojoCaller = ActiveObject.newInstance(classOf[SimpleJavaPojoCaller])
pojoCaller.setPojo(pojo)
try {
pojoCaller.getSenderFromSimpleJavaPojo should equal (pojoCaller)
} catch {
case e => fail("no sender available")
}
}
it("context.senderFuture should return the senderFuture Active Object reference") {
val pojo = ActiveObject.newInstance(classOf[SimpleJavaPojo])
val pojoCaller = ActiveObject.newInstance(classOf[SimpleJavaPojoCaller])
pojoCaller.setPojo(pojo)
try {
pojoCaller.getSenderFutureFromSimpleJavaPojo.getClass.getName should equal (classOf[DefaultCompletableFuture[_]].getName)
} catch {
case e => fail("no sender future available", e)
}
}
}
}

View file

@ -5,7 +5,9 @@ import org.scalatest.{BeforeAndAfterAll, Spec}
import org.scalatest.junit.JUnitRunner
import org.scalatest.matchers.ShouldMatchers
import se.scalablesolutions.akka.config.ActiveObjectConfigurator
import se.scalablesolutions.akka.actor.ActiveObject._
import se.scalablesolutions.akka.config.{OneForOneStrategy, ActiveObjectConfigurator}
import se.scalablesolutions.akka.config.JavaConfig._
/**
@ -151,5 +153,15 @@ class ActiveObjectLifecycleSpec extends Spec with ShouldMatchers with BeforeAndA
assert(!obj._post)
assert(obj._down)
}
it("both preRestart and postRestart methods should be invoked when an actor is restarted") {
val pojo = ActiveObject.newInstance(classOf[SimpleJavaPojo])
val supervisor = ActiveObject.newInstance(classOf[SimpleJavaPojo])
link(supervisor,pojo, new OneForOneStrategy(3, 2000),Array(classOf[Throwable]))
pojo.throwException
Thread.sleep(500)
pojo.pre should be(true)
pojo.post should be(true)
}
}
}

View file

@ -17,18 +17,11 @@ import se.scalablesolutions.akka.config.ActiveObjectConfigurator
import se.scalablesolutions.akka.config.JavaConfig._
import se.scalablesolutions.akka.actor._
/*
@RunWith(classOf[JUnitRunner])
class NestedTransactionalActiveObjectSpec extends
<<<<<<< HEAD:akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala
Spec with
ShouldMatchers with
BeforeAndAfterAll {
=======
Spec with
ShouldMatchers with
BeforeAndAfterAll {
>>>>>>> 38e8bea3fe6a7e9fcc9c5f353124144739bdc234:akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala
private val conf = new ActiveObjectConfigurator
private var messageLog = ""
@ -55,7 +48,7 @@ class NestedTransactionalActiveObjectSpec extends
}
describe("Transactional nested in-memory Active Object") {
/*
it("map should not rollback state for stateful server in case of success") {
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
stateful.init
@ -163,6 +156,6 @@ class NestedTransactionalActiveObjectSpec extends
Thread.sleep(100)
nested.getRefState should equal("init")
}
*/
}
}
*/
}

View file

@ -110,6 +110,8 @@ class SerializableTypeClassActorSpec extends
val actor2 = fromBinary(bytes)
actor2.start
(actor2 !! "hello").getOrElse("_") should equal("world 3")
actor2.receiveTimeout should equal (Some(1000))
}
it("should be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox") {
@ -172,7 +174,8 @@ class MyStatelessActorWithMessagesInMailbox extends Actor {
@serializable class MyJavaSerializableActor extends Actor {
var count = 0
self.receiveTimeout = Some(1000)
def receive = {
case "hello" =>
count = count + 1

View file

@ -6,12 +6,27 @@ package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import java.lang.Throwable
import Actor._
import se.scalablesolutions.akka.config.OneForOneStrategy
import java.util.concurrent.{TimeUnit, CountDownLatch}
object SupervisorHierarchySpec {
class FireWorkerException(msg: String) extends Exception(msg)
class CountDownActor(countDown: CountDownLatch) extends Actor {
protected def receive = { case _ => () }
override def postRestart(reason: Throwable) = countDown.countDown
}
class CrasherActor extends Actor {
protected def receive = { case _ => () }
}
}
class SupervisorHierarchySpec extends JUnitSuite {
import SupervisorHierarchySpec._
@Test
def killWorkerShouldRestartMangerAndOtherWorkers = {
@ -19,7 +34,7 @@ class SupervisorHierarchySpec extends JUnitSuite {
val workerOne = actorOf(new CountDownActor(countDown))
val workerTwo = actorOf(new CountDownActor(countDown))
val workerThree = actorOf(new CountDownActor( countDown))
val workerThree = actorOf(new CountDownActor(countDown))
val boss = actorOf(new Actor{
self.trapExit = List(classOf[Throwable])
@ -35,19 +50,32 @@ class SupervisorHierarchySpec extends JUnitSuite {
manager.startLink(workerTwo)
manager.startLink(workerThree)
workerOne ! Exit(workerOne, new RuntimeException("Fire the worker!"))
workerOne ! Exit(workerOne, new FireWorkerException("Fire the worker!"))
// manager + all workers should be restarted by only killing a worker
// manager doesn't trap exits, so boss will restart manager
assert(countDown.await(4, TimeUnit.SECONDS))
assert(countDown.await(2, TimeUnit.SECONDS))
}
@Test
def supervisorShouldReceiveNotificationMessageWhenMaximumNumberOfRestartsWithinTimeRangeIsReached = {
val countDown = new CountDownLatch(2)
val crasher = actorOf(new CountDownActor(countDown))
val boss = actorOf(new Actor{
self.trapExit = List(classOf[Throwable])
self.faultHandler = Some(OneForOneStrategy(1, 5000))
protected def receive = {
case MaximumNumberOfRestartsWithinTimeRangeReached(_, _, _, _) =>
countDown.countDown
}
}).start
boss.startLink(crasher)
class CountDownActor(countDown: CountDownLatch) extends Actor {
crasher ! Exit(crasher, new FireWorkerException("Fire the worker!"))
crasher ! Exit(crasher, new FireWorkerException("Fire the worker!"))
protected def receive = { case _ => () }
override def postRestart(reason: Throwable) = countDown.countDown
assert(countDown.await(2, TimeUnit.SECONDS))
}
}

View file

@ -16,18 +16,12 @@ import se.scalablesolutions.akka.config._
import se.scalablesolutions.akka.config.ActiveObjectConfigurator
import se.scalablesolutions.akka.config.JavaConfig._
import se.scalablesolutions.akka.actor._
/*
@RunWith(classOf[JUnitRunner])
class TransactionalActiveObjectSpec extends
<<<<<<< HEAD:akka-core/src/test/scala/TransactionalActiveObjectSpec.scala
Spec with
ShouldMatchers with
BeforeAndAfterAll {
=======
Spec with
ShouldMatchers with
BeforeAndAfterAll {
>>>>>>> 38e8bea3fe6a7e9fcc9c5f353124144739bdc234:akka-core/src/test/scala/TransactionalActiveObjectSpec.scala
private val conf = new ActiveObjectConfigurator
private var messageLog = ""
@ -50,9 +44,9 @@ class TransactionalActiveObjectSpec extends
override def afterAll {
conf.stop
}
describe("Transactional in-memory Active Object ") {
/*
it("map should not rollback state for stateful server in case of success") {
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
stateful.init
@ -64,7 +58,9 @@ class TransactionalActiveObjectSpec extends
it("map should rollback state for stateful server in case of failure") {
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
stateful.init
Thread.sleep(500)
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init")
Thread.sleep(500)
val failer = conf.getInstance(classOf[ActiveObjectFailer])
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer)
@ -112,6 +108,6 @@ class TransactionalActiveObjectSpec extends
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state")
stateful.getRefState should equal("new state")
}
*/
}
}
*/

View file

@ -76,7 +76,6 @@ class StatefulTransactor(expectedInvocationCount: Int) extends Transactor {
failer !! "Failure"
self.reply(msg)
notifier.countDown
case SetMapStateOneWay(key, msg) =>
mapState.put(key, msg)
notifier.countDown
@ -95,8 +94,8 @@ class StatefulTransactor(expectedInvocationCount: Int) extends Transactor {
mapState.put(key, msg)
vectorState.add(msg)
refState.swap(msg)
failer ! "Failure"
notifier.countDown
failer ! "Failure"
}
}
@ -110,6 +109,7 @@ class FailerTransactor extends Transactor {
}
class TransactorSpec extends JUnitSuite {
@Test
def shouldOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = actorOf(new StatefulTransactor(2))
@ -139,7 +139,7 @@ class TransactorSpec extends JUnitSuite {
stateful ! SetMapStateOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
val notifier = (stateful !! GetNotifier).as[CountDownLatch]
assert(notifier.get.await(1, TimeUnit.SECONDS))
assert(notifier.get.await(5, TimeUnit.SECONDS))
assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
}

View file

@ -50,6 +50,11 @@ class AkkaServlet extends AtmosphereServlet with Logging {
addInitParameter(AtmosphereServlet.PROPERTY_USE_STREAM,"true")
addInitParameter("com.sun.jersey.config.property.packages",c.getList("akka.rest.resource_packages").mkString(";"))
addInitParameter("com.sun.jersey.spi.container.ResourceFilters",c.getList("akka.rest.filters").mkString(","))
c.getInt("akka.rest.maxInactiveActivity").foreach { value =>
log.info("MAX_INACTIVE:%s",value.toString)
addInitParameter(CometSupport.MAX_INACTIVE,value.toString)
}
val servlet = new AtmosphereRestServlet {
override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key)

View file

@ -68,7 +68,7 @@ object World {
lazy val ants = setup
lazy val evaporator = actorOf[Evaporator].start
private val snapshotFactory = TransactionFactory(readonly = true, familyName = "snapshot")
private val snapshotFactory = TransactionFactory(readonly = true, familyName = "snapshot", hooks = false)
def snapshot = atomic(snapshotFactory) { Array.tabulate(Dim, Dim)(place(_, _).get) }
@ -139,7 +139,7 @@ class AntActor(initLoc: (Int, Int)) extends WorldActor {
val locRef = Ref(initLoc)
val name = "ant-from-" + initLoc._1 + "-" + initLoc._2
implicit val txFactory = TransactionFactory(familyName = name)
implicit val txFactory = TransactionFactory(familyName = name, hooks = false)
val homing = (p: Place) => p.pher + (100 * (if (p.home) 0 else 1))
val foraging = (p: Place) => p.pher + p.food
@ -211,7 +211,7 @@ class Evaporator extends WorldActor {
import Config._
import World._
implicit val txFactory = TransactionFactory(familyName = "evaporator")
implicit val txFactory = TransactionFactory(familyName = "evaporator", hooks = false)
val evaporate = (pher: Float) => pher * EvapRate
def act = for (x <- 0 until Dim; y <- 0 until Dim) {

View file

@ -5,16 +5,21 @@
# This file has all the default settings, so all these could be removed with no visible effect.
# Modify as needed.
<log>
log {
filename = "./logs/akka.log"
roll = "daily" # Options: never, hourly, daily, sunday/monday/...
level = "debug" # Options: fatal, critical, error, warning, info, debug, trace
console = on
# syslog_host = ""
# syslog_server_name = ""
</log>
<akka>
akka {
node = "se.scalablesolutions.akka"
level = "info"
}
}
akka {
version = "0.10"
# FQN (Fully Qualified Name) to the class doing initial active object/actor
@ -24,81 +29,81 @@
"sample.rest.scala.Boot",
"sample.security.Boot"]
<actor>
actor {
timeout = 5000 # default timeout for future based invocations
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
throughput = 5 # default throughput for ExecutorBasedEventDrivenDispatcher
</actor>
}
<stm>
stm {
fair = on # should global transactions be fair or non-fair (non fair yield better performance)
jta-aware = off # 'on' means that if there JTA Transaction Manager available then the STM will
# begin (or join), commit or rollback the JTA transaction. Default is 'off'.
</stm>
}
<jta>
jta {
provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI)
# "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta',
# e.g. you need the akka-jta JARs on classpath).
timeout = 60000
</jta>
}
<rest>
rest {
service = on
hostname = "localhost"
port = 9998
filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use
resource_packages = ["sample.rest.scala","sample.rest.java","sample.security"] # List with all resource packages for your Jersey services
authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now)
#maxInactiveActivity = 60000 #Atmosphere CometSupport maxInactiveActivity
#IF you are using a KerberosAuthenticationActor
# <kerberos>
# kerberos {
# servicePrincipal = "HTTP/localhost@EXAMPLE.COM"
# keyTabLocation = "URL to keytab"
# kerberosDebug = "true"
# realm = "EXAMPLE.COM"
# </kerberos>
</rest>
# }
}
<remote>
remote {
compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression
zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
<cluster>
cluster {
service = on
name = "default" # The name of the cluster
serializer = "se.scalablesolutions.akka.serialization.Serializer$Java$" # FQN of the serializer class
</cluster>
}
<server>
server {
service = on
hostname = "localhost"
port = 9999
connection-timeout = 1000 # in millis (1 sec default)
</server>
}
<client>
client {
reconnect-delay = 5000 # in millis (5 sec default)
read-timeout = 10000 # in millis (10 sec default)
</client>
</remote>
}
}
<storage>
<cassandra>
storage {
cassandra {
hostname = "127.0.0.1" # IP address or hostname of one of the Cassandra cluster's seeds
port = 9160
consistency-level = "QUORUM" # Options: ZERO, ONE, QUORUM, DCQUORUM, DCQUORUMSYNC, ALL, ANY
</cassandra>
}
<mongodb>
mongodb {
hostname = "127.0.0.1" # IP address or hostname of the MongoDB DB instance
port = 27017
dbname = "mydb"
</mongodb>
}
<redis>
redis {
hostname = "127.0.0.1" # IP address or hostname of the Redis instance
port = 6379
</redis>
</storage>
</akka>
}
}
}

View file

@ -0,0 +1,46 @@
<?xml version='1.0' encoding='UTF-8'?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>net.lag</groupId>
<artifactId>configgy</artifactId>
<packaging>jar</packaging>
<version>2.8.0-1.5.5</version>
<licenses>
<license>
<name>Apache 2</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>
<dependencies>
<dependency>
<groupId>org.scala-tools</groupId>
<artifactId>vscaladoc</artifactId>
<version>1.1-md-3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.8.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>PublicReleasesRepository</id>
<name>Public Releases Repository</name>
<url>http://maven/content/groups/public/</url>
</repository>
<repository>
<id>PublicSnapshots</id>
<name>Public Snapshots</name>
<url>http://maven/content/groups/public-snapshots/</url>
</repository>
<repository>
<id>ScalaToolsMaven2Repository</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases/</url>
</repository>
</repositories>
</project>

View file

@ -0,0 +1,261 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>multiverse-alpha</artifactId>
<name>Alpha Multiverse STM engine</name>
<description>
Contains an all in one jar that that contains the AlphaStm including the Multiverse
Javaagent and the Multiverse Compiler. This is the JAR you want to include in your
projects, if you do, you don't need to worry about any Multiverse dependency
at all.
</description>
<packaging>jar</packaging>
<version>0.6-2010-07-15</version>
<parent>
<groupId>org.multiverse</groupId>
<artifactId>multiverse</artifactId>
<version>0.6-2010-07-15</version>
</parent>
<properties>
<multiverse.agentclass>org.multiverse.javaagent.MultiverseJavaAgent</multiverse.agentclass>
<multiverse.instrumentor>org.multiverse.stms.alpha.instrumentation.AlphaStmInstrumentor
</multiverse.instrumentor>
</properties>
<build>
<plugins>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>create-main-jar</id>
<phase>compile</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<tasks>
<mkdir dir="${project.build.directory}"/>
<delete file="${project.build.outputDirectory}/GiveMavenSomethingToCompile.class"/>
<echo message="-------------------------------------------"/>
<echo message="Building ${artifactId}"/>
<echo message="-------------------------------------------"/>
<echo message="Unzipping all artifacts"/>
<unzip src="${settings.localRepository}/org/multiverse/multiverse-core/${project.version}/multiverse-core-${project.version}.jar"
dest="${project.build.outputDirectory}"/>
<unzip src="${settings.localRepository}/org/multiverse/multiverse-instrumentation/${project.version}/multiverse-instrumentation-${project.version}.jar"
dest="${project.build.outputDirectory}"/>
<unzip src="${settings.localRepository}/args4j/args4j/${args4j.version}/args4j-${args4j.version}.jar"
dest="${project.build.outputDirectory}"/>
<unzip src="${settings.localRepository}/asm/asm-all/${asm.version}/asm-all-${asm.version}.jar"
dest="${project.build.outputDirectory}"/>
<unzip src="${settings.localRepository}/org/multiverse/multiverse-alpha-unborn/${project.version}/multiverse-alpha-unborn-${project.version}.jar"
dest="${project.build.outputDirectory}"/>
<echo message="-------------------------------------------"/>
<echo message="${settings.localRepository}/asm/asm-all/${asm.version}/asm-all-${asm.version}.jar"/>
<echo message="-------------------------------------------"/>
<java classname="org.multiverse.compiler.MultiverseCompiler">
<arg value="-o"/>
<arg value="-v"/>
<arg value="-d"/>
<arg value="${project.build.directory}/classes"/>
<classpath>
<pathelement
location="${settings.localRepository}/org/multiverse/multiverse-core/${project.version}/multiverse-core-${project.version}.jar"/>
<pathelement
location="${settings.localRepository}/org/multiverse/multiverse-instrumentation/${project.version}/multiverse-instrumentation-${project.version}.jar"/>
<pathelement
location="${settings.localRepository}/org/multiverse/multiverse-alpha-unborn/${project.version}/multiverse-alpha-unborn-${project.version}.jar"/>
<pathelement
location="${settings.localRepository}/args4j/args4j/${args4j.version}/args4j-${args4j.version}.jar"/>
<pathelement
location="${settings.localRepository}/asm/asm-all/${asm.version}/asm-all-${asm.version}.jar"/>
</classpath>
</java>
<!-- we are going create a new set of class files where the dependant
jars are integrated. Unfortunately jarjar only works on jars, so
we temporarily create the jar, unzip it and remove it again-->
<echo message="Creating Jar"/>
<taskdef name="jarjar" classname="com.tonicsystems.jarjar.JarJarTask"/>
<jarjar destfile="${project.build.directory}/${artifactId}-${project.version}.jar"
update="true">
<rule pattern="org.objectweb.asm.**" result="${groupId}.repackaged.@0"/>
<fileset dir="${project.build.outputDirectory}"/>
<manifest>
<attribute name="Premain-Class" value="${multiverse.agentclass}"/>
</manifest>
</jarjar>
<delete dir="${project.build.outputDirectory}"/>
<mkdir dir="${project.build.outputDirectory}"/>
<unzip src="${project.build.directory}/${artifactId}-${project.version}.jar"
dest="${project.build.outputDirectory}"/>
<delete file="${project.build.directory}/${artifactId}-${project.version}.jar"/>
</tasks>
</configuration>
</execution>
<execution>
<id>Compiles the tests</id>
<phase>test-compile</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<tasks>
<echo message="Successfully created ${project.build.directory}/${artifactId}-${project.version}.jar"/>
<echo message="-------------------------------------------"/>
<echo message="Instrumenting the Multiverse-alpha test classes"/>
<echo message="-------------------------------------------"/>
<mkdir dir="${project.build.testOutputDirectory}"/>
<unzip src="${settings.localRepository}/org/multiverse/multiverse-instrumentation/${project.version}/multiverse-instrumentation-${project.version}-tests.jar"
dest="${project.build.testOutputDirectory}"/>
<delete>
<fileset dir="${project.build.testOutputDirectory}" includes="**/*Test.class"/>
</delete>
<unzip src="${settings.localRepository}/org/multiverse/multiverse-alpha-unborn/${project.version}/multiverse-alpha-unborn-${project.version}-tests.jar"
dest="${project.build.testOutputDirectory}"/>
<delete>
<fileset dir="${project.build.testOutputDirectory}"
includes="**/transactions/**/Test.class"/>
</delete>
<unzip src="${settings.localRepository}/org/multiverse/multiverse-core/${project.version}/multiverse-core-${project.version}-tests.jar"
dest="${project.build.testOutputDirectory}"/>
<!-- remove tests that can't be executed with the multiverse compiler.
The Multiverse compiler aborts when an instrumentation error occurs
and these errors cause instrumentation errors (their purpose) -->
<delete file="${project.build.testOutputDirectory}/org/multiverse/stms/alpha/instrumentation/transactionalmethod/TransactionalMethod_InterruptibleTest.class"/>
<delete file="${project.build.testOutputDirectory}/org/multiverse/stms/alpha/instrumentation/transactionalmethod/TransactionalMethod_InterruptibleTest$MethodWithoutException.class"/>
<delete file="${project.build.testOutputDirectory}/org/multiverse/stms/alpha/instrumentation/transactionalmethod/TransactionalMethod_InterruptibleTest$MethodWithIncorrectException.class"/>
<delete file="${project.build.testOutputDirectory}/org/multiverse/stms/alpha/instrumentation/fieldaccess/TransactionalObject_ClashingFieldAndMethodTest.class"/>
<delete file="${project.build.testOutputDirectory}/org/multiverse/stms/alpha/instrumentation/fieldaccess/TransactionalObject_ClashingFieldAndMethodTest$ObjectWithClashingMethod.class"/>
<java classname="org.multiverse.compiler.MultiverseCompiler">
<arg value="-o"/>
<arg value="-v"/>
<arg value="-d"/>
<arg value="${project.build.testOutputDirectory}"/>
<classpath>
<pathelement
location="${settings.localRepository}/org/multiverse/multiverse-core/${project.version}/multiverse-core-${project.version}.jar"/>
<pathelement
location="${settings.localRepository}/org/multiverse/multiverse-instrumentation/${project.version}/multiverse-instrumentation-${project.version}.jar"/>
<pathelement
location="${settings.localRepository}/org/multiverse/multiverse-alpha-unborn/${project.version}/multiverse-alpha-unborn-${project.version}.jar"/>
<pathelement
location="${settings.localRepository}/args4j/args4j/${args4j.version}/args4j-${args4j.version}.jar"/>
<pathelement
location="${settings.localRepository}/asm/asm-all/${asm.version}/asm-all-${asm.version}.jar"/>
</classpath>
</java>
<echo message="Successfully instrumented the test classes"/>
</tasks>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>ant</groupId>
<artifactId>ant</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>ant</groupId>
<artifactId>optional</artifactId>
<version>1.5.4</version>
</dependency>
<dependency>
<groupId>com.tonicsystems.jarjar</groupId>
<artifactId>jarjar</artifactId>
<version>1.0-rc8</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestEntries>
<Premain-Class>${multiverse.agentclass}</Premain-Class>
</manifestEntries>
<manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- Multiverse dependencies -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>multiverse-alpha-unborn</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>multiverse-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>multiverse-instrumentation</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>multiverse-alpha-unborn</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
<version>${args4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.tonicsystems.jarjar</groupId>
<artifactId>jarjar</artifactId>
<version>1.0-rc8</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>asm</groupId>
<artifactId>asm-all</artifactId>
<version>${asm.version}</version>
</dependency>
</dependencies>
</project>

View file

@ -0,0 +1,488 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<name>Multiverse Software Transactional Memory</name>
<description>
Multiverse is a Software Transactional Memory implementation that can be used in Java
but also in other languages running on the JVM like Scala or Groovy. Essentially it is a framework that allows
different STM implementation (with different featuresets or performance characteristics) to be used
under the hood. The main STM implementation is multiverse-alpha..
</description>
<groupId>org.multiverse</groupId>
<artifactId>multiverse</artifactId>
<version>0.6-2010-07-15</version>
<inceptionYear>2008</inceptionYear>
<packaging>pom</packaging>
<properties>
<sourceEncoding>UTF-8</sourceEncoding>
<targetJdk>1.6</targetJdk>
<asm.version>3.2</asm.version>
<args4j.version>2.0.16</args4j.version>
<junit.version>4.8.1</junit.version>
<mockito.version>1.8.2</mockito.version>
</properties>
<licenses>
<license>
<name>The Apache License, ASL Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0</url>
</license>
</licenses>
<organization>
<name>Multiverse</name>
<url>http://multiverse.codehaus.org</url>
</organization>
<developers>
<developer>
<id>pveentjer</id>
<name>Peter Veentjer</name>
<timezone>+1</timezone>
<email>alarmnummer AT gmail DOTCOM</email>
<roles>
<role>Founder</role>
</roles>
</developer>
<developer>
<id>aphillips</id>
<name>Andrew Phillips</name>
<timezone>+1</timezone>
<email>aphillips AT qrmedia DOTCOM</email>
<roles>
<role>Committer</role>
</roles>
</developer>
</developers>
<repositories>
<repository>
<id>maven.atlassian.com</id>
<name>Atlassian Maven Proxy</name>
<url>https://maven.atlassian.com/content/groups/public</url>
</repository>
<repository>
<id>repo1.maven</id>
<name>Maven Main Repository</name>
<url>http://repo1.maven.org/maven2</url>
</repository>
<repository>
<id>maven2-repository.dev.java.net</id>
<name>Java.net Repository for Maven</name>
<url>http://download.java.net/maven/2</url>
</repository>
<repository>
<id>java.net</id>
<name>Java.net Legacy Repository for Maven</name>
<url>http://download.java.net/maven/1</url>
<layout>legacy</layout>
</repository>
<repository>
<id>google-maven-repository</id>
<name>Google Maven Repository</name>
<url>http://google-maven-repository.googlecode.com/svn/repository/</url>
</repository>
<repository>
<id>repository.codehaus.org</id>
<name>Codehaus Maven Repository</name>
<url>http://repository.codehaus.org</url>
</repository>
<repository>
<id>ibiblio</id>
<url>http://www.ibiblio.org/maven</url>
</repository>
<repository>
<id>sourceforge</id>
<url>http://maven-plugins.sourceforge.net/repository</url>
</repository>
<!-- contains the google data api's-->
<repository>
<id>mandubian-mvn</id>
<url>http://mandubian-mvn.googlecode.com/svn/trunk/mandubian-mvn/repository</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>snapshots</id>
<url>http://snapshots.maven.codehaus.org/maven2</url>
</pluginRepository>
</pluginRepositories>
<modules>
<module>multiverse-benchy</module>
<!-- module>multiverse-documentation</module -->
<module>multiverse-core</module>
<module>multiverse-core-tests</module>
<module>multiverse-instrumentation</module>
<module>multiverse-alpha-unborn</module>
<module>multiverse-alpha</module>
<!-- module>multiverse-scala</module -->
<module>multiverse-site</module>
<!-- module>multiverse-project-archetype</module -->
<module>multiverse-performance-tool</module>
</modules>
<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<encoding>${sourceEncoding}</encoding>
<source>${targetJdk}</source>
<target>${targetJdk}</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<configuration>
<encoding>${sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/*LongTest.java</exclude>
<exclude>**/*longTest.java</exclude>
<exclude>**/*StressTest.java</exclude>
<exclude>**/*stressTest.java</exclude>
<exclude>**/*PerformanceTest.java</exclude>
<exclude>**/*performanceTest.java</exclude>
</excludes>
<includes>
<include>**/*Test.java</include>
</includes>
<forkMode>once</forkMode>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<executions>
<execution>
<id>enforce-java</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<requireJavaVersion>
<version>${targetJdk}</version>
</requireJavaVersion>
</rules>
</configuration>
</execution>
</executions>
</plugin>
<!-- plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>failsafe-maven-plugin</artifactId>
<version>2.4.3-alpha-1</version>
<configuration>
<encoding>${sourceEncoding}</encoding>
<includes>
<include>**/*LongTest.java</include>
</includes>
<argLine>-Xmx256m</argLine>
<forkMode>once</forkMode>
</configuration>
<executions>
<execution>
<id>integration-test</id>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
<execution>
<id>verify</id>
<goals>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin -->
<plugin>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- plugin>
<groupId>com.atlassian.maven.plugins</groupId>
<artifactId>maven-clover2-plugin</artifactId>
<executions>
<execution>
<id>verify</id>
<phase>verify</phase>
<goals>
<goal>instrument</goal>
<goal>check</goal>
</goals>
</execution>
<execution>
<id>pre-site</id>
<phase>pre-site</phase>
<goals>
<goal>instrument</goal>
</goals>
</execution>
</executions>
</plugin -->
</plugins>
<extensions>
<extension>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-webdav</artifactId>
<version>1.0-beta-2</version>
</extension>
<extension>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-ftp</artifactId>
<version>1.0-beta-6</version>
</extension>
</extensions>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<scm>
<connection>scm:git:git://git.codehaus.org/multiverse.git</connection>
<developerConnection>scm:git:ssh://git@git.codehaus.org/multiverse.git</developerConnection>
<url>http://git.codehaus.org/gitweb.cgi?p=multiverse.git</url>
</scm>
<issueManagement>
<system>Jira</system>
<url>http://jira.codehaus.org/browse/MULTIVERSE</url>
</issueManagement>
<mailingLists>
<mailingList>
<name>Development List</name>
<subscribe>dev-subscribe@multiverse.codehaus.org</subscribe>
<unsubscribe>dev-unsubscribe@multiverse.codehaus.org</unsubscribe>
<post>dev@multiverse.codehaus.org</post>
<archive>http://archive.multiverse.codehaus.org/dev</archive>
</mailingList>
<mailingList>
<name>User List</name>
<subscribe>user-subscribe@multiverse.codehaus.org</subscribe>
<unsubscribe>user-unsubscribe@multiverse.codehaus.org</unsubscribe>
<post>user@multiverse.codehaus.org</post>
<archive>http://archive.multiverse.codehaus.org/user</archive>
</mailingList>
<mailingList>
<name>Commits List</name>
<subscribe>scm-subscribe@multiverse.codehaus.org</subscribe>
<unsubscribe>scm-unsubscribe@multiverse.codehaus.org</unsubscribe>
<archive>http://archive.multiverse.codehaus.org/scm</archive>
</mailingList>
</mailingLists>
<reporting>
<plugins>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.6.1</version>
<configuration>
<encoding>${sourceEncoding}</encoding>
<quiet>true</quiet>
</configuration>
<reportSets>
<reportSet>
<id>default</id>
<reports>
<report>aggregate</report>
</reports>
</reportSet>
</reportSets>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>jxr-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>taglist-maven-plugin</artifactId>
<version>2.3</version>
<configuration>
<encoding>${sourceEncoding}</encoding>
<tags>
<tag>FIXME</tag>
<tag>Fixme</tag>
<tag>fixme</tag>
<tag>TODO</tag>
<tag>todo</tag>
<tag>Todo</tag>
<tag>@todo</tag>
<tag>@deprecated</tag>
</tags>
</configuration>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-changes-plugin</artifactId>
<version>2.0-beta-3</version>
<reportSets>
<reportSet>
<reports>
<report>changes-report</report>
</reports>
</reportSet>
</reportSets>
<configuration>
<xmlPath>${basedir}/changes.xml</xmlPath>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-report-plugin</artifactId>
<configuration>
<showSuccess>false</showSuccess>
</configuration>
<reportSets>
<reportSet>
<reports>
<report>report-only</report>
</reports>
</reportSet>
</reportSets>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<version>2.0.1</version>
</plugin>
<plugin>
<artifactId>maven-pmd-plugin</artifactId>
<version>2.3</version>
<configuration>
<sourceEncoding>${sourceEncoding}</sourceEncoding>
<targetJdk>${targetJdk}</targetJdk>
</configuration>
</plugin>
<!-- plugin>
<groupId>com.atlassian.maven.plugins</groupId>
<artifactId>maven-clover2-plugin</artifactId>
<configuration>
<generateHistorical>true</generateHistorical>
</configuration>
</plugin -->
</plugins>
</reporting>
<distributionManagement>
<repository>
<id>multiverse-releases</id>
<name>Multiverse Central Repository</name>
<url>dav:https://dav.codehaus.org/repository/multiverse/</url>
</repository>
<snapshotRepository>
<id>multiverse-snapshots</id>
<name>Multiverse Central Development Repository</name>
<url>dav:https://dav.codehaus.org/snapshots.repository/multiverse/</url>
</snapshotRepository>
<!-- site>
<id>multiverse-site</id>
<name>Multiverse Maven site</name>
<url>dav:https://dav.codehaus.org/multiverse/maven-site</url>
</site -->
<downloadUrl>http://dist.codehaus.org/multiverse/</downloadUrl>
</distributionManagement>
<profiles>
<profile>
<id>release</id>
<build>
<plugins>
<!-- should only be executed at the top-level POM -->
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.6.1</version>
<inherited>false</inherited>
<executions>
<execution>
<phase>generate-resources</phase>
<goals>
<goal>aggregate</goal>
</goals>
</execution>
</executions>
<configuration>
<encoding>${sourceEncoding}</encoding>
<quiet>true</quiet>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-2</version>
<inherited>false</inherited>
<executions>
<execution>
<id>distribution</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptors>
<descriptor>distribution.xml</descriptor>
</descriptors>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>stress</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>**/*LongTest.java</include>
</includes>
<argLine>-Xmx256m</argLine>
<forkMode>once</forkMode>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View file

@ -0,0 +1,152 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.scala-tools</groupId>
<artifactId>scala-tools-parent</artifactId>
<version>1.3</version>
</parent>
<artifactId>vscaladoc</artifactId>
<version>1.1-md-3</version>
<name>${project.artifactId}</name>
<inceptionYear>2008</inceptionYear>
<scm>
<connection>scm:svn:http://vscaladoc.googlecode.com/svn/tags/vscaladoc-1.1</connection>
<developerConnection>scm:svn:https://vscaladoc.googlecode.com/svn/tags/vscaladoc-1.1</developerConnection>
<url>http://code.google.com/p/vscaladoc/source/browse/tags/vscaladoc-1.1</url>
</scm>
<issueManagement>
<system>code.google</system>
<url>http://code.google.com/p/vscaladoc/issues/list</url>
</issueManagement>
<ciManagement>
<system>hudson</system>
<url>http://scala-tools.org/hudson/job/vscaladoc</url>
</ciManagement>
<properties>
<scala.version>2.7.1</scala.version>
</properties>
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<args>
<arg>-target:jvm-1.5</arg>
</args>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<configuration>
<downloadSources>true</downloadSources>
<excludes>
<exclude>org.scala-lang:scala-library</exclude>
</excludes>
<classpathContainers>
<classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
</classpathContainers>
<projectnatures>
<java.lang.String>ch.epfl.lamp.sdt.core.scalanature</java.lang.String>
<java.lang.String>org.eclipse.jdt.core.javanature</java.lang.String>
</projectnatures>
<buildcommands>
<java.lang.String>ch.epfl.lamp.sdt.core.scalabuilder</java.lang.String>
</buildcommands>
</configuration>
</plugin>
<plugin>
<groupId>net.sf.alchim</groupId>
<artifactId>yuicompressor-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compress</goal>
</goals>
</execution>
</executions>
<configuration>
<nosuffix>true</nosuffix>
<aggregations>
<aggregation>
<!-- remove files after aggregation (default: false) -->
<removeIncluded>true</removeIncluded>
<!-- insert new line after each concatenation (default: false) -->
<insertNewLine>true</insertNewLine>
<output>${project.build.directory}/classes/org/scala_tools/vscaladoc/_highlighter/shAll.js</output>
<!-- files to include, path relative to output's directory or absolute path-->
<includes>
<include>shCore*.js</include>
<include>shBrush*.js</include>
</includes>
</aggregation>
</aggregations>
</configuration>
</plugin>
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<vscaladocVersion>1.0</vscaladocVersion>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
</project>

View file

@ -0,0 +1,33 @@
<?xml version='1.0' encoding='UTF-8'?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>sbinary</groupId>
<artifactId>sbinary</artifactId>
<packaging>jar</packaging>
<version>2.8.0-0.3.1</version>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.8.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>PublicReleasesRepository</id>
<name>Public Releases Repository</name>
<url>http://maven/content/groups/public/</url>
</repository>
<repository>
<id>PublicSnapshots</id>
<name>Public Snapshots</name>
<url>http://maven/content/groups/public-snapshots/</url>
</repository>
<repository>
<id>ScalaToolsMaven2Repository</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases/</url>
</repository>
</repositories>
</project>

View file

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>sjson.json</groupId>
<artifactId>sjson</artifactId>
<version>0.7-SNAPSHOT-2.8.0</version>
<description>POM was created from install:install-file</description>
</project>

View file

@ -1,7 +1,7 @@
project.organization=se.scalablesolutions.akka
project.name=akka
project.version=0.10
scala.version=2.8.0.RC3
scala.version=2.8.0
sbt.version=0.7.4
def.scala.version=2.7.7
build.scala.versions=2.8.0.RC3
build.scala.versions=2.8.0

View file

@ -22,7 +22,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val SPRING_VERSION = "3.0.3.RELEASE"
val CASSANDRA_VERSION = "0.6.1"
val LIFT_VERSION = "2.0-scala280-SNAPSHOT"
val SCALATEST_VERSION = "1.2-for-scala-2.8.0.RC3-SNAPSHOT"
val SCALATEST_VERSION = "1.2-for-scala-2.8.0.final-SNAPSHOT"
val MULTIVERSE_VERSION = "0.6-SNAPSHOT"
// ------------------------------------------------------------
@ -84,9 +84,6 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_),
akka_core, akka_http, akka_spring, akka_camel, akka_persistence, akka_amqp)
// active object tests in java
lazy val akka_active_object_test = project("akka-active-object-test", "akka-active-object-test", new AkkaActiveObjectTestProject(_), akka_kernel)
// examples
lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_))
@ -182,10 +179,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val netty = "org.jboss.netty" % "netty" % "3.2.1.Final" % "compile"
val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile"
val commons_io = "commons-io" % "commons-io" % "1.4" % "compile"
val dispatch_json = "net.databinder" % "dispatch-json_2.8.0.RC3" % "0.7.4" % "compile"
val dispatch_http = "net.databinder" % "dispatch-http_2.8.0.RC3" % "0.7.4" % "compile"
val sjson = "sjson.json" % "sjson" % "0.6-SNAPSHOT-2.8.RC3" % "compile"
val sbinary = "sbinary" % "sbinary" % "2.8.0.RC3-0.3.1-SNAPSHOT" % "compile"
val dispatch_json = "net.databinder" % "dispatch-json_2.8.0" % "0.7.4" % "compile"
val dispatch_http = "net.databinder" % "dispatch-http_2.8.0" % "0.7.4" % "compile"
val sjson = "sjson.json" % "sjson" % "0.7-SNAPSHOT-2.8.0" % "compile"
val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile"
val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.2.1" % "compile"
val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % "1.2.1" % "compile"
val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile"
@ -193,7 +190,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val jta_1_1 = "org.apache.geronimo.specs" % "geronimo-jta_1.1_spec" % "1.1.1" % "compile" intransitive()
val werkz = "org.codehaus.aspectwerkz" % "aspectwerkz-nodeps-jdk5" % "2.2.1" % "compile"
val werkz_core = "org.codehaus.aspectwerkz" % "aspectwerkz-jdk5" % "2.2.1" % "compile"
val configgy = "net.lag" % "configgy" % "2.8.0.RC3-1.5.2-SNAPSHOT" % "compile"
val configgy = "net.lag" % "configgy" % "2.8.0-1.5.5" % "compile"
val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile"
val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile"
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile"
@ -207,7 +204,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
class AkkaAMQPProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with CodeFellowPlugin {
val commons_io = "commons-io" % "commons-io" % "1.4" % "compile"
val rabbit = "com.rabbitmq" % "amqp-client" % "1.8.0" % "compile"
val rabbit = "com.rabbitmq" % "amqp-client" % "1.8.1" % "compile"
// testing
val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "test" intransitive()
@ -250,7 +247,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaRedisProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val redis = "com.redis" % "redisclient" % "2.8.0.RC3-1.4" % "compile"
val redis = "com.redis" % "redisclient" % "2.8.0-1.4" % "compile"
val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile"
override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
}
@ -304,13 +301,6 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val jta_spec = "org.apache.geronimo.specs" % "geronimo-jta_1.1_spec" % "1.1.1" % "compile" intransitive()
}
// ================= TEST ==================
class AkkaActiveObjectTestProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with CodeFellowPlugin {
// testing
val junit = "junit" % "junit" % "4.5" % "test"
val jmock = "org.jmock" % "jmock" % "2.4.0" % "test"
}
// ================= EXAMPLES ==================
class AkkaSampleAntsProject(info: ProjectInfo) extends DefaultSpdeProject(info) with CodeFellowPlugin {
val scalaToolsSnapshots = ScalaToolsSnapshots
@ -339,10 +329,20 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaSampleCamelProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) with CodeFellowPlugin {
val spring_jms = "org.springframework" % "spring-jms" % SPRING_VERSION % "compile"
val camel_jetty = "org.apache.camel" % "camel-jetty" % CAMEL_VERSION % "compile"
val camel_jms = "org.apache.camel" % "camel-jms" % CAMEL_VERSION % "compile"
val activemq_core = "org.apache.activemq" % "activemq-core" % "5.3.2" % "compile"
override def ivyXML =
<dependencies>
<dependency org="org.springframework" name="spring-jms" rev={SPRING_VERSION}>
</dependency>
<dependency org="org.apache.geronimo.specs" name="geronimo-servlet_2.5_spec" rev="1.1.1">
</dependency>
<dependency org="org.apache.camel" name="camel-jetty" rev={CAMEL_VERSION}>
<exclude module="geronimo-servlet_2.4_spec"/>
</dependency>
<dependency org="org.apache.camel" name="camel-jms" rev={CAMEL_VERSION}>
</dependency>
<dependency org="org.apache.activemq" name="activemq-core" rev="5.3.2">
</dependency>
</dependencies>
}
class AkkaSampleSecurityProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) with CodeFellowPlugin {