diff --git a/akka-actors/pom.xml b/akka-actors/pom.xml
index 5138a48ccb..30362fefff 100644
--- a/akka-actors/pom.xml
+++ b/akka-actors/pom.xml
@@ -74,6 +74,11 @@
+    <dependency>
+      <groupId>org.h2.compress</groupId>
+      <artifactId>h2-lzf</artifactId>
+      <version>1.0</version>
+    </dependency>
     <dependency>
       <groupId>org.codehaus.jackson</groupId>
       <artifactId>jackson-core-asl</artifactId>
diff --git a/akka-actors/src/main/scala/nio/RemoteClient.scala b/akka-actors/src/main/scala/nio/RemoteClient.scala
index a77e975bcc..99872beb33 100644
--- a/akka-actors/src/main/scala/nio/RemoteClient.scala
+++ b/akka-actors/src/main/scala/nio/RemoteClient.scala
@@ -129,10 +129,18 @@ class RemoteClientPipelineFactory(name: String,
def getPipeline: ChannelPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("timeout", new ReadTimeoutHandler(RemoteClient.TIMER, RemoteClient.READ_TIMEOUT))
- if (RemoteServer.COMPRESSION) pipeline.addLast("zlibDecoder", new ZlibDecoder())
+ RemoteServer.COMPRESSION_SCHEME match {
+ case "zlib" => pipeline.addLast("zlibDecoder", new ZlibDecoder)
+ //case "lzf" => pipeline.addLast("lzfDecoder", new LzfDecoder)
+ case _ => {} // no compression
+ }
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4))
pipeline.addLast("protobufDecoder", new ProtobufDecoder(RemoteReply.getDefaultInstance))
- if (RemoteServer.COMPRESSION) pipeline.addLast("zlibEncoder", new ZlibEncoder())
+ RemoteServer.COMPRESSION_SCHEME match {
+ case "zlib" => pipeline.addLast("zlibEncoder", new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL))
+ //case "lzf" => pipeline.addLast("lzfEncoder", new LzfEncoder)
+ case _ => {} // no compression
+ }
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4))
pipeline.addLast("protobufEncoder", new ProtobufEncoder())
pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap))
diff --git a/akka-actors/src/main/scala/nio/RemoteServer.scala b/akka-actors/src/main/scala/nio/RemoteServer.scala
index 6534d2b93c..c9528dc6a4 100755
--- a/akka-actors/src/main/scala/nio/RemoteServer.scala
+++ b/akka-actors/src/main/scala/nio/RemoteServer.scala
@@ -26,7 +26,14 @@ import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder}
object RemoteServer extends Logging {
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9999)
- val COMPRESSION = config.getBool("akka.remote.compression", true)
+ val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib")
+ val ZLIB_COMPRESSION_LEVEL = {
+ val level = config.getInt("akka.remote.zlib-compression-level", 6)
+ if (level < 1 && level > 9) throw new IllegalArgumentException( // BUG(review): condition is always false — a value cannot be both < 1 and > 9; should be `||` so out-of-range levels are actually rejected
+ "zlib compression level has to be within 1-9, with 1 being fastest and 9 being the most compressed")
+ level
+ }
+
val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000)
private var hostname = HOSTNAME
@@ -73,10 +80,18 @@ class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader])
extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val pipeline = Channels.pipeline()
- if (RemoteServer.COMPRESSION) pipeline.addLast("zlibDecoder", new ZlibDecoder())
+ RemoteServer.COMPRESSION_SCHEME match {
+ case "zlib" => pipeline.addLast("zlibDecoder", new ZlibDecoder)
+ //case "lzf" => pipeline.addLast("lzfDecoder", new LzfDecoder)
+ case _ => {} // no compression
+ }
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4))
pipeline.addLast("protobufDecoder", new ProtobufDecoder(RemoteRequest.getDefaultInstance))
- if (RemoteServer.COMPRESSION) pipeline.addLast("zlibEncoder", new ZlibEncoder())
+ RemoteServer.COMPRESSION_SCHEME match {
+ case "zlib" => pipeline.addLast("zlibEncoder", new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL))
+ //case "lzf" => pipeline.addLast("lzfEncoder", new LzfEncoder)
+ case _ => {} // no compression
+ }
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4))
pipeline.addLast("protobufEncoder", new ProtobufEncoder)
pipeline.addLast("handler", new RemoteServerHandler(name, loader))
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index c6fbb75cb4..8ca35927e9 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -22,28 +22,26 @@
boot = ["sample.java.Boot", "sample.scala.Boot", "se.scalablesolutions.akka.security.samples.Boot"]
- timeout = 5000 # default timeout for future based invocations
- serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
+ timeout = 5000 # default timeout for future based invocations
+ serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
service = on
- max-nr-of-retries = 10
- restart-on-collision = off # (not implemented yet) if 'on' then it reschedules the transaction,
- # if 'off' then throws an exception or rollback for user to handle
- wait-for-completion = 1000 # how long time in millis a transaction should be given time to complete when a collision is detected
- wait-nr-of-times = 3 # the number of times it should check for completion of a pending transaction upon collision
- distributed = off # not implemented yet
+ distributed = off # not implemented yet
- compression = on # turn on/off zlib compression
+ compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression
+ zlib-compression-level = 6 # Options: 1-9 (1 being fastest and 9 being the most compressed), default is 6
+
service = on
hostname = "localhost"
port = 9999
connection-timeout = 1000 # in millis (1 sec default)
+
reconnect-delay = 5000 # in millis (5 sec default)
read-timeout = 10000 # in millis (10 sec default)
@@ -65,6 +63,7 @@
storage-format = "scala-json" # Options: java, scala-json, java-json, protobuf
consistency-level = "QUORUM" # Options: ZERO, ONE, QUORUM, ALL
+
hostname = "127.0.0.1" # IP address or hostname of the MongoDB DB instance
port = 27017