diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index 2ed1f26764..db80fa641b 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -219,7 +219,6 @@ trait Actor extends TransactionManagement { implicit protected val self: Actor = this - // FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait // Only mutable for RemoteServer in order to maintain identity across nodes private[akka] var _uuid = Uuid.newUuid.toString def uuid = _uuid @@ -228,10 +227,11 @@ trait Actor extends TransactionManagement { // private fields // ==================================== - @volatile private var _isRunning = false - @volatile private var _isSuspended = true - @volatile private var _isShutDown: Boolean = false - private var _isEventBased: Boolean = false + @volatile private[this] var _isRunning = false + @volatile private[this] var _isSuspended = true + @volatile private[this] var _isShutDown = false + @volatile private[this] var _isEventBased: Boolean = false + @volatile private[akka] var _isKilled = false private var _hotswap: Option[PartialFunction[Any, Unit]] = None private[akka] var _remoteAddress: Option[InetSocketAddress] = None private[akka] var _linkedActors: Option[HashSet[Actor]] = None @@ -253,14 +253,14 @@ trait Actor extends TransactionManagement { * message exchanges and which is in many ways better than using the '!!' method * which will make the sender wait for a reply using a *blocking* future. */ - protected[this] var sender: Option[Actor] = None + protected var sender: Option[Actor] = None /** * The 'senderFuture' field should normally not be touched by user code, which should instead use the 'reply' method. * But it can be used for advanced use-cases when one might want to store away the future and * resolve it later and/or somewhere else. */ - protected[this] var senderFuture: Option[CompletableFutureResult] = None + protected var senderFuture: Option[CompletableFutureResult] = None // ==================================== // ==== USER CALLBACKS TO OVERRIDE ==== @@ -317,7 +317,7 @@ trait Actor extends TransactionManagement { * trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError]) * */ - protected[this] var trapExit: List[Class[_ <: Throwable]] = Nil + protected var trapExit: List[Class[_ <: Throwable]] = Nil /** * User overridable callback/setting. @@ -491,11 +491,13 @@ trait Actor extends TransactionManagement { /** * Same as the '!' method but does not take an implicit sender as second parameter. */ - def send(message: Any) = + def send(message: Any) = { + if (_isKilled) throw new ActorKilledException(this) if (_isRunning) postMessageToMailbox(message, None) else throw new IllegalStateException( "Actor has not been started, you need to invoke 'actor.start' before using it") - + } + /** * Sends a message asynchronously and waits on a future for a reply message. *
@@ -508,21 +510,24 @@ trait Actor extends TransactionManagement { * If you are sending messages using!! then you have to use reply(..)
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
- def !: Option[T] = if (_isRunning) {
- val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
- val isActiveObject = message.isInstanceOf[Invocation]
- if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.completeWithResult(None)
- try {
- future.await
- } catch {
- case e: FutureTimeoutException =>
- if (isActiveObject) throw e
- else None
- }
- getResultOrThrowException(future)
- } else throw new IllegalStateException(
- "Actor has not been started, you need to invoke 'actor.start' before using it")
-
+ def !: Option[T] = {
+ if (_isKilled) throw new ActorKilledException(this)
+ if (_isRunning) {
+ val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
+ val isActiveObject = message.isInstanceOf[Invocation]
+ if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.completeWithResult(None)
+ try {
+ future.await
+ } catch {
+ case e: FutureTimeoutException =>
+ if (isActiveObject) throw e
+ else None
+ }
+ getResultOrThrowException(future)
+ } else throw new IllegalStateException(
+ "Actor has not been started, you need to invoke 'actor.start' before using it")
+ }
+
/**
* Sends a message asynchronously and waits on a future for a reply message.
*
@@ -869,6 +874,7 @@ trait Actor extends TransactionManagement {
else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString)
} catch {
case e =>
+ _isKilled = true
Actor.log.error(e, "Could not invoke actor [%s]", this)
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
@@ -970,6 +976,7 @@ trait Actor extends TransactionManagement {
preRestart(reason)
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
postRestart(reason)
+ _isKilled = false
}
private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized {
diff --git a/akka-actors/src/main/scala/remote/LzfCompression.scala b/akka-actors/src/main/scala/remote/LzfCompression.scala
deleted file mode 100644
index 04d2c615de..0000000000
--- a/akka-actors/src/main/scala/remote/LzfCompression.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Copyright (C) 2009 Scalable Solutions.
- */
-
-package se.scalablesolutions.akka.remote
-
-import org.h2.compress.{LZFInputStream, LZFOutputStream}
-
-import org.jboss.netty.channel.{Channel, ChannelHandlerContext, ChannelPipelineCoverage}
-import org.jboss.netty.buffer.{ChannelBufferOutputStream, ChannelBufferInputStream, ChannelBuffer}
-import org.jboss.netty.handler.codec.oneone.{OneToOneEncoder, OneToOneDecoder};
-
-@ChannelPipelineCoverage("all")
-class LzfDecoder extends OneToOneDecoder {
- override protected def decode(ctx: ChannelHandlerContext, channel: Channel, message: AnyRef) = {
- if (!(message.isInstanceOf[ChannelBuffer])) message
- else {
- new LZFInputStream(new ChannelBufferInputStream(message.asInstanceOf[ChannelBuffer]))
- }
- }
-}
-
-@ChannelPipelineCoverage("all")
-class LzfEncoder extends OneToOneEncoder {
- override protected def encode(ctx: ChannelHandlerContext, channel: Channel, message: AnyRef) = {
- if (!(message.isInstanceOf[ChannelBuffer])) message
- else new LZFOutputStream(new ChannelBufferOutputStream(message.asInstanceOf[ChannelBuffer]))
- }
-}
diff --git a/akka-actors/src/main/scala/remote/RemoteClient.scala b/akka-actors/src/main/scala/remote/RemoteClient.scala
index adb8a65ce3..cf9182646b 100644
--- a/akka-actors/src/main/scala/remote/RemoteClient.scala
+++ b/akka-actors/src/main/scala/remote/RemoteClient.scala
@@ -102,8 +102,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
connection.getChannel.getCloseFuture.awaitUninterruptibly
channelFactory.releaseExternalResources
}
-
- timer.stop
+ timer.stop
}
def send(request: RemoteRequest): Option[CompletableFutureResult] = if (isRunning) {
@@ -169,7 +168,7 @@ class RemoteClientHandler(val name: String,
val futures: ConcurrentMap[Long, CompletableFutureResult],
val supervisors: ConcurrentMap[String, Actor],
val bootstrap: ClientBootstrap,
- val timer: HashedWheelTimer)
+ val timer: HashedWheelTimer)
extends SimpleChannelUpstreamHandler with Logging {
import Actor.Sender.Self
diff --git a/akka-actors/src/main/scala/remote/RemoteServer.scala b/akka-actors/src/main/scala/remote/RemoteServer.scala
index f297475ad4..1145689683 100755
--- a/akka-actors/src/main/scala/remote/RemoteServer.scala
+++ b/akka-actors/src/main/scala/remote/RemoteServer.scala
@@ -25,14 +25,19 @@ import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder}
* Use this object if you need a single remote server on a specific node.
*
* + * // takes hostname and port from 'akka.conf' * RemoteNode.start ** + *
+ * RemoteNode.start(hostname, port) + *+ * * If you need to create more than one, then you can use the RemoteServer: * *
* val server = new RemoteServer - * server.start + * server.start(hostname, port) ** * @author Jonas Bonér @@ -201,8 +206,7 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl private def dispatchToActor(request: RemoteRequest, channel: Channel) = { log.debug("Dispatching to remote actor [%s]", request.getTarget) val actor = createActor(request.getTarget, request.getUuid, request.getTimeout) - actor.start - + val message = RemoteProtocolBuilder.getMessage(request) if (request.getIsOneWay) { if (request.hasSourceHostname && request.hasSourcePort) { @@ -360,6 +364,7 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl newInstance.timeout = timeout newInstance._remoteAddress = None actors.put(uuid, newInstance) + newInstance.start newInstance } catch { case e => diff --git a/akka-actors/src/main/scala/serialization/Compression.scala b/akka-actors/src/main/scala/serialization/Compression.scala new file mode 100644 index 0000000000..9cc2649742 --- /dev/null +++ b/akka-actors/src/main/scala/serialization/Compression.scala @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.serialization + +/** + * @author Jonas Bonér + */ +object Compression { + + /** + * @author Jonas Bonér + */ + object LZF { + import voldemort.store.compress.lzf._ + def compress(bytes: Array[Byte]): Array[Byte] = LZFEncoder.encode(bytes) + def uncompress(bytes: Array[Byte]): Array[Byte] = LZFDecoder.decode(bytes) + } +} + diff --git a/akka-actors/src/main/scala/serialization/Serializer.scala b/akka-actors/src/main/scala/serialization/Serializer.scala index 643855a141..e6b791f168 100644 --- a/akka-actors/src/main/scala/serialization/Serializer.scala +++ b/akka-actors/src/main/scala/serialization/Serializer.scala @@ -10,7 +10,7 @@ import com.google.protobuf.Message import org.codehaus.jackson.map.ObjectMapper -import sjson.json.{Serializer =>SJSONSerializer} +import sjson.json.{Serializer => SJSONSerializer} /** * @author Jonas Bonér diff --git a/akka-rest/src/main/resources/META-INF/services/javax.ws.rs.ext.MessageBodyWriter_FIXME b/akka-rest/src/main/resources/META-INF/services/javax.ws.rs.ext.MessageBodyWriter_FIXME new file mode 100644 index 0000000000..f88c0c8601 --- /dev/null +++ b/akka-rest/src/main/resources/META-INF/services/javax.ws.rs.ext.MessageBodyWriter_FIXME @@ -0,0 +1 @@ +se.scalablesolutions.akka.rest.ListWriter \ No newline at end of file diff --git a/embedded-repo/org/h2/compress/h2-lzf/1.0/h2-lzf-1.0.jar b/embedded-repo/org/h2/compress/h2-lzf/1.0/h2-lzf-1.0.jar index 620cfb1371..5d74200e45 100644 Binary files a/embedded-repo/org/h2/compress/h2-lzf/1.0/h2-lzf-1.0.jar and b/embedded-repo/org/h2/compress/h2-lzf/1.0/h2-lzf-1.0.jar differ diff --git a/embedded-repo/org/h2/compress/h2-lzf/1.0/h2-lzf-1.0.pom b/embedded-repo/org/h2/compress/h2-lzf/1.0/h2-lzf-1.0.pom new file mode 100644 index 0000000000..b24cca8b9c --- /dev/null +++ b/embedded-repo/org/h2/compress/h2-lzf/1.0/h2-lzf-1.0.pom @@ -0,0 +1,8 @@ + +