!test Migrate multi node testkit to Netty 4.

Signed-off-by: He-Pin <hepin1989@gmail.com>
This commit is contained in:
He-Pin 2023-07-15 21:39:02 +08:00 committed by kerr
parent 464517a547
commit 0e89e793aa
10 changed files with 296 additions and 153 deletions

View file

@ -0,0 +1,57 @@
# Migrate to netty 4
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.pekko.remote.testconductor.RemoteConnection")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.PlayerHandler")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelOpen")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelClosed")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelBound")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelUnbound")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.writeComplete")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.exceptionCaught")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelConnected")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelDisconnected")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.messageReceived")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ServerFSM.channel")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ServerFSM.this")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.ConductorHandler")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.clients")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.channelConnected")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.channelDisconnected")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.messageReceived")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.ProtobufDecoder")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ProtobufDecoder.decode")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.ProtobufEncoder")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ProtobufEncoder.encode")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.MsgEncoder")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.MsgEncoder.encode")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.RemoteConnection.apply")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.RemoteConnection.getAddrString")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.RemoteConnection.shutdown")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.TestConductorPipelineFactory")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.TestConductorPipelineFactory.getPipeline")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.TestConductorPipelineFactory.this")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.MsgDecoder")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.MsgDecoder.decode")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller.connection")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.unapply")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.unapply")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.unapply")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.channel")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.copy")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.channel")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.copy$default$1")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.this")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.channel")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.copy")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.copy$default$1")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.copy$default$1")
# For Scala 3 these are also needed
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected._1")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data._1")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM._1")

View file

