Added test for forward of !! messages + Added StaticChannelPipeline for RemoteClient
This commit is contained in:
parent
835428e4b9
commit
1f71e5d88f
3 changed files with 51 additions and 12 deletions
|
|
@ -9,7 +9,7 @@ import scala.collection.mutable.HashMap
|
||||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
|
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
|
||||||
import se.scalablesolutions.akka.actor.{Exit, Actor}
|
import se.scalablesolutions.akka.actor.{Exit, Actor}
|
||||||
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFutureResult, CompletableFutureResult}
|
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFutureResult, CompletableFutureResult}
|
||||||
import se.scalablesolutions.akka.util.Logging
|
import se.scalablesolutions.akka.util.{UUID, Logging}
|
||||||
import se.scalablesolutions.akka.Config.config
|
import se.scalablesolutions.akka.Config.config
|
||||||
|
|
||||||
import org.jboss.netty.channel._
|
import org.jboss.netty.channel._
|
||||||
|
|
@ -25,13 +25,12 @@ import java.net.{SocketAddress, InetSocketAddress}
|
||||||
import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap}
|
import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap}
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
|
|
||||||
import org.codehaus.aspectwerkz.proxy.Uuid
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
object RemoteRequestIdFactory {
|
object RemoteRequestIdFactory {
|
||||||
private val nodeId = Uuid.newUuid
|
private val nodeId = UUID.newUuid
|
||||||
private val id = new AtomicLong
|
private val id = new AtomicLong
|
||||||
def nextId: Long = id.getAndIncrement + nodeId
|
def nextId: Long = id.getAndIncrement + nodeId
|
||||||
}
|
}
|
||||||
|
|
@ -148,6 +147,7 @@ class RemoteClientPipelineFactory(name: String,
|
||||||
timer: HashedWheelTimer,
|
timer: HashedWheelTimer,
|
||||||
client: RemoteClient) extends ChannelPipelineFactory {
|
client: RemoteClient) extends ChannelPipelineFactory {
|
||||||
def getPipeline: ChannelPipeline = {
|
def getPipeline: ChannelPipeline = {
|
||||||
|
/*
|
||||||
val pipeline = Channels.pipeline()
|
val pipeline = Channels.pipeline()
|
||||||
pipeline.addLast("timeout", new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT))
|
pipeline.addLast("timeout", new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT))
|
||||||
RemoteServer.COMPRESSION_SCHEME match {
|
RemoteServer.COMPRESSION_SCHEME match {
|
||||||
|
|
@ -166,6 +166,23 @@ class RemoteClientPipelineFactory(name: String,
|
||||||
pipeline.addLast("protobufEncoder", new ProtobufEncoder())
|
pipeline.addLast("protobufEncoder", new ProtobufEncoder())
|
||||||
pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client))
|
pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client))
|
||||||
pipeline
|
pipeline
|
||||||
|
*/
|
||||||
|
val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT)
|
||||||
|
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
|
||||||
|
val lenPrep = new LengthFieldPrepender(4)
|
||||||
|
val protobufDec = new ProtobufDecoder(RemoteReply.getDefaultInstance)
|
||||||
|
val protobufEnc = new ProtobufEncoder
|
||||||
|
val zipCodec = RemoteServer.COMPRESSION_SCHEME match {
|
||||||
|
case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder))
|
||||||
|
//case "lzf" => Some(Codec(new LzfEncoder, new LzfDecoder))
|
||||||
|
case _ => None
|
||||||
|
}
|
||||||
|
val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client)
|
||||||
|
|
||||||
|
val stages: Array[ChannelHandler] =
|
||||||
|
zipCodec.map(codec => Array(timeout, codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteClient))
|
||||||
|
.getOrElse(Array(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient))
|
||||||
|
new StaticChannelPipeline(stages: _*)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -194,7 +194,8 @@ class RemoteServerPipelineFactory(
|
||||||
}
|
}
|
||||||
val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, activeObjects)
|
val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, activeObjects)
|
||||||
|
|
||||||
val stages: Array[ChannelHandler] = zipCodec.map(codec => Array(codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteServer))
|
val stages: Array[ChannelHandler] =
|
||||||
|
zipCodec.map(codec => Array(codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteServer))
|
||||||
.getOrElse(Array(lenDec, protobufDec, lenPrep, protobufEnc, remoteServer))
|
.getOrElse(Array(lenDec, protobufDec, lenPrep, protobufEnc, remoteServer))
|
||||||
new StaticChannelPipeline(stages: _*)
|
new StaticChannelPipeline(stages: _*)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,37 +8,58 @@ class ForwardActorTest extends JUnitSuite {
|
||||||
|
|
||||||
object ForwardState {
|
object ForwardState {
|
||||||
var sender: Actor = null
|
var sender: Actor = null
|
||||||
|
var result: String = "nada"
|
||||||
}
|
}
|
||||||
|
|
||||||
class ReceiverActor extends Actor {
|
class ReceiverActor extends Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "Send" => ForwardState.sender = sender.get
|
case "SendBang" => ForwardState.sender = sender.get
|
||||||
|
case "SendBangBang" => reply("SendBangBang")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class ForwardActor extends Actor {
|
class ForwardActor extends Actor {
|
||||||
val receiverActor = new ReceiverActor
|
val receiverActor = new ReceiverActor
|
||||||
receiverActor.start
|
receiverActor.start
|
||||||
def receive = {
|
def receive = {
|
||||||
case "Send" => receiverActor.forward("Send")
|
case "SendBang" => receiverActor.forward("SendBang")
|
||||||
|
case "SendBangBang" => receiverActor.forward("SendBangBang")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class SenderActor extends Actor {
|
class BangSenderActor extends Actor {
|
||||||
val forwardActor = new ForwardActor
|
val forwardActor = new ForwardActor
|
||||||
forwardActor.start
|
forwardActor.start
|
||||||
forwardActor ! "Send"
|
forwardActor ! "SendBang"
|
||||||
|
def receive = {
|
||||||
|
case _ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class BangBangSenderActor extends Actor {
|
||||||
|
val forwardActor = new ForwardActor
|
||||||
|
forwardActor.start
|
||||||
|
ForwardState.result = (forwardActor !! "SendBangBang").getOrElse("nada")
|
||||||
def receive = {
|
def receive = {
|
||||||
case _ => {}
|
case _ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def shouldForwardActorReferenceWhenInvokingForward = {
|
def shouldForwardActorReferenceWhenInvokingForwardOnBang = {
|
||||||
val senderActor = new SenderActor
|
val senderActor = new BangSenderActor
|
||||||
senderActor.start
|
senderActor.start
|
||||||
Thread.sleep(1000)
|
Thread.sleep(1000)
|
||||||
assert(ForwardState.sender ne null)
|
assert(ForwardState.sender ne null)
|
||||||
assert(senderActor === ForwardState.sender)
|
assert(senderActor === ForwardState.sender)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def shouldForwardActorReferenceWhenInvokingForwardOnBangBang = {
|
||||||
|
val senderActor = new BangBangSenderActor
|
||||||
|
senderActor.start
|
||||||
|
Thread.sleep(1000)
|
||||||
|
assert(ForwardState.result === "SendBangBang")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue