From 056cb478f4f2020f7bfb719a986471c94b7c2307 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Wed, 30 Dec 2009 09:24:10 +0100 Subject: [PATCH] Added test for forward of !! messages + Added StaticChannelPipeline for RemoteClient --- .../src/main/scala/remote/RemoteClient.scala | 23 +++++++++++-- .../src/main/scala/remote/RemoteServer.scala | 7 ++-- akka-core/src/test/scala/ForwardActor.scala | 33 +++++++++++++++---- 3 files changed, 51 insertions(+), 12 deletions(-) diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index fa291d343b..ef3a7e37f1 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -9,7 +9,7 @@ import scala.collection.mutable.HashMap import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} import se.scalablesolutions.akka.actor.{Exit, Actor} import se.scalablesolutions.akka.dispatch.{DefaultCompletableFutureResult, CompletableFutureResult} -import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.util.{UUID, Logging} import se.scalablesolutions.akka.Config.config import org.jboss.netty.channel._ @@ -25,13 +25,12 @@ import java.net.{SocketAddress, InetSocketAddress} import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap} import java.util.concurrent.atomic.AtomicLong -import org.codehaus.aspectwerkz.proxy.Uuid /** * @author Jonas Bonér */ object RemoteRequestIdFactory { - private val nodeId = Uuid.newUuid + private val nodeId = UUID.newUuid private val id = new AtomicLong def nextId: Long = id.getAndIncrement + nodeId } @@ -148,6 +147,7 @@ class RemoteClientPipelineFactory(name: String, timer: HashedWheelTimer, client: RemoteClient) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { + /* val pipeline = Channels.pipeline() pipeline.addLast("timeout", new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT)) RemoteServer.COMPRESSION_SCHEME match { @@ -166,6 +166,23 @@ class RemoteClientPipelineFactory(name: String, pipeline.addLast("protobufEncoder", new ProtobufEncoder()) pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client)) pipeline + */ + val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT) + val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) + val lenPrep = new LengthFieldPrepender(4) + val protobufDec = new ProtobufDecoder(RemoteReply.getDefaultInstance) + val protobufEnc = new ProtobufEncoder + val zipCodec = RemoteServer.COMPRESSION_SCHEME match { + case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder)) + //case "lzf" => Some(Codec(new LzfEncoder, new LzfDecoder)) + case _ => None + } + val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) + + val stages: Array[ChannelHandler] = + zipCodec.map(codec => Array(timeout, codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteClient)) + .getOrElse(Array(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient)) + new StaticChannelPipeline(stages: _*) } } diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 22e3927629..46f6f13eb7 100755 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -188,14 +188,15 @@ class RemoteServerPipelineFactory( val protobufDec = new ProtobufDecoder(RemoteRequest.getDefaultInstance) val protobufEnc = new ProtobufEncoder val zipCodec = RemoteServer.COMPRESSION_SCHEME match { - case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL),new ZlibDecoder)) + case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder)) //case "lzf" => Some(Codec(new LzfEncoder, new LzfDecoder)) case _ => None } val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, activeObjects) - val stages: Array[ChannelHandler] = zipCodec.map(codec => Array(codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteServer)) - .getOrElse(Array(lenDec, protobufDec, lenPrep, protobufEnc, remoteServer)) + val stages: Array[ChannelHandler] = + zipCodec.map(codec => Array(codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteServer)) + .getOrElse(Array(lenDec, protobufDec, lenPrep, protobufEnc, remoteServer)) new StaticChannelPipeline(stages: _*) } } diff --git a/akka-core/src/test/scala/ForwardActor.scala b/akka-core/src/test/scala/ForwardActor.scala index d44c53d63b..ff493c80e8 100644 --- a/akka-core/src/test/scala/ForwardActor.scala +++ b/akka-core/src/test/scala/ForwardActor.scala @@ -8,37 +8,58 @@ class ForwardActorTest extends JUnitSuite { object ForwardState { var sender: Actor = null + var result: String = "nada" } class ReceiverActor extends Actor { def receive = { - case "Send" => ForwardState.sender = sender.get + case "SendBang" => ForwardState.sender = sender.get + case "SendBangBang" => reply("SendBangBang") } } + class ForwardActor extends Actor { val receiverActor = new ReceiverActor receiverActor.start def receive = { - case "Send" => receiverActor.forward("Send") + case "SendBang" => receiverActor.forward("SendBang") + case "SendBangBang" => receiverActor.forward("SendBangBang") } } - class SenderActor extends Actor { + class BangSenderActor extends Actor { val forwardActor = new ForwardActor forwardActor.start - forwardActor ! "Send" + forwardActor ! "SendBang" + def receive = { + case _ => {} + } + } + + class BangBangSenderActor extends Actor { + val forwardActor = new ForwardActor + forwardActor.start + ForwardState.result = (forwardActor !! "SendBangBang").getOrElse("nada") def receive = { case _ => {} } } @Test - def shouldForwardActorReferenceWhenInvokingForward = { - val senderActor = new SenderActor + def shouldForwardActorReferenceWhenInvokingForwardOnBang = { + val senderActor = new BangSenderActor senderActor.start Thread.sleep(1000) assert(ForwardState.sender ne null) assert(senderActor === ForwardState.sender) } + + @Test + def shouldForwardActorReferenceWhenInvokingForwardOnBangBang = { + val senderActor = new BangBangSenderActor + senderActor.start + Thread.sleep(1000) + assert(ForwardState.result === "SendBangBang") + } }