@ -16,6 +16,7 @@ package org.apache.pekko.remote.testconductor
import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap
import scala.annotation.nowarn
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
@ -23,18 +24,13 @@ import scala.reflect.classTag
import scala.util.control.NoStackTrace
import RemoteConnection.getAddrString
import io.netty.channel.{ Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter }
import io.netty.channel.ChannelHandler.Sharable
import language.postfixOps
import org.jboss.netty.channel.{
Channel,
ChannelHandlerContext,
ChannelStateEvent,
MessageEvent,
SimpleChannelUpstreamHandler
}
import org.apache.pekko
import pekko.PekkoException
import pekko.ConfigurationException
import pekko.PekkoException
import pekko.actor.{
Actor,
ActorRef,
@ -286,32 +282,33 @@ trait Conductor { this: TestConductorExt =>
*
* INTERNAL API.
*/
@Sharable
private[pekko] class ConductorHandler(_createTimeout: Timeout, controller: ActorRef, log: LoggingAdapter)
extends SimpleChannelUpstreamHandler {
extends ChannelInboundHandlerAdapter {
implicit val createTimeout: Timeout = _createTimeout
val clients = new ConcurrentHashMap[Channel, ActorRef]()
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val channel = event.getChannel
override def channelActive(ctx: ChannelHandlerContext): Unit = {
val channel = ctx.channel()
log.debug("connection from {}", getAddrString(channel))
val fsm: ActorRef =
Await.result((controller ? Controller.CreateServerFSM(channel)).mapTo(classTag[ActorRef]), Duration.Inf)
clients.put(channel, fsm)
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val channel = event.getChannel
override def channelInactive(ctx: ChannelHandlerContext): Unit = {
val channel = ctx.channel()
log.debug("disconnect from {}", getAddrString(channel))
val fsm = clients.get(channel)
fsm ! Controller.ClientDisconnected
clients.remove(channel)
}
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
val channel = event.getChannel
log.debug("message from {}: {}", getAddrString(channel), event.getMessage)
event.getMessage match {
override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = {
val channel = ctx.channel()
log.debug("message from {}: {}", getAddrString(channel), msg)
msg match {
case msg: NetworkOp =>
clients.get(channel) ! msg
case msg =>
@ -320,6 +317,11 @@ private[pekko] class ConductorHandler(_createTimeout: Timeout, controller: Actor
}
}
@nowarn("msg=deprecated")
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
log.error("channel {} exception {}", ctx.channel(), cause)
ctx.close()
}
}
/**
@ -398,10 +400,10 @@ private[pekko] class ServerFSM(val controller: ActorRef, val channel: Channel)
log.warning("client {} sent unsupported message {}", getAddrString(channel), msg)
stop()
case Event(ToClient(msg: UnconfirmedClientOp), _) =>
channel.write(msg)
channel.writeAndFlush(msg)
stay()
case Event(ToClient(msg), None) =>
channel.write(msg)
channel.writeAndFlush(msg)
stay().using(Some(sender()))
case Event(ToClient(msg), _) =>
log.warning("cannot send {} while waiting for previous ACK", msg)
@ -436,7 +438,7 @@ private[pekko] class Controller(private var initialParticipants: Int, controller
import Controller._
val settings = TestConductor().Settings
val connection = RemoteConnection(
val connection: RemoteConnection = RemoteConnection(
Server,
controllerPort,
settings.ServerSocketWorkerPoolSize,
@ -472,7 +474,7 @@ private[pekko] class Controller(private var initialParticipants: Int, controller
override def receive = LoggingReceive {
case CreateServerFSM(channel) =>
val (ip, port) = channel.getRemoteAddress match {
val (ip, port) = channel.remoteAddress() match {
case s: InetSocketAddress => (s.getAddress.getHostAddress, s.getPort)
case _ => throw new RuntimeException() // compiler exhaustiveness check pleaser
}
@ -525,12 +527,13 @@ private[pekko] class Controller(private var initialParticipants: Int, controller
case Remove(node) =>
barrier ! BarrierCoordinator.RemoveClient(node)
}
case GetNodes => sender() ! nodes.keys
case GetSockAddr => sender() ! connection.getLocalAddress
case GetNodes => sender() ! nodes.keys
case GetSockAddr =>
sender() ! connection.channelFuture.sync().channel().localAddress()
}
override def postStop(): Unit = {
RemoteConnection.shutdown(connection)
connection.shutdown()
}
}

View file

@ -15,12 +15,9 @@ package org.apache.pekko.remote.testconductor
import scala.concurrent.duration._
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.{ MessageToMessageDecoder, MessageToMessageEncoder }
import language.implicitConversions
import org.jboss.netty.channel.Channel
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder
import org.apache.pekko
import pekko.actor.Address
import pekko.remote.testconductor.{ TestConductorProtocol => TCP }
@ -74,7 +71,7 @@ private[pekko] case object Done extends Done {
private[pekko] final case class Remove(node: RoleName) extends CommandOp
private[pekko] class MsgEncoder extends OneToOneEncoder {
private[pekko] class MsgEncoder extends MessageToMessageEncoder[AnyRef] {
implicit def address2proto(addr: Address): TCP.Address =
TCP.Address.newBuilder
@ -90,7 +87,11 @@ private[pekko] class MsgEncoder extends OneToOneEncoder {
case Direction.Both => TCP.Direction.Both
}
def encode(ctx: ChannelHandlerContext, ch: Channel, msg: AnyRef): AnyRef = msg match {
override def encode(ctx: ChannelHandlerContext, msg: AnyRef, out: java.util.List[AnyRef]): Unit = {
out.add(encode0(msg))
}
private def encode0(msg: AnyRef): AnyRef = msg match {
case x: NetworkOp =>
val w = TCP.Wrapper.newBuilder
x match {
@ -136,7 +137,7 @@ private[pekko] class MsgEncoder extends OneToOneEncoder {
}
}
private[pekko] class MsgDecoder extends OneToOneDecoder {
private[pekko] class MsgDecoder extends MessageToMessageDecoder[AnyRef] {
implicit def address2scala(addr: TCP.Address): Address =
Address(addr.getProtocol, addr.getSystem, addr.getHost, addr.getPort)
@ -147,7 +148,11 @@ private[pekko] class MsgDecoder extends OneToOneDecoder {
case TCP.Direction.Both => Direction.Both
}
def decode(ctx: ChannelHandlerContext, ch: Channel, msg: AnyRef): AnyRef = msg match {
override def decode(ctx: ChannelHandlerContext, msg: AnyRef, out: java.util.List[AnyRef]): Unit = {
out.add(decode0(msg))
}
private def decode0(msg: AnyRef): AnyRef = msg match {
case w: TCP.Wrapper if w.getAllFields.size == 1 =>
if (w.hasHello) {
val h = w.getHello

View file

@ -15,7 +15,9 @@ package org.apache.pekko.remote.testconductor
import java.net.{ ConnectException, InetSocketAddress }
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.nowarn
import scala.collection.immutable
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration._
@ -23,16 +25,8 @@ import scala.reflect.classTag
import scala.util.control.NoStackTrace
import scala.util.control.NonFatal
import org.jboss.netty.channel.{
Channel,
ChannelHandlerContext,
ChannelStateEvent,
ExceptionEvent,
MessageEvent,
SimpleChannelUpstreamHandler,
WriteCompletionEvent
}
import io.netty.channel.{ Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter }
import io.netty.channel.ChannelHandler.Sharable
import org.apache.pekko
import pekko.actor._
import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
@ -202,7 +196,7 @@ private[pekko] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress
case Event(_: ClientOp, _) =>
stay().replying(Status.Failure(new IllegalStateException("not connected yet")))
case Event(Connected(channel), _) =>
channel.write(Hello(name.name, TestConductor().address))
channel.writeAndFlush(Hello(name.name, TestConductor().address))
goto(AwaitDone).using(Data(Some(channel), None))
case Event(e: ConnectionFailure, _) =>
log.error(e, "ConnectionFailure")
@ -229,12 +223,12 @@ private[pekko] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress
when(Connected) {
case Event(Disconnected, _) =>
log.info("disconnected from TestConductor")
throw new ConnectionFailure("disconnect")
throw ConnectionFailure("disconnect")
case Event(ToServer(_: Done), Data(Some(channel), _)) =>
channel.write(Done)
channel.writeAndFlush(Done)
stay()
case Event(ToServer(msg), d @ Data(Some(channel), None)) =>
channel.write(msg)
channel.writeAndFlush(msg)
val token = msg match {
case EnterBarrier(barrier, _) => Some(barrier -> sender())
case GetAddress(node) => Some(node.name -> sender())
@ -331,6 +325,7 @@ private[pekko] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress
*
* INTERNAL API.
*/
@Sharable
private[pekko] class PlayerHandler(
server: InetSocketAddress,
private var reconnects: Int,
@ -339,57 +334,47 @@ private[pekko] class PlayerHandler(
fsm: ActorRef,
log: LoggingAdapter,
scheduler: Scheduler)(implicit executor: ExecutionContext)
extends SimpleChannelUpstreamHandler {
extends ChannelInboundHandlerAdapter {
import ClientFSM._
reconnect()
val connectionRef: AtomicReference[RemoteConnection] = new AtomicReference[RemoteConnection](reconnect())
var nextAttempt: Deadline = _
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) =
log.debug("channel {} open", event.getChannel)
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) =
log.debug("channel {} closed", event.getChannel)
override def channelBound(ctx: ChannelHandlerContext, event: ChannelStateEvent) =
log.debug("channel {} bound", event.getChannel)
override def channelUnbound(ctx: ChannelHandlerContext, event: ChannelStateEvent) =
log.debug("channel {} unbound", event.getChannel)
override def writeComplete(ctx: ChannelHandlerContext, event: WriteCompletionEvent) =
log.debug("channel {} written {}", event.getChannel, event.getWrittenAmount)
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
log.debug("channel {} exception {}", event.getChannel, event.getCause)
event.getCause match {
@nowarn("msg=deprecated")
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
log.error("channel {} exception {}", ctx.channel(), cause)
cause match {
case _: ConnectException if reconnects > 0 =>
reconnects -= 1
scheduler.scheduleOnce(nextAttempt.timeLeft)(reconnect())
scheduler.scheduleOnce(nextAttempt.timeLeft)(connectionRef.set(reconnect()))
case e => fsm ! ConnectionFailure(e.getMessage)
}
}
private def reconnect(): Unit = {
private def reconnect(): RemoteConnection = {
nextAttempt = Deadline.now + backoff
RemoteConnection(Client, server, poolSize, this)
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val ch = event.getChannel
override def channelActive(ctx: ChannelHandlerContext): Unit = {
val ch = ctx.channel()
log.debug("connected to {}", getAddrString(ch))
fsm ! Connected(ch)
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val channel = event.getChannel
override def channelInactive(ctx: ChannelHandlerContext): Unit = {
val channel = ctx.channel()
log.debug("disconnected from {}", getAddrString(channel))
fsm ! PoisonPill
executor.execute(new Runnable { def run = RemoteConnection.shutdown(channel) }) // Must be shutdown outside of the Netty IO pool
executor.execute(() => connectionRef.get().shutdown()) // Must be shutdown outside of the Netty IO pool
}
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
val channel = event.getChannel
log.debug("message from {}: {}", getAddrString(channel), event.getMessage)
event.getMessage match {
override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = {
val channel = ctx.channel()
log.debug("message from {}: {}", getAddrString(channel), msg)
msg match {
case msg: NetworkOp =>
fsm ! msg
case msg =>

View file

@ -14,68 +14,68 @@
package org.apache.pekko.remote.testconductor
import java.net.InetSocketAddress
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import scala.util.control.NonFatal
import org.jboss.netty.bootstrap.{ ClientBootstrap, ServerBootstrap }
import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.channel.{
Channel,
ChannelPipeline,
ChannelPipelineFactory,
ChannelUpstreamHandler,
DefaultChannelPipeline
import io.netty.bootstrap.{ Bootstrap, ServerBootstrap }
import io.netty.buffer.{ ByteBuf, ByteBufUtil }
import io.netty.channel._
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.{ NioServerSocketChannel, NioSocketChannel }
import io.netty.handler.codec.{
LengthFieldBasedFrameDecoder,
LengthFieldPrepender,
MessageToMessageDecoder,
MessageToMessageEncoder
}
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.socket.nio.{ NioClientSocketChannelFactory, NioServerSocketChannelFactory }
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import org.jboss.netty.handler.codec.oneone.{ OneToOneDecoder, OneToOneEncoder }
import org.apache.pekko
import pekko.event.Logging
import pekko.protobufv3.internal.Message
import pekko.util.Helpers
/**
* INTERNAL API.
*/
private[pekko] class ProtobufEncoder extends OneToOneEncoder {
override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: AnyRef): AnyRef =
private[pekko] class ProtobufEncoder extends MessageToMessageEncoder[Message] {
override def encode(ctx: ChannelHandlerContext, msg: Message, out: java.util.List[AnyRef]): Unit = {
msg match {
case m: Message =>
val bytes = m.toByteArray()
ctx.getChannel.getConfig.getBufferFactory.getBuffer(bytes, 0, bytes.length)
case other => other
case message: Message =>
val bytes = message.toByteArray
out.add(ctx.alloc().buffer(bytes.length).writeBytes(bytes))
}
}
}
/**
* INTERNAL API.
*/
private[pekko] class ProtobufDecoder(prototype: Message) extends OneToOneDecoder {
override def decode(ctx: ChannelHandlerContext, ch: Channel, obj: AnyRef): AnyRef =
obj match {
case buf: ChannelBuffer =>
val len = buf.readableBytes()
val bytes = new Array[Byte](len)
buf.getBytes(buf.readerIndex, bytes, 0, len)
prototype.getParserForType.parseFrom(bytes)
case other => other
}
private[pekko] class ProtobufDecoder(prototype: Message) extends MessageToMessageDecoder[ByteBuf] {
override def decode(ctx: ChannelHandlerContext, msg: ByteBuf, out: java.util.List[AnyRef]): Unit = {
val bytes = ByteBufUtil.getBytes(msg)
out.add(prototype.getParserForType.parseFrom(bytes))
}
}
/**
* INTERNAL API.
*/
private[pekko] class TestConductorPipelineFactory(handler: ChannelUpstreamHandler) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val encap = List(new LengthFieldPrepender(4), new LengthFieldBasedFrameDecoder(10000, 0, 4, 0, 4))
val proto = List(new ProtobufEncoder, new ProtobufDecoder(TestConductorProtocol.Wrapper.getDefaultInstance))
val msg = List(new MsgEncoder, new MsgDecoder)
(encap ::: proto ::: msg ::: handler :: Nil).foldLeft(new DefaultChannelPipeline) { (pipe, handler) =>
pipe.addLast(Logging.simpleName(handler.getClass), handler); pipe
}
@Sharable
private[pekko] class TestConductorPipelineFactory(
handler: ChannelInboundHandler) extends ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
val pipe = ch.pipeline()
pipe.addLast("lengthFieldPrepender", new LengthFieldPrepender(4))
pipe.addLast("lengthFieldDecoder", new LengthFieldBasedFrameDecoder(10000, 0, 4, 0, 4, false))
pipe.addLast("protoEncoder", new ProtobufEncoder)
pipe.addLast("protoDecoder", new ProtobufDecoder(TestConductorProtocol.Wrapper.getDefaultInstance))
pipe.addLast("msgEncoder", new MsgEncoder)
pipe.addLast("msgDecoder", new MsgDecoder)
pipe.addLast("userHandler", handler)
}
}
@ -94,44 +94,87 @@ private[pekko] case object Client extends Role
*/
private[pekko] case object Server extends Role
/**
* INTERNAL API.
*/
private[pekko] trait RemoteConnection {
/**
* The channel future associated with this connection.
*/
def channelFuture: ChannelFuture
/**
* Shutdown the connection and release the resources.
*/
def shutdown(): Unit
}
/**
* INTERNAL API.
*/
private[pekko] object RemoteConnection {
def apply(role: Role, sockaddr: InetSocketAddress, poolSize: Int, handler: ChannelUpstreamHandler): Channel = {
def apply(
role: Role,
sockaddr: InetSocketAddress,
poolSize: Int,
handler: ChannelInboundHandler): RemoteConnection = {
role match {
case Client =>
val socketfactory =
new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool, poolSize)
val bootstrap = new ClientBootstrap(socketfactory)
bootstrap.setPipelineFactory(new TestConductorPipelineFactory(handler))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.connect(sockaddr).getChannel
val bootstrap = new Bootstrap()
val eventLoopGroup = new NioEventLoopGroup(poolSize)
val cf = bootstrap
.group(eventLoopGroup)
.channel(classOf[NioSocketChannel])
.handler(new TestConductorPipelineFactory(handler))
.option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
.option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
.connect(sockaddr)
new RemoteConnection {
override def channelFuture: ChannelFuture = cf
override def shutdown(): Unit = {
try {
channelFuture.channel().close().sync()
eventLoopGroup.shutdownGracefully(0L, 0L, TimeUnit.SECONDS)
} catch {
case NonFatal(_) => // silence this one to not make tests look like they failed, it's not really critical
}
}
}
case Server =>
val socketfactory =
new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool, poolSize)
val bootstrap = new ServerBootstrap(socketfactory)
bootstrap.setPipelineFactory(new TestConductorPipelineFactory(handler))
bootstrap.setOption("reuseAddress", !Helpers.isWindows)
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.bind(sockaddr)
val bootstrap = new ServerBootstrap()
val parentEventLoopGroup = new NioEventLoopGroup(poolSize)
val childEventLoopGroup = new NioEventLoopGroup(poolSize)
val cf = bootstrap
.group(parentEventLoopGroup, childEventLoopGroup)
.channel(classOf[NioServerSocketChannel])
.childHandler(new TestConductorPipelineFactory(handler))
.option[java.lang.Boolean](ChannelOption.SO_REUSEADDR, !Helpers.isWindows)
.option[java.lang.Integer](ChannelOption.SO_BACKLOG, 2048)
.childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
.childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
.bind(sockaddr)
new RemoteConnection {
override def channelFuture: ChannelFuture = cf
override def shutdown(): Unit = {
try {
channelFuture.channel().close().sync()
parentEventLoopGroup.shutdownGracefully(0L, 0L, TimeUnit.SECONDS)
childEventLoopGroup.shutdownGracefully(0L, 0L, TimeUnit.SECONDS)
} catch {
case NonFatal(_) => // silence this one to not make tests look like they failed, it's not really critical
}
}
}
}
}
def getAddrString(channel: Channel) = channel.getRemoteAddress match {
def getAddrString(channel: Channel): String = channel.remoteAddress() match {
case i: InetSocketAddress => i.toString
case _ => "[unknown]"
}
def shutdown(channel: Channel): Unit = {
try {
try channel.close()
finally
try channel.getFactory.shutdown()
finally channel.getFactory.releaseExternalResources()
} catch {
case NonFatal(_) =>
// silence this one to not make tests look like they failed, it's not really critical
}
}
}

View file

@ -14,14 +14,15 @@
package org.apache.pekko.remote.testkit
import java.net.{ InetAddress, InetSocketAddress }
import scala.collection.immutable
import scala.concurrent.{ Await, Awaitable }
import scala.concurrent.duration._
import scala.util.control.NonFatal
import com.typesafe.config.{ Config, ConfigFactory, ConfigObject }
import com.typesafe.config.{ Config, ConfigFactory, ConfigObject }
import io.netty.channel.ChannelException
import language.implicitConversions
import org.jboss.netty.channel.ChannelException
import org.apache.pekko
import pekko.actor._
import pekko.actor.RootActorPath