diff --git a/.gitignore b/.gitignore index 55bcf5ee42..287430d4e5 100755 --- a/.gitignore +++ b/.gitignore @@ -32,6 +32,7 @@ tm.out *.iws *.ipr *.iml +run-codefellow .project .settings .classpath diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index be3556e812..8605401dbd 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -10,8 +10,6 @@ import se.scalablesolutions.akka.config.OneForOneStrategy import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory} import java.lang.IllegalArgumentException import se.scalablesolutions.akka.util.Logging -import se.scalablesolutions.akka.serialization.Serializer - /** * AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors. * @@ -20,7 +18,6 @@ import se.scalablesolutions.akka.serialization.Serializer * @author Irmo Manie */ object AMQP { - case class ConnectionParameters( host: String = ConnectionFactory.DEFAULT_HOST, port: Int = ConnectionFactory.DEFAULT_AMQP_PORT, @@ -57,7 +54,6 @@ object AMQP { queueExclusive: Boolean = false, selfAcknowledging: Boolean = true, channelParameters: Option[ChannelParameters] = None) { - if (queueDurable && queueName.isEmpty) { throw new IllegalArgumentException("A queue name is required when requesting a durable queue.") } @@ -84,27 +80,25 @@ object AMQP { consumer } - def newRpcClient(connection: ActorRef, + def newRpcClient[O,I](connection: ActorRef, exchangeParameters: ExchangeParameters, routingKey: String, - inSerializer: Serializer, - outSerializer: Serializer, + serializer: RpcClientSerializer[O,I], channelParameters: Option[ChannelParameters] = None): ActorRef = { - val rpcActor: ActorRef = actorOf(new RpcClientActor(exchangeParameters, routingKey, inSerializer, outSerializer, channelParameters)) + val rpcActor: ActorRef = actorOf(new RpcClientActor[O,I](exchangeParameters, routingKey, serializer, channelParameters)) connection.startLink(rpcActor) rpcActor ! Start rpcActor } - def newRpcServer(connection: ActorRef, - exchangeParameters: ExchangeParameters, - routingKey: String, - inSerializer: Serializer, - outSerializer: Serializer, - requestHandler: PartialFunction[AnyRef, AnyRef], - channelParameters: Option[ChannelParameters] = None) = { + def newRpcServer[I,O](connection: ActorRef, + exchangeParameters: ExchangeParameters, + routingKey: String, + serializer: RpcServerSerializer[I,O], + requestHandler: PartialFunction[I, O], + channelParameters: Option[ChannelParameters] = None) = { val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) - val rpcServer = actorOf(new RpcServerActor(producer, inSerializer, outSerializer, requestHandler)) + val rpcServer = actorOf(new RpcServerActor[I,O](producer, serializer, requestHandler)) val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer , channelParameters = channelParameters , selfAcknowledging = false)) @@ -133,4 +127,17 @@ object AMQP { connectionActor } } + + trait FromBinary[T] { + def fromBinary(bytes: Array[Byte]): T + } + + trait ToBinary[T] { + def toBinary(t: T): Array[Byte] + } + + + case class RpcClientSerializer[O,I](toBinary: ToBinary[O], fromBinary: FromBinary[I]) + + case class RpcServerSerializer[I,O](fromBinary: FromBinary[I], toBinary: ToBinary[O]) } diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala index 8049eb74ab..97571e2783 100644 --- a/akka-amqp/src/main/scala/ExampleSession.scala +++ b/akka-amqp/src/main/scala/ExampleSession.scala @@ -8,10 +8,10 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRegistry} import Actor._ import java.util.concurrent.{CountDownLatch, TimeUnit} import se.scalablesolutions.akka.amqp.AMQP._ -import se.scalablesolutions.akka.serialization.Serializer -import java.lang.Class +import java.lang.String object ExampleSession { + def main(args: Array[String]) = { println("==== DIRECT ===") direct @@ -97,6 +97,7 @@ object ExampleSession { } def callback = { + val channelCountdown = new CountDownLatch(2) val connectionCallback = actor { @@ -129,21 +130,36 @@ object ExampleSession { } def rpc = { + val connection = AMQP.newConnection() val exchangeParameters = ExchangeParameters("my_rpc_exchange", ExchangeType.Topic) - val stringSerializer = new Serializer { - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes) - def toBinary(obj: AnyRef) = obj.asInstanceOf[String].getBytes + /** Server */ + val serverFromBinary = new FromBinary[String] { + def fromBinary(bytes: Array[Byte]) = new String(bytes) } + val serverToBinary = new ToBinary[Int] { + def toBinary(t: Int) = Array(t.toByte) + } + val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary) - val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.in.key", stringSerializer, stringSerializer, { - case "rpc_request" => "rpc_response" + val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, { + case "rpc_request" => 3 case _ => error("unknown request") }) - val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.in.key", stringSerializer, stringSerializer) + + /** Client */ + val clientToBinary = new ToBinary[String] { + def toBinary(t: String) = t.getBytes + } + val clientFromBinary = new FromBinary[Int] { + def fromBinary(bytes: Array[Byte]) = bytes.head.toInt + } + val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary) + + val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer) val response = (rpcClient !! "rpc_request") log.info("Response: " + response) diff --git a/akka-amqp/src/main/scala/RpcClientActor.scala b/akka-amqp/src/main/scala/RpcClientActor.scala index 8ff7d8a0ac..ba85005777 100644 --- a/akka-amqp/src/main/scala/RpcClientActor.scala +++ b/akka-amqp/src/main/scala/RpcClientActor.scala @@ -6,12 +6,13 @@ package se.scalablesolutions.akka.amqp import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameters} -import com.rabbitmq.client.{Channel, RpcClient} -class RpcClientActor(exchangeParameters: ExchangeParameters, +import com.rabbitmq.client.{Channel, RpcClient} +import se.scalablesolutions.akka.amqp.AMQP.{RpcClientSerializer, ChannelParameters, ExchangeParameters} + +class RpcClientActor[I,O](exchangeParameters: ExchangeParameters, routingKey: String, - inSerializer: Serializer, - outSerializer: Serializer, + serializer: RpcClientSerializer[I,O], channelParameters: Option[ChannelParameters] = None) extends FaultTolerantChannelActor(exchangeParameters, channelParameters) { import exchangeParameters._ @@ -21,29 +22,22 @@ class RpcClientActor(exchangeParameters: ExchangeParameters, log.info("%s started", this) def specificMessageHandler = { - case payload: AnyRef => { - + case payload: I => { rpcClient match { case Some(client) => - val response: Array[Byte] = client.primitiveCall(inSerializer.toBinary(payload)) - self.reply(outSerializer.fromBinary(response, None)) + val response: Array[Byte] = client.primitiveCall(serializer.toBinary.toBinary(payload)) + self.reply(serializer.fromBinary.fromBinary(response)) case None => error("%s has no client to send messages with".format(this)) } } } - protected def setupChannel(ch: Channel) = { - rpcClient = Some(new RpcClient(ch, exchangeName, routingKey)) - } + protected def setupChannel(ch: Channel) = rpcClient = Some(new RpcClient(ch, exchangeName, routingKey)) override def preRestart(reason: Throwable) = { rpcClient = None super.preRestart(reason) } - - override def toString(): String = - "AMQP.RpcClient[exchange=" +exchangeName + - ", routingKey=" + routingKey+ "]" - + override def toString = "AMQP.RpcClient[exchange=" +exchangeName + ", routingKey=" + routingKey+ "]" } \ No newline at end of file diff --git a/akka-amqp/src/main/scala/RpcServerActor.scala b/akka-amqp/src/main/scala/RpcServerActor.scala index fa760edda8..897c041c69 100644 --- a/akka-amqp/src/main/scala/RpcServerActor.scala +++ b/akka-amqp/src/main/scala/RpcServerActor.scala @@ -6,9 +6,9 @@ package se.scalablesolutions.akka.amqp import se.scalablesolutions.akka.actor.{ActorRef, Actor} import com.rabbitmq.client.AMQP.BasicProperties -import se.scalablesolutions.akka.serialization.Serializer +import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer -class RpcServerActor(producer: ActorRef, inSerializer: Serializer, outSerializer: Serializer, requestHandler: PartialFunction[AnyRef, AnyRef]) extends Actor { +class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: PartialFunction[I, O]) extends Actor { log.info("%s started", this) @@ -16,8 +16,8 @@ class RpcServerActor(producer: ActorRef, inSerializer: Serializer, outSerializer case Delivery(payload, _, tag, props, sender) => { log.debug("%s handling delivery with tag %d", this, tag) - val request = inSerializer.fromBinary(payload, None) - val response: Array[Byte] = outSerializer.toBinary(requestHandler(request)) + val request = serializer.fromBinary.fromBinary(payload) + val response: Array[Byte] = serializer.toBinary.toBinary(requestHandler(request)) log.debug("%s sending reply to %s", this, props.getReplyTo) val replyProps = new BasicProperties diff --git a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala index eebcfccce3..7dbfb4becd 100644 --- a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala +++ b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala @@ -11,8 +11,8 @@ import se.scalablesolutions.akka.amqp._ import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.{CountDownLatch, TimeUnit} -import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters} import se.scalablesolutions.akka.serialization.Serializer +import se.scalablesolutions.akka.amqp.AMQP._ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging { @@ -29,23 +29,34 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging } val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic) - val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) - val stringSerializer = new Serializer { - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes) - def toBinary(obj: AnyRef) = obj.asInstanceOf[String].getBytes - } + val channelParameters = ChannelParameters(channelCallback + = Some(channelCallback)) - val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.routing", stringSerializer, stringSerializer, { - case "some_payload" => "some_result" - case _ => error("Unhandled message") + val serverFromBinary = new FromBinary[String] { + def fromBinary(bytes: Array[Byte]) = new String(bytes) + } + val serverToBinary = new ToBinary[Int] { + def toBinary(t: Int) = Array(t.toByte) + } + val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary) + val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, { + case "some_payload" => 3 + case _ => error("unknown request") }, channelParameters = Some(channelParameters)) - val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.routing", stringSerializer, stringSerializer - , channelParameters = Some(channelParameters)) + val clientToBinary = new ToBinary[String] { + def toBinary(t: String) = t.getBytes + } + val clientFromBinary = new FromBinary[Int] { + def fromBinary(bytes: Array[Byte]) = bytes.head.toInt + } + val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary) + val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer, + channelParameters = Some(channelParameters)) countDown.await(2, TimeUnit.SECONDS) must be (true) val response = rpcClient !! "some_payload" - response must be (Some("some_result")) + response must be (Some(3)) } finally { connection.stop } diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala index 696588fcec..e267fcd077 100644 --- a/akka-camel/src/main/scala/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/component/ActorComponent.scala @@ -22,6 +22,7 @@ import se.scalablesolutions.akka.stm.TransactionConfig import scala.reflect.BeanProperty import CamelMessageConversion.toExchangeAdapter +import java.lang.Throwable /** * Camel component for sending messages to and receiving replies from actors. @@ -250,8 +251,8 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall def supervisor: Option[ActorRef] = unsupported protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](message: Any, timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]) = unsupported protected[akka] def mailbox: Deque[MessageInvocation] = unsupported - protected[akka] def restart(reason: Throwable): Unit = unsupported - protected[akka] def restartLinkedActors(reason: Throwable): Unit = unsupported + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported protected[akka] def linkedActors: JavaMap[String, ActorRef] = unsupported protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported diff --git a/akka-core/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java b/akka-core/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java index 648f53842b..9af73c6c77 100644 --- a/akka-core/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java +++ b/akka-core/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java @@ -667,29 +667,36 @@ public final class RemoteProtocol { public boolean hasTimeout() { return hasTimeout; } public long getTimeout() { return timeout_; } - // optional .LifeCycleProtocol lifeCycle = 9; - public static final int LIFECYCLE_FIELD_NUMBER = 9; + // optional uint64 receiveTimeout = 9; + public static final int RECEIVETIMEOUT_FIELD_NUMBER = 9; + private boolean hasReceiveTimeout; + private long receiveTimeout_ = 0L; + public boolean hasReceiveTimeout() { return hasReceiveTimeout; } + public long getReceiveTimeout() { return receiveTimeout_; } + + // optional .LifeCycleProtocol lifeCycle = 10; + public static final int LIFECYCLE_FIELD_NUMBER = 10; private boolean hasLifeCycle; private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol lifeCycle_; public boolean hasLifeCycle() { return hasLifeCycle; } public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol getLifeCycle() { return lifeCycle_; } - // optional .RemoteActorRefProtocol supervisor = 10; - public static final int SUPERVISOR_FIELD_NUMBER = 10; + // optional .RemoteActorRefProtocol supervisor = 11; + public static final int SUPERVISOR_FIELD_NUMBER = 11; private boolean hasSupervisor; private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol supervisor_; public boolean hasSupervisor() { return hasSupervisor; } public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol getSupervisor() { return supervisor_; } - // optional bytes hotswapStack = 11; - public static final int HOTSWAPSTACK_FIELD_NUMBER = 11; + // optional bytes hotswapStack = 12; + public static final int HOTSWAPSTACK_FIELD_NUMBER = 12; private boolean hasHotswapStack; private com.google.protobuf.ByteString hotswapStack_ = com.google.protobuf.ByteString.EMPTY; public boolean hasHotswapStack() { return hasHotswapStack; } public com.google.protobuf.ByteString getHotswapStack() { return hotswapStack_; } - // repeated .RemoteRequestProtocol messages = 12; - public static final int MESSAGES_FIELD_NUMBER = 12; + // repeated .RemoteRequestProtocol messages = 13; + public static final int MESSAGES_FIELD_NUMBER = 13; private java.util.List messages_ = java.util.Collections.emptyList(); public java.util.List getMessagesList() { @@ -750,17 +757,20 @@ public final class RemoteProtocol { if (hasTimeout()) { output.writeUInt64(8, getTimeout()); } + if (hasReceiveTimeout()) { + output.writeUInt64(9, getReceiveTimeout()); + } if (hasLifeCycle()) { - output.writeMessage(9, getLifeCycle()); + output.writeMessage(10, getLifeCycle()); } if (hasSupervisor()) { - output.writeMessage(10, getSupervisor()); + output.writeMessage(11, getSupervisor()); } if (hasHotswapStack()) { - output.writeBytes(11, getHotswapStack()); + output.writeBytes(12, getHotswapStack()); } for (se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol element : getMessagesList()) { - output.writeMessage(12, element); + output.writeMessage(13, element); } getUnknownFields().writeTo(output); } @@ -803,21 +813,25 @@ public final class RemoteProtocol { size += com.google.protobuf.CodedOutputStream .computeUInt64Size(8, getTimeout()); } + if (hasReceiveTimeout()) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(9, getReceiveTimeout()); + } if (hasLifeCycle()) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(9, getLifeCycle()); + .computeMessageSize(10, getLifeCycle()); } if (hasSupervisor()) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(10, getSupervisor()); + .computeMessageSize(11, getSupervisor()); } if (hasHotswapStack()) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(11, getHotswapStack()); + .computeBytesSize(12, getHotswapStack()); } for (se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol element : getMessagesList()) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(12, element); + .computeMessageSize(13, element); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -1005,6 +1019,9 @@ public final class RemoteProtocol { if (other.hasTimeout()) { setTimeout(other.getTimeout()); } + if (other.hasReceiveTimeout()) { + setReceiveTimeout(other.getReceiveTimeout()); + } if (other.hasLifeCycle()) { mergeLifeCycle(other.getLifeCycle()); } @@ -1082,7 +1099,11 @@ public final class RemoteProtocol { setTimeout(input.readUInt64()); break; } - case 74: { + case 72: { + setReceiveTimeout(input.readUInt64()); + break; + } + case 82: { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.newBuilder(); if (hasLifeCycle()) { subBuilder.mergeFrom(getLifeCycle()); @@ -1091,7 +1112,7 @@ public final class RemoteProtocol { setLifeCycle(subBuilder.buildPartial()); break; } - case 82: { + case 90: { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.newBuilder(); if (hasSupervisor()) { subBuilder.mergeFrom(getSupervisor()); @@ -1100,11 +1121,11 @@ public final class RemoteProtocol { setSupervisor(subBuilder.buildPartial()); break; } - case 90: { + case 98: { setHotswapStack(input.readBytes()); break; } - case 98: { + case 106: { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.newBuilder(); input.readMessage(subBuilder, extensionRegistry); addMessages(subBuilder.buildPartial()); @@ -1293,7 +1314,25 @@ public final class RemoteProtocol { return this; } - // optional .LifeCycleProtocol lifeCycle = 9; + // optional uint64 receiveTimeout = 9; + public boolean hasReceiveTimeout() { + return result.hasReceiveTimeout(); + } + public long getReceiveTimeout() { + return result.getReceiveTimeout(); + } + public Builder setReceiveTimeout(long value) { + result.hasReceiveTimeout = true; + result.receiveTimeout_ = value; + return this; + } + public Builder clearReceiveTimeout() { + result.hasReceiveTimeout = false; + result.receiveTimeout_ = 0L; + return this; + } + + // optional .LifeCycleProtocol lifeCycle = 10; public boolean hasLifeCycle() { return result.hasLifeCycle(); } @@ -1330,7 +1369,7 @@ public final class RemoteProtocol { return this; } - // optional .RemoteActorRefProtocol supervisor = 10; + // optional .RemoteActorRefProtocol supervisor = 11; public boolean hasSupervisor() { return result.hasSupervisor(); } @@ -1367,7 +1406,7 @@ public final class RemoteProtocol { return this; } - // optional bytes hotswapStack = 11; + // optional bytes hotswapStack = 12; public boolean hasHotswapStack() { return result.hasHotswapStack(); } @@ -1388,7 +1427,7 @@ public final class RemoteProtocol { return this; } - // repeated .RemoteRequestProtocol messages = 12; + // repeated .RemoteRequestProtocol messages = 13; public java.util.List getMessagesList() { return java.util.Collections.unmodifiableList(result.messages_); } @@ -4210,40 +4249,40 @@ public final class RemoteProtocol { "\n\024RemoteProtocol.proto\"v\n\026RemoteActorRef" + "Protocol\022\014\n\004uuid\030\001 \002(\t\022\026\n\016actorClassname" + "\030\002 \002(\t\022%\n\013homeAddress\030\003 \002(\0132\020.AddressPro" + - "tocol\022\017\n\007timeout\030\004 \001(\004\"\350\002\n\032SerializedAct" + + "tocol\022\017\n\007timeout\030\004 \001(\004\"\200\003\n\032SerializedAct" + "orRefProtocol\022\014\n\004uuid\030\001 \002(\t\022\n\n\002id\030\002 \002(\t\022" + "\026\n\016actorClassname\030\003 \002(\t\022)\n\017originalAddre" + "ss\030\004 \002(\0132\020.AddressProtocol\022\025\n\ractorInsta" + "nce\030\005 \001(\014\022\033\n\023serializerClassname\030\006 \001(\t\022\024" + - "\n\014isTransactor\030\007 \001(\010\022\017\n\007timeout\030\010 \001(\004\022%\n" + - "\tlifeCycle\030\t \001(\0132\022.LifeCycleProtocol\022+\n\n", - "supervisor\030\n \001(\0132\027.RemoteActorRefProtoco" + - "l\022\024\n\014hotswapStack\030\013 \001(\014\022(\n\010messages\030\014 \003(" + - "\0132\026.RemoteRequestProtocol\"r\n\017MessageProt" + - "ocol\0225\n\023serializationScheme\030\001 \002(\0162\030.Seri" + - "alizationSchemeType\022\017\n\007message\030\002 \002(\014\022\027\n\017" + - "messageManifest\030\003 \001(\014\"\374\001\n\025RemoteRequestP" + - "rotocol\022\n\n\002id\030\001 \002(\004\022!\n\007message\030\002 \002(\0132\020.M" + - "essageProtocol\022\016\n\006method\030\003 \001(\t\022\016\n\006target" + - "\030\004 \002(\t\022\014\n\004uuid\030\005 \002(\t\022\017\n\007timeout\030\006 \002(\004\022\026\n" + - "\016supervisorUuid\030\007 \001(\t\022\017\n\007isActor\030\010 \002(\010\022\020", - "\n\010isOneWay\030\t \002(\010\022\021\n\tisEscaped\030\n \002(\010\022\'\n\006s" + - "ender\030\013 \001(\0132\027.RemoteActorRefProtocol\"\252\001\n" + - "\023RemoteReplyProtocol\022\n\n\002id\030\001 \002(\004\022!\n\007mess" + - "age\030\002 \001(\0132\020.MessageProtocol\022%\n\texception" + - "\030\003 \001(\0132\022.ExceptionProtocol\022\026\n\016supervisor" + - "Uuid\030\004 \001(\t\022\017\n\007isActor\030\005 \002(\010\022\024\n\014isSuccess" + - "ful\030\006 \002(\010\"_\n\021LifeCycleProtocol\022!\n\tlifeCy" + - "cle\030\001 \002(\0162\016.LifeCycleType\022\022\n\npreRestart\030" + - "\002 \001(\t\022\023\n\013postRestart\030\003 \001(\t\"1\n\017AddressPro" + - "tocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n", - "\021ExceptionProtocol\022\021\n\tclassname\030\001 \002(\t\022\017\n" + - "\007message\030\002 \002(\t*]\n\027SerializationSchemeTyp" + - "e\022\010\n\004JAVA\020\001\022\013\n\007SBINARY\020\002\022\016\n\nSCALA_JSON\020\003" + - "\022\r\n\tJAVA_JSON\020\004\022\014\n\010PROTOBUF\020\005*-\n\rLifeCyc" + - "leType\022\r\n\tPERMANENT\020\001\022\r\n\tTEMPORARY\020\002B-\n)" + - "se.scalablesolutions.akka.remote.protoco" + - "lH\001" + "\n\014isTransactor\030\007 \001(\010\022\017\n\007timeout\030\010 \001(\004\022\026\n" + + "\016receiveTimeout\030\t \001(\004\022%\n\tlifeCycle\030\n \001(\013", + "2\022.LifeCycleProtocol\022+\n\nsupervisor\030\013 \001(\013" + + "2\027.RemoteActorRefProtocol\022\024\n\014hotswapStac" + + "k\030\014 \001(\014\022(\n\010messages\030\r \003(\0132\026.RemoteReques" + + "tProtocol\"r\n\017MessageProtocol\0225\n\023serializ" + + "ationScheme\030\001 \002(\0162\030.SerializationSchemeT" + + "ype\022\017\n\007message\030\002 \002(\014\022\027\n\017messageManifest\030" + + "\003 \001(\014\"\374\001\n\025RemoteRequestProtocol\022\n\n\002id\030\001 " + + "\002(\004\022!\n\007message\030\002 \002(\0132\020.MessageProtocol\022\016" + + "\n\006method\030\003 \001(\t\022\016\n\006target\030\004 \002(\t\022\014\n\004uuid\030\005" + + " \002(\t\022\017\n\007timeout\030\006 \002(\004\022\026\n\016supervisorUuid\030", + "\007 \001(\t\022\017\n\007isActor\030\010 \002(\010\022\020\n\010isOneWay\030\t \002(\010" + + "\022\021\n\tisEscaped\030\n \002(\010\022\'\n\006sender\030\013 \001(\0132\027.Re" + + "moteActorRefProtocol\"\252\001\n\023RemoteReplyProt" + + "ocol\022\n\n\002id\030\001 \002(\004\022!\n\007message\030\002 \001(\0132\020.Mess" + + "ageProtocol\022%\n\texception\030\003 \001(\0132\022.Excepti" + + "onProtocol\022\026\n\016supervisorUuid\030\004 \001(\t\022\017\n\007is" + + "Actor\030\005 \002(\010\022\024\n\014isSuccessful\030\006 \002(\010\"_\n\021Lif" + + "eCycleProtocol\022!\n\tlifeCycle\030\001 \002(\0162\016.Life" + + "CycleType\022\022\n\npreRestart\030\002 \001(\t\022\023\n\013postRes" + + "tart\030\003 \001(\t\"1\n\017AddressProtocol\022\020\n\010hostnam", + "e\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021ExceptionProtoc" + + "ol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007message\030\002 \002(\t*]" + + "\n\027SerializationSchemeType\022\010\n\004JAVA\020\001\022\013\n\007S" + + "BINARY\020\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022" + + "\014\n\010PROTOBUF\020\005*-\n\rLifeCycleType\022\r\n\tPERMAN" + + "ENT\020\001\022\r\n\tTEMPORARY\020\002B-\n)se.scalablesolut" + + "ions.akka.remote.protocolH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -4263,7 +4302,7 @@ public final class RemoteProtocol { internal_static_SerializedActorRefProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SerializedActorRefProtocol_descriptor, - new java.lang.String[] { "Uuid", "Id", "ActorClassname", "OriginalAddress", "ActorInstance", "SerializerClassname", "IsTransactor", "Timeout", "LifeCycle", "Supervisor", "HotswapStack", "Messages", }, + new java.lang.String[] { "Uuid", "Id", "ActorClassname", "OriginalAddress", "ActorInstance", "SerializerClassname", "IsTransactor", "Timeout", "ReceiveTimeout", "LifeCycle", "Supervisor", "HotswapStack", "Messages", }, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.Builder.class); internal_static_MessageProtocol_descriptor = diff --git a/akka-core/src/main/protocol/RemoteProtocol.proto b/akka-core/src/main/protocol/RemoteProtocol.proto index 6d8b8995f4..c4e5a8157e 100644 --- a/akka-core/src/main/protocol/RemoteProtocol.proto +++ b/akka-core/src/main/protocol/RemoteProtocol.proto @@ -36,10 +36,11 @@ message SerializedActorRefProtocol { optional string serializerClassname = 6; optional bool isTransactor = 7; optional uint64 timeout = 8; - optional LifeCycleProtocol lifeCycle = 9; - optional RemoteActorRefProtocol supervisor = 10; - optional bytes hotswapStack = 11; - repeated RemoteRequestProtocol messages = 12; + optional uint64 receiveTimeout = 9; + optional LifeCycleProtocol lifeCycle = 10; + optional RemoteActorRefProtocol supervisor = 11; + optional bytes hotswapStack = 12; + repeated RemoteRequestProtocol messages = 13; } /** diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index a545f9f633..b83816f4d2 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -474,9 +474,9 @@ object ActiveObject extends Logging { val parent = clazz.getSuperclass if (parent != null) injectActiveObjectContext0(activeObject, parent) else { - log.trace( - "Can't set 'ActiveObjectContext' for ActiveObject [%s] since no field of this type could be found.", - activeObject.getClass.getName) + log.ifTrace("Can't set 'ActiveObjectContext' for ActiveObject [" + + activeObject.getClass.getName + + "] since no field of this type could be found.") None } } @@ -486,7 +486,6 @@ object ActiveObject extends Logging { private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = Supervisor(SupervisorConfig(restartStrategy, components)) - } private[akka] object AspectInitRegistry extends ListenerManagement { @@ -634,11 +633,12 @@ private[akka] sealed class ActiveObjectAspect { joinPoint: JoinPoint, isOneWay: Boolean, isVoid: Boolean, sender: AnyRef, senderFuture: CompletableFuture[Any]) { override def toString: String = synchronized { - "Invocation [joinPoint: " + joinPoint.toString + - ", isOneWay: " + isOneWay + - ", isVoid: " + isVoid + - ", sender: " + sender + - ", senderFuture: " + senderFuture + + "Invocation [" + + "\n\t\tmethod = " + joinPoint.getRtti.asInstanceOf[MethodRtti].getMethod.getName + " @ " + joinPoint.getTarget.getClass.getName + + "\n\t\tisOneWay = " + isOneWay + + "\n\t\tisVoid = " + isVoid + + "\n\t\tsender = " + sender + + "\n\t\tsenderFuture = " + senderFuture + "]" } @@ -687,8 +687,6 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, private var context: Option[ActiveObjectContext] = None private var targetClass:Class[_] = _ - - def this(transactionalRequired: Boolean) = this(transactionalRequired,None) private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef, ctx: Option[ActiveObjectContext]) = { @@ -701,6 +699,8 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, context = ctx val methods = targetInstance.getClass.getDeclaredMethods.toList + if (self.lifeCycle.isEmpty) self.lifeCycle = Some(LifeCycle(Permanent)) + // See if we have any config define restart callbacks restartCallbacks match { case None => {} @@ -758,14 +758,14 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, } def receive = { - case Invocation(joinPoint, isOneWay, _, sender, senderFuture) => + case invocation @ Invocation(joinPoint, isOneWay, _, sender, senderFuture) => + ActiveObject.log.ifTrace("Invoking active object with message:\n" + invocation) context.foreach { ctx => if (sender ne null) ctx._sender = sender if (senderFuture ne null) ctx._senderFuture = senderFuture } ActiveObjectContext.sender.value = joinPoint.getThis // set next sender self.senderFuture.foreach(ActiveObjectContext.senderFuture.value = _) - if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint) if (isOneWay) joinPoint.proceed else self.reply(joinPoint.proceed) @@ -773,61 +773,53 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, // Jan Kronquist: started work on issue 121 case Link(target) => self.link(target) case Unlink(target) => self.unlink(target) - case unexpected => - throw new IllegalActorStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]") + case unexpected => throw new IllegalActorStateException( + "Unexpected message [" + unexpected + "] sent to [" + this + "]") } override def preRestart(reason: Throwable) { try { - // Since preRestart is called we know that this dispatcher - // is about to be restarted. Put the instance in a thread - // local so the new dispatcher can be initialized with the contents of the - // old. - //FIXME - This should be considered as a workaround. - crashedActorTl.set(this) - if (preRestart.isDefined) preRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) + // Since preRestart is called we know that this dispatcher + // is about to be restarted. Put the instance in a thread + // local so the new dispatcher can be initialized with the + // contents of the old. + //FIXME - This should be considered as a workaround. + crashedActorTl.set(this) + preRestart.foreach(_.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)) } catch { case e: InvocationTargetException => throw e.getCause } } override def postRestart(reason: Throwable) { try { - - if (postRestart.isDefined) { - postRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) - } + postRestart.foreach(_.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)) } catch { case e: InvocationTargetException => throw e.getCause } } override def init = { - // Get the crashed dispatcher from thread local and intitialize this actor with the - // contents of the old dispatcher - val oldActor = crashedActorTl.get(); - if(oldActor != null) { - initialize(oldActor.targetClass,oldActor.target.get,oldActor.context) - crashedActorTl.set(null) - } + // Get the crashed dispatcher from thread local and intitialize this actor with the + // contents of the old dispatcher + val oldActor = crashedActorTl.get(); + if (oldActor != null) { + initialize(oldActor.targetClass, oldActor.target.get, oldActor.context) + crashedActorTl.set(null) + } } override def shutdown = { try { - if (zhutdown.isDefined) { - zhutdown.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) - } - } catch { - case e: InvocationTargetException => throw e.getCause - } finally { + zhutdown.foreach(_.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)) + } catch { case e: InvocationTargetException => throw e.getCause + } finally { AspectInitRegistry.unregister(target.get); } } override def initTransactionalState = { - try { + try { if (initTxState.isDefined && target.isDefined) initTxState.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) } catch { case e: InvocationTargetException => throw e.getCause } } - - private def serializeArguments(joinPoint: JoinPoint) = { val args = joinPoint.getRtti.asInstanceOf[MethodRtti].getParameterValues var unserializable = false diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index e1227168b2..76adf9c729 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -46,8 +46,9 @@ case class Exit(dead: ActorRef, killer: Throwable) extends LifeCycleMessage case class Link(child: ActorRef) extends LifeCycleMessage case class Unlink(child: ActorRef) extends LifeCycleMessage case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage -case object Kill extends LifeCycleMessage case object ReceiveTimeout extends LifeCycleMessage +case class MaximumNumberOfRestartsWithinTimeRangeReached( + victim: ActorRef, maxNrOfRetries: Int, withinTimeRange: Int, lastExceptionCausingRestart: Throwable) extends LifeCycleMessage // Exceptions for Actors class ActorStartException private[akka](message: String) extends RuntimeException(message) @@ -284,6 +285,7 @@ object Actor extends Logging { * @author Jonas Bonér */ trait Actor extends Logging { + /** * Type alias because traits cannot have companion objects. */ @@ -300,12 +302,12 @@ trait Actor extends Logging { Actor.actorRefInCreation.value = None if (ref.isEmpty) throw new ActorInitializationException( "ActorRef for instance of actor [" + getClass.getName + "] is not in scope." + - "\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." + - "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." + - "\n\tEither use:" + - "\n\t\t'val actor = Actor.actorOf[MyActor]', or" + - "\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" + - "\n\t\t'val actor = Actor.actor { case msg => .. } }'") + "\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." + + "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." + + "\n\tEither use:" + + "\n\t\t'val actor = Actor.actorOf[MyActor]', or" + + "\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" + + "\n\t\t'val actor = Actor.actor { case msg => .. } }'") else ref } @@ -426,12 +428,11 @@ trait Actor extends Logging { private val lifeCycles: Receive = { case HotSwap(code) => self.hotswap = code; self.checkReceiveTimeout // FIXME : how to reschedule receivetimeout on hotswap? - case Restart(reason) => self.restart(reason) case Exit(dead, reason) => self.handleTrapExit(dead, reason) case Link(child) => self.link(child) case Unlink(child) => self.unlink(child) case UnlinkAndStop(child) => self.unlink(child); child.stop - case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message") + case Restart(reason) => throw reason } } diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 2472ea924d..0d70643dc8 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -10,26 +10,27 @@ import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, F import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.stm.global._ import se.scalablesolutions.akka.stm.TransactionManagement._ -import se.scalablesolutions.akka.stm.TransactionManagement +import se.scalablesolutions.akka.stm.{TransactionManagement, TransactionSetAbortedException} import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, MessageSerializer, RemoteRequestProtocolIdFactory} import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReentrantGuard} +import RemoteActorSerialization._ import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.commitbarriers.CountDownCommitBarrier - -import jsr166x.{Deque, ConcurrentLinkedDeque} +import org.multiverse.api.exceptions.DeadTransactionException import java.net.InetSocketAddress import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.{Map => JMap} import java.lang.reflect.Field -import RemoteActorSerialization._ + +import jsr166x.{Deque, ConcurrentLinkedDeque} import com.google.protobuf.ByteString -import java.util.concurrent.{ConcurrentHashMap, TimeUnit} /** * ActorRef is an immutable and serializable handle to an Actor. @@ -71,9 +72,7 @@ trait ActorRef extends TransactionManagement { @volatile protected[this] var _isShutDown = false @volatile protected[akka] var _isBeingRestarted = false @volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServer.HOSTNAME, RemoteServer.PORT) - @volatile protected[akka] var _timeoutActor: Option[ActorRef] = None - @volatile protected[akka] var startOnCreation = false @volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false protected[this] val guard = new ReentrantGuard @@ -99,12 +98,12 @@ trait ActorRef extends TransactionManagement { @volatile var timeout: Long = Actor.TIMEOUT /** - * User overridable callback/setting. - *

- * Defines the default timeout for an initial receive invocation. - * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. - */ - @volatile var receiveTimeout: Option[Long] = None + * User overridable callback/setting. + *

+ * Defines the default timeout for an initial receive invocation. + * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. + */ + @volatile var receiveTimeout: Option[Long] = None /** * User overridable callback/setting. @@ -166,12 +165,12 @@ trait ActorRef extends TransactionManagement { * The default is also that all actors that are created and spawned from within this actor * is sharing the same dispatcher as its creator. */ - private[akka] var _dispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher + @volatile private[akka] var _dispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher /** * Holds the hot swapped partial function. */ - protected[akka] var hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack + @volatile protected[akka] var hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack /** * User overridable callback/setting. @@ -184,12 +183,12 @@ trait ActorRef extends TransactionManagement { /** * Configuration for TransactionFactory. User overridable. */ - protected[akka] var _transactionConfig: TransactionConfig = DefaultGlobalTransactionConfig + @volatile protected[akka] var _transactionConfig: TransactionConfig = DefaultGlobalTransactionConfig /** * TransactionFactory to be used for atomic when isTransactor. Configuration is overridable. */ - private[akka] var _transactionFactory: Option[TransactionFactory] = None + @volatile private[akka] var _transactionFactory: Option[TransactionFactory] = None /** * This lock ensures thread safety in the dispatching: only one message can @@ -215,12 +214,10 @@ trait ActorRef extends TransactionManagement { * Is defined if the message was sent from another Actor, else None. */ def sender: Option[ActorRef] = { - //Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender } + // Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender } val msg = currentMessage - if(msg.isEmpty) - None - else - msg.get.sender + if(msg.isEmpty) None + else msg.get.sender } /** @@ -228,12 +225,10 @@ trait ActorRef extends TransactionManagement { * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ def senderFuture: Option[CompletableFuture[Any]] = { - //Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture } + // Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture } val msg = currentMessage - if(msg.isEmpty) - None - else - msg.get.senderFuture + if(msg.isEmpty) None + else msg.get.senderFuture } /** @@ -442,7 +437,7 @@ trait ActorRef extends TransactionManagement { /** * Starts up the actor and its message queue. */ - def start: ActorRef + def start(): ActorRef /** * Shuts down the actor its dispatcher and message queue. @@ -549,11 +544,11 @@ trait ActorRef extends TransactionManagement { protected[akka] def mailbox: Deque[MessageInvocation] - protected[akka] def restart(reason: Throwable): Unit - protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit - protected[akka] def restartLinkedActors(reason: Throwable): Unit + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit + + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit protected[akka] def registerSupervisorAsRemoteActor: Option[String] @@ -571,23 +566,19 @@ trait ActorRef extends TransactionManagement { override def toString = "Actor[" + id + ":" + uuid + "]" - protected[akka] def cancelReceiveTimeout = { - _timeoutActor.foreach { - x => - if (x.isRunning) Scheduler.unschedule(x) - _timeoutActor = None - log.debug("Timeout canceled for %s", this) - } - } - - protected [akka] def checkReceiveTimeout = { + protected[akka] def checkReceiveTimeout = { cancelReceiveTimeout - receiveTimeout.foreach { timeout => + receiveTimeout.foreach { time => log.debug("Scheduling timeout for %s", this) - _timeoutActor = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, timeout, TimeUnit.MILLISECONDS)) + _timeoutActor = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, time, TimeUnit.MILLISECONDS)) } } + protected[akka] def cancelReceiveTimeout = _timeoutActor.foreach { timeoutActor => + if (timeoutActor.isRunning) Scheduler.unschedule(timeoutActor) + _timeoutActor = None + log.debug("Timeout canceled for %s", this) + } } /** @@ -599,8 +590,24 @@ sealed class LocalActorRef private[akka]( private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None)) extends ActorRef { - private var isDeserialized = false - private var loader: Option[ClassLoader] = None + @volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None // only mutable to maintain identity across nodes + @volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[String, ActorRef]] = None + @volatile private[akka] var _supervisor: Option[ActorRef] = None + @volatile private var isInInitialization = false + @volatile private var runActorInitialization = false + @volatile private var isDeserialized = false + @volatile private var loader: Option[ClassLoader] = None + @volatile private var maxNrOfRetriesCount: Int = 0 + @volatile private var restartsWithinTimeRangeTimestamp: Long = 0L + + protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation] + protected[this] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) } + + // Needed to be able to null out the 'val self: ActorRef' member variables to make the Actor + // instance elegible for garbage collection + private val actorSelfFields = findActorSelfField(actor.getClass) + + if (runActorInitialization && !isDeserialized) initializeActorInstance private[akka] def this(clazz: Class[_ <: Actor]) = this(Left(Some(clazz))) private[akka] def this(factory: () => Actor) = this(Right(Some(factory))) @@ -614,6 +621,7 @@ sealed class LocalActorRef private[akka]( __port: Int, __isTransactor: Boolean, __timeout: Long, + __receiveTimeout: Option[Long], __lifeCycle: Option[LifeCycle], __supervisor: Option[ActorRef], __hotswap: Option[PartialFunction[Any, Unit]], @@ -635,6 +643,7 @@ sealed class LocalActorRef private[akka]( homeAddress = (__hostname, __port) isTransactor = __isTransactor timeout = __timeout + receiveTimeout = __receiveTimeout lifeCycle = __lifeCycle _supervisor = __supervisor hotswap = __hotswap @@ -643,30 +652,11 @@ sealed class LocalActorRef private[akka]( actorSelfFields._3.set(actor, Some(this)) start __messages.foreach(message => this ! MessageSerializer.deserialize(message.getMessage)) + checkReceiveTimeout ActorRegistry.register(this) } - // Only mutable for RemoteServer in order to maintain identity across nodes - @volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None - @volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[String, ActorRef]] = None - @volatile private[akka] var _supervisor: Option[ActorRef] = None - - protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation] - protected[this] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) } - - @volatile private var isInInitialization = false - @volatile private var runActorInitialization = false - - // Needed to be able to null out the 'val self: ActorRef' member variables to make the Actor - // instance elegible for garbage collection - private val actorSelfFields = findActorSelfField(actor.getClass) - - if (runActorInitialization && !isDeserialized) initializeActorInstance - - /** - * Returns the mailbox. - */ - def mailbox: Deque[MessageInvocation] = _mailbox + // ========= PUBLIC FUNCTIONS ========= /** * Returns the class for the Actor instance that is managed by the ActorRef. @@ -681,7 +671,7 @@ sealed class LocalActorRef private[akka]( /** * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. */ - def dispatcher_=(md: MessageDispatcher): Unit = guard.withGuard { + def dispatcher_=(md: MessageDispatcher): Unit = { if (!isRunning || isBeingRestarted) _dispatcher = md else throw new ActorInitializationException( "Can not swap dispatcher for " + toString + " after it has been started") @@ -690,7 +680,7 @@ sealed class LocalActorRef private[akka]( /** * Get the dispatcher for this actor. */ - def dispatcher: MessageDispatcher = guard.withGuard { _dispatcher } + def dispatcher: MessageDispatcher = _dispatcher /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. @@ -734,19 +724,19 @@ sealed class LocalActorRef private[akka]( /** * Get the transaction configuration for this actor. */ - def transactionConfig: TransactionConfig = guard.withGuard { _transactionConfig } + def transactionConfig: TransactionConfig = _transactionConfig /** * Set the contact address for this actor. This is used for replying to messages * sent asynchronously when no reply channel exists. */ - def homeAddress_=(address: InetSocketAddress): Unit = guard.withGuard { _homeAddress = address } + def homeAddress_=(address: InetSocketAddress): Unit = _homeAddress = address /** * Returns the remote address for the actor, if any, else None. */ - def remoteAddress: Option[InetSocketAddress] = guard.withGuard { _remoteAddress } - protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = guard.withGuard { _remoteAddress = addr } + def remoteAddress: Option[InetSocketAddress] = _remoteAddress + protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = _remoteAddress = addr /** * Starts up the actor and its message queue. @@ -897,6 +887,11 @@ sealed class LocalActorRef private[akka]( } } + /** + * Returns the mailbox. + */ + def mailbox: Deque[MessageInvocation] = _mailbox + /** * Returns the mailbox size. */ @@ -910,7 +905,7 @@ sealed class LocalActorRef private[akka]( /** * Shuts down and removes all linked actors. */ - def shutdownLinkedActors(): Unit = guard.withGuard { + def shutdownLinkedActors(): Unit = { linkedActorsAsList.foreach(_.stop) linkedActors.clear } @@ -918,41 +913,11 @@ sealed class LocalActorRef private[akka]( /** * Returns the supervisor, if there is one. */ - def supervisor: Option[ActorRef] = guard.withGuard { _supervisor } + def supervisor: Option[ActorRef] = _supervisor - protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = guard.withGuard { _supervisor = sup } + // ========= AKKA PROTECTED FUNCTIONS ========= - private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = guard.withGuard { - val actorRef = Actor.actorOf(manifest[T].erasure.asInstanceOf[Class[T]].newInstance) - if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) actorRef.dispatcher = dispatcher - actorRef - } - - private[this] def newActor: Actor = { - isInInitialization = true - Actor.actorRefInCreation.value = Some(this) - val actor = actorFactory match { - case Left(Some(clazz)) => - try { - clazz.newInstance - } catch { - case e: InstantiationException => throw new ActorInitializationException( - "Could not instantiate Actor due to:\n" + e + - "\nMake sure Actor is NOT defined inside a class/trait," + - "\nif so put it outside the class/trait, f.e. in a companion object," + - "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.") - } - case Right(Some(factory)) => - factory() - case _ => - throw new ActorInitializationException( - "Can't create Actor, no Actor class or factory function in scope") - } - if (actor eq null) throw new ActorInitializationException( - "Actor instance passed to ActorRef can not be 'null'") - isInInitialization = false - actor - } + protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { joinTransaction(message) @@ -992,84 +957,22 @@ sealed class LocalActorRef private[akka]( } } - private def joinTransaction(message: Any) = if (isTransactionSetInScope) { - import org.multiverse.api.ThreadLocalTransaction - val txSet = getTransactionSetInScope - Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]", txSet, toString, message) // FIXME test to run bench without this trace call - val mtx = ThreadLocalTransaction.getThreadLocalTransaction - if ((mtx eq null) || mtx.getStatus.isDead) txSet.incParties - else txSet.incParties(mtx, 1) - } - /** * Callback for the dispatcher. This is the ingle entry point to the user Actor implementation. */ - protected[akka] def invoke(messageHandle: MessageInvocation): Unit = actor.synchronized { - if (isShutdown) { - Actor.log.warning("Actor [%s] is shut down, ignoring message [%s]", toString, messageHandle) - return - } - currentMessage = Option(messageHandle) - try { - dispatch(messageHandle) - } catch { - case e => - Actor.log.error(e, "Could not invoke actor [%s]", this) - throw e - } finally { - currentMessage = None //TODO: Don't reset this, we might want to resend the message - } - } - - private def dispatch[T](messageHandle: MessageInvocation) = { - val message = messageHandle.message //serializeMessage(messageHandle.message) - var topLevelTransaction = false - val txSet: Option[CountDownCommitBarrier] = - if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet - else { - topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx - if (isTransactor) { - Actor.log.trace( - "Creating a new transaction set (top-level transaction)\n\tfor actor %s\n\twith message %s", - toString, messageHandle) - Some(createNewTransactionSet) - } else None - } - setTransactionSet(txSet) - - try { - cancelReceiveTimeout // FIXME: leave this here? - if (isTransactor) { - val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory) - atomic(txFactory) { - actor.base(message) - setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit - } - } else { - actor.base(message) - setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit - } - } catch { - case e => - _isBeingRestarted = true - // abort transaction set - if (isTransactionSetInScope) { - val txSet = getTransactionSetInScope - Actor.log.debug("Aborting transaction set [%s]", txSet) - txSet.abort - } - Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) - - senderFuture.foreach(_.completeWithException(this, e)) - - clearTransaction - if (topLevelTransaction) clearTransactionSet - - // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client - if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) - } finally { - clearTransaction - if (topLevelTransaction) clearTransactionSet + protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard { + if (isShutdown) Actor.log.warning("Actor [%s] is shut down, ignoring message [%s]", toString, messageHandle) + else { + currentMessage = Option(messageHandle) + try { + dispatch(messageHandle) + } catch { + case e => + Actor.log.error(e, "Could not invoke actor [%s]", this) + throw e + } finally { + currentMessage = None //TODO: Don't reset this, we might want to resend the message + } } } @@ -1078,10 +981,10 @@ sealed class LocalActorRef private[akka]( faultHandler match { // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy case Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange)) => - restartLinkedActors(reason) + restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) case Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange)) => - dead.restart(reason) + dead.restart(reason, maxNrOfRetries, withinTimeRange) case None => throw new IllegalActorStateException( @@ -1094,42 +997,59 @@ sealed class LocalActorRef private[akka]( } } - protected[akka] def restart(reason: Throwable): Unit = { - val failedActor = actorInstance.get - failedActor.synchronized { - lifeCycle.get match { - case LifeCycle(scope, _, _) => { - scope match { - case Permanent => - Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) - restartLinkedActors(reason) - Actor.log.debug("Restarting linked actors for actor [%s].", id) - Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) - failedActor.preRestart(reason) - nullOutActorRefReferencesFor(failedActor) - val freshActor = newActor - freshActor.synchronized { + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = { + if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis + maxNrOfRetriesCount += 1 + if (maxNrOfRetriesCount > maxNrOfRetries || (System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange) { + val message = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) + Actor.log.warning( + "Maximum number of restarts [%s] within time range [%s] reached." + + "\n\tWill *not* restart actor [%s] anymore." + + "\n\tLast exception causing restart was [%s].", + maxNrOfRetries, withinTimeRange, this, reason) + _supervisor.foreach { sup => + if (sup.isDefinedAt(message)) sup ! message + else Actor.log.warning( + "No message handler defined for system message [MaximumNumberOfRestartsWithinTimeRangeReached]" + + "\n\tCan't send the message to the supervisor [%s].", sup) + } + } else { + _isBeingRestarted = true + val failedActor = actorInstance.get + val lock = guard.lock + guard.withGuard { + lifeCycle.get match { + case LifeCycle(scope, _, _) => { + scope match { + case Permanent => + Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) + restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) + Actor.log.debug("Restarting linked actors for actor [%s].", id) + Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) + failedActor.preRestart(reason) + nullOutActorRefReferencesFor(failedActor) + val freshActor = newActor freshActor.init freshActor.initTransactionalState actorInstance.set(freshActor) Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id) freshActor.postRestart(reason) - } - _isBeingRestarted = false - case Temporary => shutDownTemporaryActor(this) + _isBeingRestarted = false + case Temporary => shutDownTemporaryActor(this) + } } } } } } - protected[akka] def restartLinkedActors(reason: Throwable) = guard.withGuard { + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int) = { linkedActorsAsList.foreach { actorRef => if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent)) actorRef.lifeCycle.get match { case LifeCycle(scope, _, _) => { scope match { - case Permanent => actorRef.restart(reason) + case Permanent => actorRef.restart(reason, maxNrOfRetries, withinTimeRange) case Temporary => shutDownTemporaryActor(actorRef) } } @@ -1137,20 +1057,6 @@ sealed class LocalActorRef private[akka]( } } - private def shutDownTemporaryActor(temporaryActor: ActorRef) = { - Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", temporaryActor.id) - temporaryActor.stop - linkedActors.remove(temporaryActor.uuid) // remove the temporary actor - // if last temporary actor is gone, then unlink me from supervisor - if (linkedActors.isEmpty) { - Actor.log.info( - "All linked actors have died permanently (they were all configured as TEMPORARY)" + - "\n\tshutting down and unlinking supervisor actor as well [%s].", - temporaryActor.id) - _supervisor.foreach(_ ! UnlinkAndStop(this)) - } - } - protected[akka] def registerSupervisorAsRemoteActor: Option[String] = guard.withGuard { if (_supervisor.isDefined) { RemoteClient.clientFor(remoteAddress.get).registerSupervisorForActor(this) @@ -1169,6 +1075,127 @@ sealed class LocalActorRef private[akka]( protected[akka] def linkedActorsAsList: List[ActorRef] = linkedActors.values.toArray.toList.asInstanceOf[List[ActorRef]] + // ========= PRIVATE FUNCTIONS ========= + + private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = guard.withGuard { + val actorRef = Actor.actorOf(manifest[T].erasure.asInstanceOf[Class[T]].newInstance) + if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) actorRef.dispatcher = dispatcher + actorRef + } + + private[this] def newActor: Actor = { + isInInitialization = true + Actor.actorRefInCreation.value = Some(this) + val actor = actorFactory match { + case Left(Some(clazz)) => + try { + clazz.newInstance + } catch { + case e: InstantiationException => throw new ActorInitializationException( + "Could not instantiate Actor due to:\n" + e + + "\nMake sure Actor is NOT defined inside a class/trait," + + "\nif so put it outside the class/trait, f.e. in a companion object," + + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.") + } + case Right(Some(factory)) => + factory() + case _ => + throw new ActorInitializationException( + "Can't create Actor, no Actor class or factory function in scope") + } + if (actor eq null) throw new ActorInitializationException( + "Actor instance passed to ActorRef can not be 'null'") + isInInitialization = false + actor + } + + private def joinTransaction(message: Any) = if (isTransactionSetInScope) { + import org.multiverse.api.ThreadLocalTransaction + val oldTxSet = getTransactionSetInScope + val currentTxSet = if (oldTxSet.isAborted || oldTxSet.isCommitted) { + clearTransactionSet + createNewTransactionSet + } else oldTxSet + Actor.log.ifTrace("Joining transaction set [" + currentTxSet + "];\n\tactor " + toString + "\n\twith message [" + message + "]") + val mtx = ThreadLocalTransaction.getThreadLocalTransaction + if ((mtx eq null) || mtx.getStatus.isDead) currentTxSet.incParties + else currentTxSet.incParties(mtx, 1) + } + + private def dispatch[T](messageHandle: MessageInvocation) = { + Actor.log.ifTrace("Invoking actor with message:\n" + messageHandle) + val message = messageHandle.message //serializeMessage(messageHandle.message) + var topLevelTransaction = false + val txSet: Option[CountDownCommitBarrier] = + if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet + else { + topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx + if (isTransactor) { + Actor.log.ifTrace("Creating a new transaction set (top-level transaction)\n\tfor actor " + toString + "\n\twith message " + messageHandle) + Some(createNewTransactionSet) + } else None + } + setTransactionSet(txSet) + + try { + cancelReceiveTimeout // FIXME: leave this here? + if (isTransactor) { + val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory) + atomic(txFactory) { + actor.base(message) + setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit + } + } else { + actor.base(message) + setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit + } + } catch { + case e: DeadTransactionException => + handleExceptionInDispatch( + new TransactionSetAbortedException("Transaction set has been aborted by another participant"), + message, topLevelTransaction) + case e => + handleExceptionInDispatch(e, message, topLevelTransaction) + } finally { + clearTransaction + if (topLevelTransaction) clearTransactionSet + } + } + + private def shutDownTemporaryActor(temporaryActor: ActorRef) = { + Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", temporaryActor.id) + temporaryActor.stop + linkedActors.remove(temporaryActor.uuid) // remove the temporary actor + // if last temporary actor is gone, then unlink me from supervisor + if (linkedActors.isEmpty) { + Actor.log.info( + "All linked actors have died permanently (they were all configured as TEMPORARY)" + + "\n\tshutting down and unlinking supervisor actor as well [%s].", + temporaryActor.id) + _supervisor.foreach(_ ! UnlinkAndStop(this)) + } + } + + private def handleExceptionInDispatch(e: Throwable, message: Any, topLevelTransaction: Boolean) = { + Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) + + _isBeingRestarted = true + // abort transaction set + if (isTransactionSetInScope) { + val txSet = getTransactionSetInScope + Actor.log.debug("Aborting transaction set [%s]", txSet) + txSet.abort + } + + senderFuture.foreach(_.completeWithException(this, e)) + + clearTransaction + if (topLevelTransaction) clearTransactionSet + + // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client + if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) + } + private def nullOutActorRefReferencesFor(actor: Actor) = { actorSelfFields._1.set(actor, null) actorSelfFields._2.set(actor, null) @@ -1297,9 +1324,9 @@ private[akka] case class RemoteActorRef private[akka] ( def supervisor: Option[ActorRef] = unsupported def shutdownLinkedActors: Unit = unsupported protected[akka] def mailbox: Deque[MessageInvocation] = unsupported - protected[akka] def restart(reason: Throwable): Unit = unsupported protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported - protected[akka] def restartLinkedActors(reason: Throwable): Unit = unsupported + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported protected[akka] def linkedActors: JMap[String, ActorRef] = unsupported protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported diff --git a/akka-core/src/main/scala/actor/SerializationProtocol.scala b/akka-core/src/main/scala/actor/SerializationProtocol.scala index d549bb8c80..13e8230638 100644 --- a/akka-core/src/main/scala/actor/SerializationProtocol.scala +++ b/akka-core/src/main/scala/actor/SerializationProtocol.scala @@ -77,14 +77,14 @@ object ActorSerialization { toSerializedActorRefProtocol(a, format).toByteArray } - private def toSerializedActorRefProtocol[T <: Actor](a: ActorRef, format: Format[T]): SerializedActorRefProtocol = { + private def toSerializedActorRefProtocol[T <: Actor](actorRef: ActorRef, format: Format[T]): SerializedActorRefProtocol = { val lifeCycleProtocol: Option[LifeCycleProtocol] = { def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match { case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT) case Temporary => builder.setLifeCycle(LifeCycleType.TEMPORARY) } val builder = LifeCycleProtocol.newBuilder - a.lifeCycle match { + actorRef.lifeCycle match { case Some(LifeCycle(scope, None, _)) => setScope(builder, scope) Some(builder.build) @@ -98,21 +98,22 @@ object ActorSerialization { } val originalAddress = AddressProtocol.newBuilder - .setHostname(a.homeAddress.getHostName) - .setPort(a.homeAddress.getPort) + .setHostname(actorRef.homeAddress.getHostName) + .setPort(actorRef.homeAddress.getPort) .build val builder = SerializedActorRefProtocol.newBuilder - .setUuid(a.uuid) - .setId(a.id) - .setActorClassname(a.actorClass.getName) + .setUuid(actorRef.uuid) + .setId(actorRef.id) + .setActorClassname(actorRef.actorClass.getName) .setOriginalAddress(originalAddress) - .setIsTransactor(a.isTransactor) - .setTimeout(a.timeout) + .setIsTransactor(actorRef.isTransactor) + .setTimeout(actorRef.timeout) - builder.setActorInstance(ByteString.copyFrom(format.toBinary(a.actor.asInstanceOf[T]))) + actorRef.receiveTimeout.foreach(builder.setReceiveTimeout(_)) + builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T]))) lifeCycleProtocol.foreach(builder.setLifeCycle(_)) - a.supervisor.foreach(s => builder.setSupervisor(RemoteActorSerialization.toRemoteActorRefProtocol(s))) + actorRef.supervisor.foreach(s => builder.setSupervisor(RemoteActorSerialization.toRemoteActorRefProtocol(s))) // FIXME: how to serialize the hotswap PartialFunction ?? //hotswap.foreach(builder.setHotswapStack(_)) builder.build @@ -161,6 +162,7 @@ object ActorSerialization { protocol.getOriginalAddress.getPort, if (protocol.hasIsTransactor) protocol.getIsTransactor else false, if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT, + if (protocol.hasReceiveTimeout) Some(protocol.getReceiveTimeout) else None, lifeCycle, supervisor, hotswap, diff --git a/akka-core/src/main/scala/dispatch/MessageHandling.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala index c2e74ceb1d..a73f2b691b 100644 --- a/akka-core/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-core/src/main/scala/dispatch/MessageHandling.scala @@ -53,7 +53,7 @@ final class MessageInvocation(val receiver: ActorRef, "\n\tsender = " + sender + "\n\tsenderFuture = " + senderFuture + "\n\ttransactionSet = " + transactionSet + - "\n]" + "]" } } diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index f1c3633944..8a5864a51b 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -151,11 +151,17 @@ abstract class BasicClusterActor extends ClusterActor with Logging { case InitClusterActor(s) => { serializer = s + boot } } /** - * Implement this in a subclass to add node-to-node messaging + * Implement this in a subclass to boot up the cluster implementation + */ + protected def boot: Unit + + /** + * Implement this in a subclass to add node-to-node messaging */ protected def toOneNode(dest: ADDR_T, msg: Array[Byte]): Unit diff --git a/akka-core/src/main/scala/remote/JGroupsClusterActor.scala b/akka-core/src/main/scala/remote/JGroupsClusterActor.scala index 847985e3d3..54ef3807d4 100644 --- a/akka-core/src/main/scala/remote/JGroupsClusterActor.scala +++ b/akka-core/src/main/scala/remote/JGroupsClusterActor.scala @@ -17,9 +17,8 @@ class JGroupsClusterActor extends BasicClusterActor { @volatile private var isActive = false @volatile private var channel: Option[JChannel] = None - override def init = { - super.init - log info "Initiating JGroups-based cluster actor" + protected def boot = { + log info "Booting JGroups-based cluster" isActive = true // Set up the JGroups local endpoint diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 28d087b3e1..f425d8accf 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -215,11 +215,15 @@ class RemoteServer extends Logging { def shutdown = synchronized { if (_isRunning) { - RemoteServer.unregister(hostname, port) - openChannels.disconnect - openChannels.close.awaitUninterruptibly - bootstrap.releaseExternalResources - Cluster.deregisterLocalNode(hostname, port) + try { + RemoteServer.unregister(hostname, port) + openChannels.disconnect + openChannels.close.awaitUninterruptibly + bootstrap.releaseExternalResources + Cluster.deregisterLocalNode(hostname, port) + } catch { + case e: java.nio.channels.ClosedChannelException => log.warning("Could not close remote server channel in a graceful way") + } } } diff --git a/akka-core/src/main/scala/stm/JTA.scala b/akka-core/src/main/scala/stm/JTA.scala index bb61973c91..24b5a49086 100644 --- a/akka-core/src/main/scala/stm/JTA.scala +++ b/akka-core/src/main/scala/stm/JTA.scala @@ -4,7 +4,9 @@ package se.scalablesolutions.akka.stm -import javax.transaction.{TransactionManager, UserTransaction, Transaction => JtaTransaction, SystemException, Status, Synchronization, TransactionSynchronizationRegistry} +import javax.transaction.{TransactionManager, UserTransaction, + Transaction => JtaTransaction, SystemException, + Status, Synchronization, TransactionSynchronizationRegistry} import javax.naming.{InitialContext, Context, NamingException} import se.scalablesolutions.akka.config.Config._ @@ -16,7 +18,7 @@ import se.scalablesolutions.akka.util.Logging * @author Jonas Bonér */ object TransactionContainer extends Logging { - val AKKA_JTA_TRANSACTION_SERVICE_CLASS = "se.scalablesolutions.akka.jta.AtomikosTransactionService" + val AKKA_JTA_TRANSACTION_SERVICE_CLASS = "se.scalablesolutions.akka.jta.AtomikosTransactionService" val DEFAULT_USER_TRANSACTION_NAME = "java:comp/UserTransaction" val FALLBACK_TRANSACTION_MANAGER_NAMES = "java:comp/TransactionManager" :: "java:appserver/TransactionManager" :: @@ -119,22 +121,31 @@ class TransactionContainer private (val tm: Either[Option[UserTransaction], Opti } } - def begin = tm match { - case Left(Some(userTx)) => userTx.begin - case Right(Some(txMan)) => txMan.begin - case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope") + def begin = { + TransactionContainer.log.ifTrace("Starting JTA transaction") + tm match { + case Left(Some(userTx)) => userTx.begin + case Right(Some(txMan)) => txMan.begin + case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope") + } } - def commit = tm match { - case Left(Some(userTx)) => userTx.commit - case Right(Some(txMan)) => txMan.commit - case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope") + def commit = { + TransactionContainer.log.ifTrace("Committing JTA transaction") + tm match { + case Left(Some(userTx)) => userTx.commit + case Right(Some(txMan)) => txMan.commit + case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope") + } } - def rollback = tm match { - case Left(Some(userTx)) => userTx.rollback - case Right(Some(txMan)) => txMan.rollback - case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope") + def rollback = { + TransactionContainer.log.ifTrace("Aborting JTA transaction") + tm match { + case Left(Some(userTx)) => userTx.rollback + case Right(Some(txMan)) => txMan.rollback + case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope") + } } def getStatus = tm match { diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index 54f20a3504..0951cbc5c5 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -83,11 +83,12 @@ object Transaction { if (JTA_AWARE) Some(TransactionContainer()) else None - log.trace("Creating %s", toString) + log.ifTrace("Creating transaction " + toString) // --- public methods --------- def begin = synchronized { + log.ifTrace("Starting transaction " + toString) jta.foreach { txContainer => txContainer.begin txContainer.registerSynchronization(new StmSynchronization(txContainer, this)) @@ -95,14 +96,14 @@ object Transaction { } def commit = synchronized { - log.trace("Committing transaction %s", toString) + log.ifTrace("Committing transaction " + toString) persistentStateMap.valuesIterator.foreach(_.commit) status = TransactionStatus.Completed jta.foreach(_.commit) } def abort = synchronized { - log.trace("Aborting transaction %s", toString) + log.ifTrace("Aborting transaction " + toString) jta.foreach(_.rollback) persistentStateMap.valuesIterator.foreach(_.abort) persistentStateMap.clear diff --git a/akka-core/src/main/scala/stm/TransactionFactory.scala b/akka-core/src/main/scala/stm/TransactionFactory.scala index 56982bb759..d4e61ee04f 100644 --- a/akka-core/src/main/scala/stm/TransactionFactory.scala +++ b/akka-core/src/main/scala/stm/TransactionFactory.scala @@ -37,8 +37,8 @@ object TransactionConfig { def traceLevel(level: String) = level.toLowerCase match { case "coarse" | "course" => Transaction.TraceLevel.Coarse - case "fine" => Transaction.TraceLevel.Fine - case _ => Transaction.TraceLevel.None + case "fine" => Transaction.TraceLevel.Fine + case _ => Transaction.TraceLevel.None } /** @@ -126,7 +126,7 @@ object TransactionFactory { traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL, hooks: Boolean = TransactionConfig.HOOKS) = { val config = new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew, - explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks) + explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks) new TransactionFactory(config) } } diff --git a/akka-core/src/main/scala/stm/TransactionManagement.scala b/akka-core/src/main/scala/stm/TransactionManagement.scala index 0c6a244f42..2eada538c7 100644 --- a/akka-core/src/main/scala/stm/TransactionManagement.scala +++ b/akka-core/src/main/scala/stm/TransactionManagement.scala @@ -7,6 +7,7 @@ package se.scalablesolutions.akka.stm import se.scalablesolutions.akka.util.Logging import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.TimeUnit import org.multiverse.api.{StmUtils => MultiverseStmUtils} import org.multiverse.api.ThreadLocalTransaction._ @@ -14,16 +15,20 @@ import org.multiverse.api.{Transaction => MultiverseTransaction} import org.multiverse.commitbarriers.CountDownCommitBarrier import org.multiverse.templates.{TransactionalCallable, OrElseTemplate} -class StmException(msg: String) extends RuntimeException(msg) +class TransactionSetAbortedException(msg: String) extends RuntimeException(msg) +// TODO Should we remove TransactionAwareWrapperException? Not used anywhere yet. class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) { override def toString = "TransactionAwareWrapperException[" + cause + ", " + tx + "]" } +/** + * Internal helper methods and properties for transaction management. + */ object TransactionManagement extends TransactionManagement { import se.scalablesolutions.akka.config.Config._ - // move to stm.global.fair? + // FIXME move to stm.global.fair? val FAIR_TRANSACTIONS = config.getBool("akka.stm.fair", true) private[akka] val transactionSet = new ThreadLocal[Option[CountDownCommitBarrier]]() { @@ -47,6 +52,9 @@ object TransactionManagement extends TransactionManagement { } } +/** + * Internal helper methods for transaction management. + */ trait TransactionManagement { private[akka] def createNewTransactionSet: CountDownCommitBarrier = { @@ -111,7 +119,9 @@ class LocalStm extends TransactionManagement with Logging { factory.boilerplate.execute(new TransactionalCallable[T]() { def call(mtx: MultiverseTransaction): T = { factory.addHooks - body + val result = body + log.ifTrace("Committing local transaction [" + mtx + "]") + result } }) } @@ -145,10 +155,9 @@ class GlobalStm extends TransactionManagement with Logging { factory.addHooks val result = body val txSet = getTransactionSetInScope - log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet) - // FIXME ? txSet.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) - try { txSet.joinCommit(mtx) } catch { case e: IllegalStateException => {} } - clearTransaction + log.ifTrace("Committing global transaction [" + mtx + "]\n\tand joining transaction set [" + txSet + "]") + // Need to catch IllegalStateException until we have fix in Multiverse, since it throws it by mistake + try { txSet.tryJoinCommit(mtx, TransactionConfig.TIMEOUT, TimeUnit.MILLISECONDS) } catch { case e: IllegalStateException => {} } result } }) @@ -156,6 +165,7 @@ class GlobalStm extends TransactionManagement with Logging { } trait StmUtil { + /** * Schedule a deferred task on the thread local transaction (use within an atomic). * This is executed when the transaction commits. @@ -178,6 +188,14 @@ trait StmUtil { /** * Use either-orElse to combine two blocking transactions. + * Usage: + *

+   * either {
+   *   ...
+   * } orElse {
+   *   ...
+   * }
+   * 
*/ def either[T](firstBody: => T) = new { def orElse(secondBody: => T) = new OrElseTemplate[T] { diff --git a/akka-core/src/main/scala/util/LockUtil.scala b/akka-core/src/main/scala/util/LockUtil.scala index 09a4b2d650..885e11def7 100644 --- a/akka-core/src/main/scala/util/LockUtil.scala +++ b/akka-core/src/main/scala/util/LockUtil.scala @@ -10,7 +10,7 @@ import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock} * @author Jonas Bonér */ class ReentrantGuard { - private val lock = new ReentrantLock + val lock = new ReentrantLock def withGuard[T](body: => T): T = { lock.lock @@ -20,6 +20,15 @@ class ReentrantGuard { lock.unlock } } + + def tryWithGuard[T](body: => T): T = { + while(!lock.tryLock) { Thread.sleep(10) } // wait on the monitor to be unlocked + try { + body + } finally { + lock.unlock + } + } } /** diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojo.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojo.java index ee549cef23..31f22c217f 100644 --- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojo.java +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojo.java @@ -2,35 +2,47 @@ package se.scalablesolutions.akka.actor; import se.scalablesolutions.akka.actor.annotation.prerestart; import se.scalablesolutions.akka.actor.annotation.postrestart; +import se.scalablesolutions.akka.actor.ActiveObjectContext; +import se.scalablesolutions.akka.dispatch.CompletableFuture; public class SimpleJavaPojo { - - public boolean pre = false; - public boolean post = false; - - private String name; - - public void setName(String name) { - this.name = name; - } - - public String getName() { - return name; - } - - @prerestart - public void pre() { - System.out.println("** pre()"); - pre = true; - } - - @postrestart - public void post() { - System.out.println("** post()"); - post = true; - } - public void throwException() { - throw new RuntimeException(); - } + ActiveObjectContext context; + + public boolean pre = false; + public boolean post = false; + + private String name; + + public Object getSender() { + return context.getSender(); + } + + public CompletableFuture getSenderFuture() { + return context.getSenderFuture(); + } + + public void setName(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + @prerestart + public void pre() { + System.out.println("** pre()"); + pre = true; + } + + @postrestart + public void post() { + System.out.println("** post()"); + post = true; + } + + public void throwException() { + throw new RuntimeException(); + } } diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojoCaller.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojoCaller.java new file mode 100644 index 0000000000..0fb6aff9c5 --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojoCaller.java @@ -0,0 +1,20 @@ +package se.scalablesolutions.akka.actor; + +import se.scalablesolutions.akka.dispatch.CompletableFuture; + +public class SimpleJavaPojoCaller { + + SimpleJavaPojo pojo; + + public void setPojo(SimpleJavaPojo pojo) { + this.pojo = pojo; + } + + public Object getSenderFromSimpleJavaPojo() { + return pojo.getSender(); + } + + public CompletableFuture getSenderFutureFromSimpleJavaPojo() { + return pojo.getSenderFuture(); + } +} diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/TransactionalActiveObject.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/TransactionalActiveObject.java index 515f4fafee..825e7ca489 100644 --- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/TransactionalActiveObject.java +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/TransactionalActiveObject.java @@ -21,6 +21,7 @@ public class TransactionalActiveObject { refState = new Ref(); isInitialized = true; } + System.out.println("==========> init"); } public String getMapState(String key) { @@ -37,6 +38,7 @@ public class TransactionalActiveObject { public void setMapState(String key, String msg) { mapState.put(key, msg); + System.out.println("==========> setMapState"); } public void setVectorState(String msg) { @@ -72,6 +74,7 @@ public class TransactionalActiveObject { mapState.put(key, msg); vectorState.add(msg); refState.swap(msg); + System.out.println("==========> failure"); nested.failure(key, msg, failer); return msg; } diff --git a/akka-core/src/test/scala/ActiveObjectContextSpec.scala b/akka-core/src/test/scala/ActiveObjectContextSpec.scala new file mode 100644 index 0000000000..5a54f0a505 --- /dev/null +++ b/akka-core/src/test/scala/ActiveObjectContextSpec.scala @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor + +import org.scalatest.Spec +import org.scalatest.Assertions +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import se.scalablesolutions.akka.dispatch.DefaultCompletableFuture; + +@RunWith(classOf[JUnitRunner]) +class ActiveObjectContextSpec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll { + + describe("ActiveObjectContext") { + it("context.sender should return the sender Active Object reference") { + val pojo = ActiveObject.newInstance(classOf[SimpleJavaPojo]) + val pojoCaller = ActiveObject.newInstance(classOf[SimpleJavaPojoCaller]) + pojoCaller.setPojo(pojo) + try { + pojoCaller.getSenderFromSimpleJavaPojo should equal (pojoCaller) + } catch { + case e => fail("no sender available") + } + } + + it("context.senderFuture should return the senderFuture Active Object reference") { + val pojo = ActiveObject.newInstance(classOf[SimpleJavaPojo]) + val pojoCaller = ActiveObject.newInstance(classOf[SimpleJavaPojoCaller]) + pojoCaller.setPojo(pojo) + try { + pojoCaller.getSenderFutureFromSimpleJavaPojo.getClass.getName should equal (classOf[DefaultCompletableFuture[_]].getName) + } catch { + case e => fail("no sender future available", e) + } + } + } +} diff --git a/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala b/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala index 97b01c12ce..2b8e4d502f 100644 --- a/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala +++ b/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala @@ -5,7 +5,9 @@ import org.scalatest.{BeforeAndAfterAll, Spec} import org.scalatest.junit.JUnitRunner import org.scalatest.matchers.ShouldMatchers -import se.scalablesolutions.akka.config.ActiveObjectConfigurator +import se.scalablesolutions.akka.actor.ActiveObject._ + +import se.scalablesolutions.akka.config.{OneForOneStrategy, ActiveObjectConfigurator} import se.scalablesolutions.akka.config.JavaConfig._ /** @@ -151,5 +153,15 @@ class ActiveObjectLifecycleSpec extends Spec with ShouldMatchers with BeforeAndA assert(!obj._post) assert(obj._down) } + + it("both preRestart and postRestart methods should be invoked when an actor is restarted") { + val pojo = ActiveObject.newInstance(classOf[SimpleJavaPojo]) + val supervisor = ActiveObject.newInstance(classOf[SimpleJavaPojo]) + link(supervisor,pojo, new OneForOneStrategy(3, 2000),Array(classOf[Throwable])) + pojo.throwException + Thread.sleep(500) + pojo.pre should be(true) + pojo.post should be(true) + } } } \ No newline at end of file diff --git a/akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala b/akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala index 0a47d1c9d6..6ad73d5438 100644 --- a/akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala +++ b/akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala @@ -17,18 +17,11 @@ import se.scalablesolutions.akka.config.ActiveObjectConfigurator import se.scalablesolutions.akka.config.JavaConfig._ import se.scalablesolutions.akka.actor._ -/* @RunWith(classOf[JUnitRunner]) class NestedTransactionalActiveObjectSpec extends -<<<<<<< HEAD:akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala Spec with ShouldMatchers with BeforeAndAfterAll { -======= - Spec with - ShouldMatchers with - BeforeAndAfterAll { ->>>>>>> 38e8bea3fe6a7e9fcc9c5f353124144739bdc234:akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala private val conf = new ActiveObjectConfigurator private var messageLog = "" @@ -55,7 +48,7 @@ class NestedTransactionalActiveObjectSpec extends } describe("Transactional nested in-memory Active Object") { - +/* it("map should not rollback state for stateful server in case of success") { val stateful = conf.getInstance(classOf[TransactionalActiveObject]) stateful.init @@ -163,6 +156,6 @@ class NestedTransactionalActiveObjectSpec extends Thread.sleep(100) nested.getRefState should equal("init") } + */ } -} -*/ +} \ No newline at end of file diff --git a/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala b/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala index 99a2b99dc9..2b26c9ad81 100644 --- a/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala +++ b/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala @@ -110,6 +110,8 @@ class SerializableTypeClassActorSpec extends val actor2 = fromBinary(bytes) actor2.start (actor2 !! "hello").getOrElse("_") should equal("world 3") + + actor2.receiveTimeout should equal (Some(1000)) } it("should be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox") { @@ -172,7 +174,8 @@ class MyStatelessActorWithMessagesInMailbox extends Actor { @serializable class MyJavaSerializableActor extends Actor { var count = 0 - + self.receiveTimeout = Some(1000) + def receive = { case "hello" => count = count + 1 diff --git a/akka-core/src/test/scala/SupervisorHierarchySpec.scala b/akka-core/src/test/scala/SupervisorHierarchySpec.scala index 75751e3d58..138313bafc 100644 --- a/akka-core/src/test/scala/SupervisorHierarchySpec.scala +++ b/akka-core/src/test/scala/SupervisorHierarchySpec.scala @@ -6,12 +6,27 @@ package se.scalablesolutions.akka.actor import org.scalatest.junit.JUnitSuite import org.junit.Test -import java.lang.Throwable + import Actor._ import se.scalablesolutions.akka.config.OneForOneStrategy + import java.util.concurrent.{TimeUnit, CountDownLatch} +object SupervisorHierarchySpec { + class FireWorkerException(msg: String) extends Exception(msg) + + class CountDownActor(countDown: CountDownLatch) extends Actor { + protected def receive = { case _ => () } + override def postRestart(reason: Throwable) = countDown.countDown + } + + class CrasherActor extends Actor { + protected def receive = { case _ => () } + } +} + class SupervisorHierarchySpec extends JUnitSuite { + import SupervisorHierarchySpec._ @Test def killWorkerShouldRestartMangerAndOtherWorkers = { @@ -19,7 +34,7 @@ class SupervisorHierarchySpec extends JUnitSuite { val workerOne = actorOf(new CountDownActor(countDown)) val workerTwo = actorOf(new CountDownActor(countDown)) - val workerThree = actorOf(new CountDownActor( countDown)) + val workerThree = actorOf(new CountDownActor(countDown)) val boss = actorOf(new Actor{ self.trapExit = List(classOf[Throwable]) @@ -35,19 +50,32 @@ class SupervisorHierarchySpec extends JUnitSuite { manager.startLink(workerTwo) manager.startLink(workerThree) - workerOne ! Exit(workerOne, new RuntimeException("Fire the worker!")) + workerOne ! Exit(workerOne, new FireWorkerException("Fire the worker!")) // manager + all workers should be restarted by only killing a worker // manager doesn't trap exits, so boss will restart manager - assert(countDown.await(4, TimeUnit.SECONDS)) + assert(countDown.await(2, TimeUnit.SECONDS)) } + + @Test + def supervisorShouldReceiveNotificationMessageWhenMaximumNumberOfRestartsWithinTimeRangeIsReached = { + val countDown = new CountDownLatch(2) + val crasher = actorOf(new CountDownActor(countDown)) + val boss = actorOf(new Actor{ + self.trapExit = List(classOf[Throwable]) + self.faultHandler = Some(OneForOneStrategy(1, 5000)) + protected def receive = { + case MaximumNumberOfRestartsWithinTimeRangeReached(_, _, _, _) => + countDown.countDown + } + }).start + boss.startLink(crasher) - class CountDownActor(countDown: CountDownLatch) extends Actor { + crasher ! Exit(crasher, new FireWorkerException("Fire the worker!")) + crasher ! Exit(crasher, new FireWorkerException("Fire the worker!")) - protected def receive = { case _ => () } - - override def postRestart(reason: Throwable) = countDown.countDown + assert(countDown.await(2, TimeUnit.SECONDS)) } } diff --git a/akka-core/src/test/scala/TransactionalActiveObjectSpec.scala b/akka-core/src/test/scala/TransactionalActiveObjectSpec.scala index b42c137e33..d1c59b1df4 100644 --- a/akka-core/src/test/scala/TransactionalActiveObjectSpec.scala +++ b/akka-core/src/test/scala/TransactionalActiveObjectSpec.scala @@ -16,18 +16,12 @@ import se.scalablesolutions.akka.config._ import se.scalablesolutions.akka.config.ActiveObjectConfigurator import se.scalablesolutions.akka.config.JavaConfig._ import se.scalablesolutions.akka.actor._ -/* + @RunWith(classOf[JUnitRunner]) class TransactionalActiveObjectSpec extends -<<<<<<< HEAD:akka-core/src/test/scala/TransactionalActiveObjectSpec.scala Spec with ShouldMatchers with BeforeAndAfterAll { -======= - Spec with - ShouldMatchers with - BeforeAndAfterAll { ->>>>>>> 38e8bea3fe6a7e9fcc9c5f353124144739bdc234:akka-core/src/test/scala/TransactionalActiveObjectSpec.scala private val conf = new ActiveObjectConfigurator private var messageLog = "" @@ -50,9 +44,9 @@ class TransactionalActiveObjectSpec extends override def afterAll { conf.stop } - + describe("Transactional in-memory Active Object ") { - + /* it("map should not rollback state for stateful server in case of success") { val stateful = conf.getInstance(classOf[TransactionalActiveObject]) stateful.init @@ -64,7 +58,9 @@ class TransactionalActiveObjectSpec extends it("map should rollback state for stateful server in case of failure") { val stateful = conf.getInstance(classOf[TransactionalActiveObject]) stateful.init + Thread.sleep(500) stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") + Thread.sleep(500) val failer = conf.getInstance(classOf[ActiveObjectFailer]) try { stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) @@ -112,6 +108,6 @@ class TransactionalActiveObjectSpec extends stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") stateful.getRefState should equal("new state") } + */ } } -*/ diff --git a/akka-core/src/test/scala/TransactorSpec.scala b/akka-core/src/test/scala/TransactorSpec.scala index 872b160fb1..1f48d0e740 100644 --- a/akka-core/src/test/scala/TransactorSpec.scala +++ b/akka-core/src/test/scala/TransactorSpec.scala @@ -76,7 +76,6 @@ class StatefulTransactor(expectedInvocationCount: Int) extends Transactor { failer !! "Failure" self.reply(msg) notifier.countDown - case SetMapStateOneWay(key, msg) => mapState.put(key, msg) notifier.countDown @@ -95,8 +94,8 @@ class StatefulTransactor(expectedInvocationCount: Int) extends Transactor { mapState.put(key, msg) vectorState.add(msg) refState.swap(msg) - failer ! "Failure" notifier.countDown + failer ! "Failure" } } @@ -110,6 +109,7 @@ class FailerTransactor extends Transactor { } class TransactorSpec extends JUnitSuite { + @Test def shouldOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { val stateful = actorOf(new StatefulTransactor(2)) @@ -139,7 +139,7 @@ class TransactorSpec extends JUnitSuite { stateful ! SetMapStateOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method val notifier = (stateful !! GetNotifier).as[CountDownLatch] - assert(notifier.get.await(1, TimeUnit.SECONDS)) + assert(notifier.get.await(5, TimeUnit.SECONDS)) assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state } diff --git a/akka-http/src/main/scala/AkkaCometServlet.scala b/akka-http/src/main/scala/AkkaCometServlet.scala index 92db0835cf..0313dfe6f7 100644 --- a/akka-http/src/main/scala/AkkaCometServlet.scala +++ b/akka-http/src/main/scala/AkkaCometServlet.scala @@ -50,6 +50,11 @@ class AkkaServlet extends AtmosphereServlet with Logging { addInitParameter(AtmosphereServlet.PROPERTY_USE_STREAM,"true") addInitParameter("com.sun.jersey.config.property.packages",c.getList("akka.rest.resource_packages").mkString(";")) addInitParameter("com.sun.jersey.spi.container.ResourceFilters",c.getList("akka.rest.filters").mkString(",")) + c.getInt("akka.rest.maxInactiveActivity").foreach { value => + log.info("MAX_INACTIVE:%s",value.toString) + addInitParameter(CometSupport.MAX_INACTIVE,value.toString) + } + val servlet = new AtmosphereRestServlet { override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key) diff --git a/akka-samples/akka-sample-ants/src/main/scala/Ants.scala b/akka-samples/akka-sample-ants/src/main/scala/Ants.scala index f1cc0ba628..fc53db6f22 100644 --- a/akka-samples/akka-sample-ants/src/main/scala/Ants.scala +++ b/akka-samples/akka-sample-ants/src/main/scala/Ants.scala @@ -68,7 +68,7 @@ object World { lazy val ants = setup lazy val evaporator = actorOf[Evaporator].start - private val snapshotFactory = TransactionFactory(readonly = true, familyName = "snapshot") + private val snapshotFactory = TransactionFactory(readonly = true, familyName = "snapshot", hooks = false) def snapshot = atomic(snapshotFactory) { Array.tabulate(Dim, Dim)(place(_, _).get) } @@ -139,7 +139,7 @@ class AntActor(initLoc: (Int, Int)) extends WorldActor { val locRef = Ref(initLoc) val name = "ant-from-" + initLoc._1 + "-" + initLoc._2 - implicit val txFactory = TransactionFactory(familyName = name) + implicit val txFactory = TransactionFactory(familyName = name, hooks = false) val homing = (p: Place) => p.pher + (100 * (if (p.home) 0 else 1)) val foraging = (p: Place) => p.pher + p.food @@ -211,7 +211,7 @@ class Evaporator extends WorldActor { import Config._ import World._ - implicit val txFactory = TransactionFactory(familyName = "evaporator") + implicit val txFactory = TransactionFactory(familyName = "evaporator", hooks = false) val evaporate = (pher: Float) => pher * EvapRate def act = for (x <- 0 until Dim; y <- 0 until Dim) { diff --git a/config/akka-reference.conf b/config/akka-reference.conf index e20a745ca1..720716e0ea 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -5,16 +5,21 @@ # This file has all the default settings, so all these could be removed with no visible effect. # Modify as needed. - +log { filename = "./logs/akka.log" roll = "daily" # Options: never, hourly, daily, sunday/monday/... level = "debug" # Options: fatal, critical, error, warning, info, debug, trace console = on # syslog_host = "" # syslog_server_name = "" - - + akka { + node = "se.scalablesolutions.akka" + level = "info" + } +} + +akka { version = "0.10" # FQN (Fully Qualified Name) to the class doing initial active object/actor @@ -24,81 +29,81 @@ "sample.rest.scala.Boot", "sample.security.Boot"] - + actor { timeout = 5000 # default timeout for future based invocations serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability throughput = 5 # default throughput for ExecutorBasedEventDrivenDispatcher - + } - + stm { fair = on # should global transactions be fair or non-fair (non fair yield better performance) jta-aware = off # 'on' means that if there JTA Transaction Manager available then the STM will # begin (or join), commit or rollback the JTA transaction. Default is 'off'. - + } - + jta { provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI) # "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta', # e.g. you need the akka-jta JARs on classpath). timeout = 60000 - + } - + rest { service = on hostname = "localhost" port = 9998 filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use resource_packages = ["sample.rest.scala","sample.rest.java","sample.security"] # List with all resource packages for your Jersey services authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now) - + #maxInactiveActivity = 60000 #Atmosphere CometSupport maxInactiveActivity #IF you are using a KerberosAuthenticationActor - # + # kerberos { # servicePrincipal = "HTTP/localhost@EXAMPLE.COM" # keyTabLocation = "URL to keytab" # kerberosDebug = "true" # realm = "EXAMPLE.COM" - # - + # } + } - + remote { compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6 - + cluster { service = on name = "default" # The name of the cluster serializer = "se.scalablesolutions.akka.serialization.Serializer$Java$" # FQN of the serializer class - + } - + server { service = on hostname = "localhost" port = 9999 connection-timeout = 1000 # in millis (1 sec default) - + } - + client { reconnect-delay = 5000 # in millis (5 sec default) read-timeout = 10000 # in millis (10 sec default) - - + } + } - - + storage { + cassandra { hostname = "127.0.0.1" # IP address or hostname of one of the Cassandra cluster's seeds port = 9160 consistency-level = "QUORUM" # Options: ZERO, ONE, QUORUM, DCQUORUM, DCQUORUMSYNC, ALL, ANY - + } - + mongodb { hostname = "127.0.0.1" # IP address or hostname of the MongoDB DB instance port = 27017 dbname = "mydb" - + } - + redis { hostname = "127.0.0.1" # IP address or hostname of the Redis instance port = 6379 - - - + } + } +} diff --git a/embedded-repo/com/redis/redisclient/2.8.0-1.4/redisclient-2.8.0-1.4.jar b/embedded-repo/com/redis/redisclient/2.8.0-1.4/redisclient-2.8.0-1.4.jar new file mode 100644 index 0000000000..a5c824b19e Binary files /dev/null and b/embedded-repo/com/redis/redisclient/2.8.0-1.4/redisclient-2.8.0-1.4.jar differ diff --git a/embedded-repo/net/lag/configgy/2.8.0-1.5.5/configgy-2.8.0-1.5.5.jar b/embedded-repo/net/lag/configgy/2.8.0-1.5.5/configgy-2.8.0-1.5.5.jar new file mode 100644 index 0000000000..8568788a70 Binary files /dev/null and b/embedded-repo/net/lag/configgy/2.8.0-1.5.5/configgy-2.8.0-1.5.5.jar differ diff --git a/embedded-repo/net/lag/configgy/2.8.0-1.5.5/configgy-2.8.0-1.5.5.pom b/embedded-repo/net/lag/configgy/2.8.0-1.5.5/configgy-2.8.0-1.5.5.pom new file mode 100644 index 0000000000..1a3c2f8f83 --- /dev/null +++ b/embedded-repo/net/lag/configgy/2.8.0-1.5.5/configgy-2.8.0-1.5.5.pom @@ -0,0 +1,46 @@ + + + 4.0.0 + net.lag + configgy + jar + 2.8.0-1.5.5 + + + Apache 2 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + org.scala-tools + vscaladoc + 1.1-md-3 + compile + + + org.scala-lang + scala-library + 2.8.0 + compile + + + + + PublicReleasesRepository + Public Releases Repository + http://maven/content/groups/public/ + + + PublicSnapshots + Public Snapshots + http://maven/content/groups/public-snapshots/ + + + ScalaToolsMaven2Repository + Scala-Tools Maven2 Repository + http://scala-tools.org/repo-releases/ + + + \ No newline at end of file diff --git a/embedded-repo/org/multiverse/multiverse-alpha/0.6-2010-07-15/multiverse-alpha-0.6-2010-07-15.jar b/embedded-repo/org/multiverse/multiverse-alpha/0.6-2010-07-15/multiverse-alpha-0.6-2010-07-15.jar new file mode 100644 index 0000000000..0929a5bc75 Binary files /dev/null and b/embedded-repo/org/multiverse/multiverse-alpha/0.6-2010-07-15/multiverse-alpha-0.6-2010-07-15.jar differ diff --git a/embedded-repo/org/multiverse/multiverse-alpha/0.6-2010-07-15/multiverse-alpha-0.6-2010-07-15.pom b/embedded-repo/org/multiverse/multiverse-alpha/0.6-2010-07-15/multiverse-alpha-0.6-2010-07-15.pom new file mode 100644 index 0000000000..32e32fc857 --- /dev/null +++ b/embedded-repo/org/multiverse/multiverse-alpha/0.6-2010-07-15/multiverse-alpha-0.6-2010-07-15.pom @@ -0,0 +1,261 @@ + + + 4.0.0 + + multiverse-alpha + Alpha Multiverse STM engine + + Contains an all in one jar that that contains the AlphaStm including the Multiverse + Javaagent and the Multiverse Compiler. This is the JAR you want to include in your + projects, if you do, you don't need to worry about any Multiverse dependency + at all. + + jar + 0.6-2010-07-15 + + + org.multiverse + multiverse + 0.6-2010-07-15 + + + + org.multiverse.javaagent.MultiverseJavaAgent + org.multiverse.stms.alpha.instrumentation.AlphaStmInstrumentor + + + + + + + maven-antrun-plugin + + + + create-main-jar + compile + + run + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Compiles the tests + test-compile + + run + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + ant + ant + 1.7.0 + + + junit + junit + ${junit.version} + + + ant + optional + 1.5.4 + + + com.tonicsystems.jarjar + jarjar + 1.0-rc8 + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + ${multiverse.agentclass} + + ${project.build.outputDirectory}/META-INF/MANIFEST.MF + + + + + + + + + + ${project.groupId} + multiverse-alpha-unborn + ${project.version} + + + ${project.groupId} + multiverse-core + ${project.version} + + + ${project.groupId} + multiverse-instrumentation + ${project.version} + + + ${project.groupId} + multiverse-alpha-unborn + ${project.version} + test + + + + args4j + args4j + ${args4j.version} + provided + + + + com.tonicsystems.jarjar + jarjar + 1.0-rc8 + provided + + + + asm + asm-all + ${asm.version} + + + diff --git a/embedded-repo/org/multiverse/multiverse/0.6-2010-07-15/multiverse-0.6-2010-07-15.pom b/embedded-repo/org/multiverse/multiverse/0.6-2010-07-15/multiverse-0.6-2010-07-15.pom new file mode 100644 index 0000000000..5d89a92a8c --- /dev/null +++ b/embedded-repo/org/multiverse/multiverse/0.6-2010-07-15/multiverse-0.6-2010-07-15.pom @@ -0,0 +1,488 @@ + + + 4.0.0 + + Multiverse Software Transactional Memory + + Multiverse is a Software Transactional Memory implementation that can be used in Java + but also in other languages running on the JVM like Scala or Groovy. Essentially it is a framework that allows + different STM implementation (with different featuresets or performance characteristics) to be used + under the hood. The main STM implementation is multiverse-alpha.. + + org.multiverse + multiverse + 0.6-2010-07-15 + 2008 + pom + + + UTF-8 + 1.6 + 3.2 + 2.0.16 + 4.8.1 + 1.8.2 + + + + + The Apache License, ASL Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0 + + + + + Multiverse + http://multiverse.codehaus.org + + + + + pveentjer + Peter Veentjer + +1 + alarmnummer AT gmail DOTCOM + + Founder + + + + aphillips + Andrew Phillips + +1 + aphillips AT qrmedia DOTCOM + + Committer + + + + + + + maven.atlassian.com + Atlassian Maven Proxy + https://maven.atlassian.com/content/groups/public + + + repo1.maven + Maven Main Repository + http://repo1.maven.org/maven2 + + + maven2-repository.dev.java.net + Java.net Repository for Maven + http://download.java.net/maven/2 + + + java.net + Java.net Legacy Repository for Maven + http://download.java.net/maven/1 + legacy + + + google-maven-repository + Google Maven Repository + http://google-maven-repository.googlecode.com/svn/repository/ + + + repository.codehaus.org + Codehaus Maven Repository + http://repository.codehaus.org + + + ibiblio + http://www.ibiblio.org/maven + + + sourceforge + http://maven-plugins.sourceforge.net/repository + + + + mandubian-mvn + http://mandubian-mvn.googlecode.com/svn/trunk/mandubian-mvn/repository + + + + + + snapshots + http://snapshots.maven.codehaus.org/maven2 + + + + + multiverse-benchy + + multiverse-core + multiverse-core-tests + multiverse-instrumentation + multiverse-alpha-unborn + multiverse-alpha + + multiverse-site + + multiverse-performance-tool + + + + + + + maven-compiler-plugin + + ${sourceEncoding} + ${targetJdk} + ${targetJdk} + + + + maven-resources-plugin + + ${sourceEncoding} + + + + maven-surefire-plugin + + + **/*LongTest.java + **/*longTest.java + **/*StressTest.java + **/*stressTest.java + **/*PerformanceTest.java + **/*performanceTest.java + + + **/*Test.java + + once + + + + + + + maven-enforcer-plugin + + + enforce-java + + enforce + + + + + ${targetJdk} + + + + + + + + + maven-source-plugin + + + attach-sources + + jar + + + + + + + + + + org.apache.maven.wagon + wagon-webdav + 1.0-beta-2 + + + org.apache.maven.wagon + wagon-ftp + 1.0-beta-6 + + + + + + + junit + junit + ${junit.version} + test + + + org.mockito + mockito-all + ${mockito.version} + test + + + + + scm:git:git://git.codehaus.org/multiverse.git + scm:git:ssh://git@git.codehaus.org/multiverse.git + http://git.codehaus.org/gitweb.cgi?p=multiverse.git + + + + Jira + http://jira.codehaus.org/browse/MULTIVERSE + + + + + Development List + dev-subscribe@multiverse.codehaus.org + dev-unsubscribe@multiverse.codehaus.org + dev@multiverse.codehaus.org + http://archive.multiverse.codehaus.org/dev + + + + User List + user-subscribe@multiverse.codehaus.org + user-unsubscribe@multiverse.codehaus.org + user@multiverse.codehaus.org + http://archive.multiverse.codehaus.org/user + + + + Commits List + scm-subscribe@multiverse.codehaus.org + scm-unsubscribe@multiverse.codehaus.org + http://archive.multiverse.codehaus.org/scm + + + + + + + maven-javadoc-plugin + 2.6.1 + + ${sourceEncoding} + true + + + + default + + aggregate + + + + + + org.codehaus.mojo + jxr-maven-plugin + + + org.codehaus.mojo + taglist-maven-plugin + 2.3 + + ${sourceEncoding} + + FIXME + Fixme + fixme + TODO + todo + Todo + @todo + @deprecated + + + + + maven-project-info-reports-plugin + + + maven-changes-plugin + 2.0-beta-3 + + + + changes-report + + + + + ${basedir}/changes.xml + + + + maven-surefire-report-plugin + + false + + + + + report-only + + + + + + org.codehaus.mojo + findbugs-maven-plugin + 2.0.1 + + + maven-pmd-plugin + 2.3 + + ${sourceEncoding} + ${targetJdk} + + + + + + + + + multiverse-releases + Multiverse Central Repository + dav:https://dav.codehaus.org/repository/multiverse/ + + + multiverse-snapshots + Multiverse Central Development Repository + dav:https://dav.codehaus.org/snapshots.repository/multiverse/ + + + http://dist.codehaus.org/multiverse/ + + + + + release + + + + + maven-javadoc-plugin + 2.6.1 + false + + + generate-resources + + aggregate + + + + + ${sourceEncoding} + true + + + + + maven-assembly-plugin + 2.2-beta-2 + false + + + distribution + package + + single + + + + distribution.xml + + + + + + + + + + + stress + + + + org.apache.maven.plugins + maven-surefire-plugin + + + **/*LongTest.java + + -Xmx256m + once + + + + + + + diff --git a/embedded-repo/org/scala-tools/vscaladoc/1.1-md-3/vscaladoc-1.1-md-3.jar b/embedded-repo/org/scala-tools/vscaladoc/1.1-md-3/vscaladoc-1.1-md-3.jar new file mode 100644 index 0000000000..daa3365531 Binary files /dev/null and b/embedded-repo/org/scala-tools/vscaladoc/1.1-md-3/vscaladoc-1.1-md-3.jar differ diff --git a/embedded-repo/org/scala-tools/vscaladoc/1.1-md-3/vscaladoc-1.1-md-3.pom b/embedded-repo/org/scala-tools/vscaladoc/1.1-md-3/vscaladoc-1.1-md-3.pom new file mode 100644 index 0000000000..aa542db6ac --- /dev/null +++ b/embedded-repo/org/scala-tools/vscaladoc/1.1-md-3/vscaladoc-1.1-md-3.pom @@ -0,0 +1,152 @@ + + 4.0.0 + + org.scala-tools + scala-tools-parent + 1.3 + + vscaladoc + 1.1-md-3 + ${project.artifactId} + 2008 + + scm:svn:http://vscaladoc.googlecode.com/svn/tags/vscaladoc-1.1 + scm:svn:https://vscaladoc.googlecode.com/svn/tags/vscaladoc-1.1 + http://code.google.com/p/vscaladoc/source/browse/tags/vscaladoc-1.1 + + + code.google + http://code.google.com/p/vscaladoc/issues/list + + + hudson + http://scala-tools.org/hudson/job/vscaladoc + + + + 2.7.1 + + + + + scala-tools.org + Scala-Tools Maven2 Repository + http://scala-tools.org/repo-releases + + + + + + scala-tools.org + Scala-Tools Maven2 Repository + http://scala-tools.org/repo-releases + + + + + + org.scala-lang + scala-library + ${scala.version} + true + + + org.scala-lang + scala-compiler + ${scala.version} + true + + + junit + junit + 3.8.1 + test + + + + + src/main/scala + src/test/scala + + + org.scala-tools + maven-scala-plugin + + + compile + + compile + testCompile + + + + + + -target:jvm-1.5 + + ${scala.version} + + + + org.apache.maven.plugins + maven-eclipse-plugin + + true + + org.scala-lang:scala-library + + + ch.epfl.lamp.sdt.launching.SCALA_CONTAINER + + + ch.epfl.lamp.sdt.core.scalanature + org.eclipse.jdt.core.javanature + + + ch.epfl.lamp.sdt.core.scalabuilder + + + + + net.sf.alchim + yuicompressor-maven-plugin + + + + compress + + + + + true + + + + true + + true + ${project.build.directory}/classes/org/scala_tools/vscaladoc/_highlighter/shAll.js + + + shCore*.js + shBrush*.js + + + + + + + + + + + org.scala-tools + maven-scala-plugin + + 1.0 + ${scala.version} + + + + + diff --git a/embedded-repo/sbinary/sbinary/2.8.0-0.3.1/sbinary-2.8.0-0.3.1.jar b/embedded-repo/sbinary/sbinary/2.8.0-0.3.1/sbinary-2.8.0-0.3.1.jar new file mode 100644 index 0000000000..5d76ec911a Binary files /dev/null and b/embedded-repo/sbinary/sbinary/2.8.0-0.3.1/sbinary-2.8.0-0.3.1.jar differ diff --git a/embedded-repo/sbinary/sbinary/2.8.0-0.3.1/sbinary-2.8.0-0.3.1.pom b/embedded-repo/sbinary/sbinary/2.8.0-0.3.1/sbinary-2.8.0-0.3.1.pom new file mode 100644 index 0000000000..7a720793cc --- /dev/null +++ b/embedded-repo/sbinary/sbinary/2.8.0-0.3.1/sbinary-2.8.0-0.3.1.pom @@ -0,0 +1,33 @@ + + + 4.0.0 + sbinary + sbinary + jar + 2.8.0-0.3.1 + + + org.scala-lang + scala-library + 2.8.0 + compile + + + + + PublicReleasesRepository + Public Releases Repository + http://maven/content/groups/public/ + + + PublicSnapshots + Public Snapshots + http://maven/content/groups/public-snapshots/ + + + ScalaToolsMaven2Repository + Scala-Tools Maven2 Repository + http://scala-tools.org/repo-releases/ + + + \ No newline at end of file diff --git a/embedded-repo/sjson/json/sjson/0.7-SNAPSHOT-2.8.0/sjson-0.7-SNAPSHOT-2.8.0.jar b/embedded-repo/sjson/json/sjson/0.7-SNAPSHOT-2.8.0/sjson-0.7-SNAPSHOT-2.8.0.jar new file mode 100644 index 0000000000..f0d321a9e3 Binary files /dev/null and b/embedded-repo/sjson/json/sjson/0.7-SNAPSHOT-2.8.0/sjson-0.7-SNAPSHOT-2.8.0.jar differ diff --git a/embedded-repo/sjson/json/sjson/0.7-SNAPSHOT-2.8.0/sjson-0.7-SNAPSHOT-2.8.0.pom b/embedded-repo/sjson/json/sjson/0.7-SNAPSHOT-2.8.0/sjson-0.7-SNAPSHOT-2.8.0.pom new file mode 100644 index 0000000000..e8081164fc --- /dev/null +++ b/embedded-repo/sjson/json/sjson/0.7-SNAPSHOT-2.8.0/sjson-0.7-SNAPSHOT-2.8.0.pom @@ -0,0 +1,9 @@ + + + 4.0.0 + sjson.json + sjson + 0.7-SNAPSHOT-2.8.0 + POM was created from install:install-file + diff --git a/project/build.properties b/project/build.properties index cc8e376f1b..ba020623b4 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1,7 +1,7 @@ project.organization=se.scalablesolutions.akka project.name=akka project.version=0.10 -scala.version=2.8.0.RC3 +scala.version=2.8.0 sbt.version=0.7.4 def.scala.version=2.7.7 -build.scala.versions=2.8.0.RC3 +build.scala.versions=2.8.0 diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index b698a2ec57..8020b848af 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -22,7 +22,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { val SPRING_VERSION = "3.0.3.RELEASE" val CASSANDRA_VERSION = "0.6.1" val LIFT_VERSION = "2.0-scala280-SNAPSHOT" - val SCALATEST_VERSION = "1.2-for-scala-2.8.0.RC3-SNAPSHOT" + val SCALATEST_VERSION = "1.2-for-scala-2.8.0.final-SNAPSHOT" val MULTIVERSE_VERSION = "0.6-SNAPSHOT" // ------------------------------------------------------------ @@ -84,9 +84,6 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_), akka_core, akka_http, akka_spring, akka_camel, akka_persistence, akka_amqp) - // active object tests in java - lazy val akka_active_object_test = project("akka-active-object-test", "akka-active-object-test", new AkkaActiveObjectTestProject(_), akka_kernel) - // examples lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_)) @@ -182,10 +179,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { val netty = "org.jboss.netty" % "netty" % "3.2.1.Final" % "compile" val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" val commons_io = "commons-io" % "commons-io" % "1.4" % "compile" - val dispatch_json = "net.databinder" % "dispatch-json_2.8.0.RC3" % "0.7.4" % "compile" - val dispatch_http = "net.databinder" % "dispatch-http_2.8.0.RC3" % "0.7.4" % "compile" - val sjson = "sjson.json" % "sjson" % "0.6-SNAPSHOT-2.8.RC3" % "compile" - val sbinary = "sbinary" % "sbinary" % "2.8.0.RC3-0.3.1-SNAPSHOT" % "compile" + val dispatch_json = "net.databinder" % "dispatch-json_2.8.0" % "0.7.4" % "compile" + val dispatch_http = "net.databinder" % "dispatch-http_2.8.0" % "0.7.4" % "compile" + val sjson = "sjson.json" % "sjson" % "0.7-SNAPSHOT-2.8.0" % "compile" + val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile" val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.2.1" % "compile" val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % "1.2.1" % "compile" val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" @@ -193,7 +190,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { val jta_1_1 = "org.apache.geronimo.specs" % "geronimo-jta_1.1_spec" % "1.1.1" % "compile" intransitive() val werkz = "org.codehaus.aspectwerkz" % "aspectwerkz-nodeps-jdk5" % "2.2.1" % "compile" val werkz_core = "org.codehaus.aspectwerkz" % "aspectwerkz-jdk5" % "2.2.1" % "compile" - val configgy = "net.lag" % "configgy" % "2.8.0.RC3-1.5.2-SNAPSHOT" % "compile" + val configgy = "net.lag" % "configgy" % "2.8.0-1.5.5" % "compile" val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile" val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile" @@ -207,7 +204,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { class AkkaAMQPProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with CodeFellowPlugin { val commons_io = "commons-io" % "commons-io" % "1.4" % "compile" - val rabbit = "com.rabbitmq" % "amqp-client" % "1.8.0" % "compile" + val rabbit = "com.rabbitmq" % "amqp-client" % "1.8.1" % "compile" // testing val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "test" intransitive() @@ -250,7 +247,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { } class AkkaRedisProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { - val redis = "com.redis" % "redisclient" % "2.8.0.RC3-1.4" % "compile" + val redis = "com.redis" % "redisclient" % "2.8.0-1.4" % "compile" val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil } @@ -304,13 +301,6 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { val jta_spec = "org.apache.geronimo.specs" % "geronimo-jta_1.1_spec" % "1.1.1" % "compile" intransitive() } - // ================= TEST ================== - class AkkaActiveObjectTestProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with CodeFellowPlugin { - // testing - val junit = "junit" % "junit" % "4.5" % "test" - val jmock = "org.jmock" % "jmock" % "2.4.0" % "test" - } - // ================= EXAMPLES ================== class AkkaSampleAntsProject(info: ProjectInfo) extends DefaultSpdeProject(info) with CodeFellowPlugin { val scalaToolsSnapshots = ScalaToolsSnapshots @@ -339,10 +329,20 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { } class AkkaSampleCamelProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) with CodeFellowPlugin { - val spring_jms = "org.springframework" % "spring-jms" % SPRING_VERSION % "compile" - val camel_jetty = "org.apache.camel" % "camel-jetty" % CAMEL_VERSION % "compile" - val camel_jms = "org.apache.camel" % "camel-jms" % CAMEL_VERSION % "compile" - val activemq_core = "org.apache.activemq" % "activemq-core" % "5.3.2" % "compile" + override def ivyXML = + + + + + + + + + + + + + } class AkkaSampleSecurityProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) with CodeFellowPlugin {