diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index ef3a7e37f1..e2f1e6d032 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -25,7 +25,6 @@ import java.net.{SocketAddress, InetSocketAddress} import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap} import java.util.concurrent.atomic.AtomicLong - /** * @author Jonas Bonér */ @@ -147,26 +146,6 @@ class RemoteClientPipelineFactory(name: String, timer: HashedWheelTimer, client: RemoteClient) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { - /* - val pipeline = Channels.pipeline() - pipeline.addLast("timeout", new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT)) - RemoteServer.COMPRESSION_SCHEME match { - case "zlib" => pipeline.addLast("zlibDecoder", new ZlibDecoder) - //case "lzf" => pipeline.addLast("lzfDecoder", new LzfDecoder) - case _ => {} // no compression - } - pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)) - pipeline.addLast("protobufDecoder", new ProtobufDecoder(RemoteReply.getDefaultInstance)) - RemoteServer.COMPRESSION_SCHEME match { - case "zlib" => pipeline.addLast("zlibEncoder", new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)) - //case "lzf" => pipeline.addLast("lzfEncoder", new LzfEncoder) - case _ => {} // no compression - } - pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)) - pipeline.addLast("protobufEncoder", new ProtobufEncoder()) - pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client)) - pipeline - */ val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT) val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) diff --git a/akka-samples-chat/src/main/scala/ChatServer.scala b/akka-samples-chat/src/main/scala/ChatServer.scala index 81794b2ea8..039cece3fa 100644 --- a/akka-samples-chat/src/main/scala/ChatServer.scala +++ b/akka-samples-chat/src/main/scala/ChatServer.scala @@ -67,29 +67,9 @@ class Session(user: String, storage: Actor) extends Actor { } /** - * Chat storage holding the chat log. + * Abstraction of Chat storage holding the chat log. */ -class Storage extends Actor { - lifeCycle = Some(LifeCycle(Permanent)) - - private val chatLog = RedisStorage.getVector("akka.chat.log") - - log.info("Redis-based chat storage is starting up...") - - def receive = { - case msg @ ChatMessage(from, message) => - log.debug("New chat message [%s]", message) - atomic { - chatLog + message.getBytes("UTF-8") - } - - case GetChatLog(_) => - val messageList = atomic { - chatLog.map(bytes => new String(bytes, "UTF-8")).toList - } - reply(ChatLog(messageList)) - } -} +trait Storage /** * Implements user session management. @@ -137,8 +117,6 @@ trait ChatManagement { this: Actor => * Chat server. Manages sessions and redirects all other messages to the Session for the client. */ trait ChatServer extends Actor { - id = "ChatServer" // setting ID to make sure there is only one single ChatServer on the remote node - faultHandler = Some(OneForOneStrategy(5, 5000)) trapExit = List(classOf[Exception]) @@ -162,6 +140,31 @@ trait ChatServer extends Actor { } } +/** + * Redis-backed storage implementation. + */ +class RedisStorage extends Actor with Storage { + lifeCycle = Some(LifeCycle(Permanent)) + + private val chatLog = RedisStorage.getVector("akka.chat.log") + + log.info("Redis-based chat storage is starting up...") + + def receive = { + case msg @ ChatMessage(from, message) => + log.debug("New chat message [%s]", message) + atomic { + chatLog + message.getBytes("UTF-8") + } + + case GetChatLog(_) => + val messageList = atomic { + chatLog.map(bytes => new String(bytes, "UTF-8")).toList + } + reply(ChatLog(messageList)) + } +} + /** * Object encapsulating the full Chat Service. */