From 08cf576225ab6ef71b5fea493309977667d00ef8 Mon Sep 17 00:00:00 2001 From: jboner Date: Sun, 22 Nov 2009 14:32:27 +0100 Subject: [PATCH] added compression level config options --- akka-actors/pom.xml | 5 +++++ .../src/main/scala/nio/RemoteClient.scala | 12 +++++++++-- .../src/main/scala/nio/RemoteServer.scala | 21 ++++++++++++++++--- config/akka-reference.conf | 17 +++++++-------- 4 files changed, 41 insertions(+), 14 deletions(-) diff --git a/akka-actors/pom.xml b/akka-actors/pom.xml index 5138a48ccb..30362fefff 100644 --- a/akka-actors/pom.xml +++ b/akka-actors/pom.xml @@ -74,6 +74,11 @@ + + org.h2.compress + h2-lzf + 1.0 + org.codehaus.jackson jackson-core-asl diff --git a/akka-actors/src/main/scala/nio/RemoteClient.scala b/akka-actors/src/main/scala/nio/RemoteClient.scala index a77e975bcc..99872beb33 100644 --- a/akka-actors/src/main/scala/nio/RemoteClient.scala +++ b/akka-actors/src/main/scala/nio/RemoteClient.scala @@ -129,10 +129,18 @@ class RemoteClientPipelineFactory(name: String, def getPipeline: ChannelPipeline = { val pipeline = Channels.pipeline() pipeline.addLast("timeout", new ReadTimeoutHandler(RemoteClient.TIMER, RemoteClient.READ_TIMEOUT)) - if (RemoteServer.COMPRESSION) pipeline.addLast("zlibDecoder", new ZlibDecoder()) + RemoteServer.COMPRESSION_SCHEME match { + case "zlib" => pipeline.addLast("zlibDecoder", new ZlibDecoder) + //case "lzf" => pipeline.addLast("lzfDecoder", new LzfDecoder) + case _ => {} // no compression + } pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)) pipeline.addLast("protobufDecoder", new ProtobufDecoder(RemoteReply.getDefaultInstance)) - if (RemoteServer.COMPRESSION) pipeline.addLast("zlibEncoder", new ZlibEncoder()) + RemoteServer.COMPRESSION_SCHEME match { + case "zlib" => pipeline.addLast("zlibEncoder", new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)) + //case "lzf" => pipeline.addLast("lzfEncoder", new LzfEncoder) + case _ => {} // no compression + } pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)) pipeline.addLast("protobufEncoder", new ProtobufEncoder()) pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap)) diff --git a/akka-actors/src/main/scala/nio/RemoteServer.scala b/akka-actors/src/main/scala/nio/RemoteServer.scala index 6534d2b93c..c9528dc6a4 100755 --- a/akka-actors/src/main/scala/nio/RemoteServer.scala +++ b/akka-actors/src/main/scala/nio/RemoteServer.scala @@ -26,7 +26,14 @@ import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder} object RemoteServer extends Logging { val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") val PORT = config.getInt("akka.remote.server.port", 9999) - val COMPRESSION = config.getBool("akka.remote.compression", true) + val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib") + val ZLIB_COMPRESSION_LEVEL = { + val level = config.getInt("akka.remote.zlib-compression-level", 6) + if (level < 1 && level > 9) throw new IllegalArgumentException( + "zlib compression level has to be within 1-9, with 1 being fastest and 9 being the most compressed") + level + } + val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000) private var hostname = HOSTNAME @@ -73,10 +80,18 @@ class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader]) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { val pipeline = Channels.pipeline() - if (RemoteServer.COMPRESSION) pipeline.addLast("zlibDecoder", new ZlibDecoder()) + RemoteServer.COMPRESSION_SCHEME match { + case "zlib" => pipeline.addLast("zlibDecoder", new ZlibDecoder) + //case "lzf" => pipeline.addLast("lzfDecoder", new LzfDecoder) + case _ => {} // no compression + } pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)) pipeline.addLast("protobufDecoder", new ProtobufDecoder(RemoteRequest.getDefaultInstance)) - if (RemoteServer.COMPRESSION) pipeline.addLast("zlibEncoder", new ZlibEncoder()) + RemoteServer.COMPRESSION_SCHEME match { + case "zlib" => pipeline.addLast("zlibEncoder", new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)) + //case "lzf" => pipeline.addLast("lzfEncoder", new LzfEncoder) + case _ => {} // no compression + } pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)) pipeline.addLast("protobufEncoder", new ProtobufEncoder) pipeline.addLast("handler", new RemoteServerHandler(name, loader)) diff --git a/config/akka-reference.conf b/config/akka-reference.conf index c6fbb75cb4..8ca35927e9 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -22,28 +22,26 @@ boot = ["sample.java.Boot", "sample.scala.Boot", "se.scalablesolutions.akka.security.samples.Boot"] - timeout = 5000 # default timeout for future based invocations - serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability + timeout = 5000 # default timeout for future based invocations + serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability service = on - max-nr-of-retries = 10 - restart-on-collision = off # (not implemented yet) if 'on' then it reschedules the transaction, - # if 'off' then throws an exception or rollback for user to handle - wait-for-completion = 1000 # how long time in millis a transaction should be given time to complete when a collision is detected - wait-nr-of-times = 3 # the number of times it should check for completion of a pending transaction upon collision - distributed = off # not implemented yet + distributed = off # not implemented yet - compression = on # turn on/off zlib compression + compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression + zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6 + service = on hostname = "localhost" port = 9999 connection-timeout = 1000 # in millis (1 sec default) + reconnect-delay = 5000 # in millis (5 sec default) read-timeout = 10000 # in millis (10 sec default) @@ -65,6 +63,7 @@ storage-format = "scala-json" # Options: java, scala-json, java-json, protobuf consistency-level = "QUORUM" # Options: ZERO, ONE, QUORUM, ALL + hostname = "127.0.0.1" # IP address or hostname of the MongoDB DB instance port = 27017