=rem #3621 Make Netty respect akka.daemonic setting

This commit is contained in:
Björn Antonsson 2013-09-20 11:54:54 +02:00
parent 6f89d346ec
commit 5d6ac46683
2 changed files with 63 additions and 2 deletions

View file

@ -19,7 +19,7 @@ import org.jboss.netty.bootstrap.{ ConnectionlessBootstrap, Bootstrap, ClientBoo
import org.jboss.netty.buffer.{ ChannelBuffers, ChannelBuffer }
import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture, ChannelGroupFutureListener }
import org.jboss.netty.channel.socket.nio.{ NioDatagramChannelFactory, NioServerSocketChannelFactory, NioClientSocketChannelFactory }
import org.jboss.netty.channel.socket.nio.{ NioWorkerPool, NioDatagramChannelFactory, NioServerSocketChannelFactory, NioClientSocketChannelFactory }
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import org.jboss.netty.handler.ssl.SslHandler
import scala.concurrent.duration.{ Duration, FiniteDuration, MILLISECONDS }
@ -29,6 +29,7 @@ import scala.util.control.{ NoStackTrace, NonFatal }
import akka.util.Helpers.Requiring
import akka.util.Helpers
import akka.remote.RARP
import org.jboss.netty.util.HashedWheelTimer
object NettyTransportSettings {
sealed trait Mode
@ -276,16 +277,22 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
private val clientChannelFactory: ChannelFactory = TransportMode match {
case Tcp
val boss, worker = createExecutorService()
new NioClientSocketChannelFactory(boss, worker, ClientSocketWorkerPoolSize)
// We need to create a HashedWheelTimer here since Netty creates one with a thread that
// doesn't respect the akka.daemonic setting
new NioClientSocketChannelFactory(boss, 1, new NioWorkerPool(worker, ClientSocketWorkerPoolSize),
new HashedWheelTimer(system.threadFactory))
case Udp
// This does not create a HashedWheelTimer internally
new NioDatagramChannelFactory(createExecutorService(), ClientSocketWorkerPoolSize)
}
private val serverChannelFactory: ChannelFactory = TransportMode match {
case Tcp
val boss, worker = createExecutorService()
// This does not create a HashedWheelTimer internally
new NioServerSocketChannelFactory(boss, worker, ServerSocketWorkerPoolSize)
case Udp
// This does not create a HashedWheelTimer internally
new NioDatagramChannelFactory(createExecutorService(), ServerSocketWorkerPoolSize)
}

View file

@ -0,0 +1,54 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.testkit._
import scala.concurrent.duration._
import akka.actor.{ Address, ExtendedActorSystem, ActorSystem }
import com.typesafe.config.ConfigFactory
import java.nio.channels.ServerSocketChannel
import java.net.InetSocketAddress
import scala.collection.JavaConverters._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DaemonicSpec extends AkkaSpec {
def addr(sys: ActorSystem, proto: String) =
sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get
def unusedPort = {
val ss = ServerSocketChannel.open().socket()
ss.bind(new InetSocketAddress("localhost", 0))
val port = ss.getLocalPort
ss.close()
port
}
"Remoting configured with daemonic = on" must {
"shut down correctly after getting connection refused" in {
// get all threads running before actor system i started
val origThreads: Set[Thread] = Thread.getAllStackTraces().keySet().asScala.to[Set]
// create a separate actor system that we can check the threads for
val daemonicSystem = ActorSystem("daemonic", ConfigFactory.parseString("""
akka.daemonic = on
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
akka.remote.netty.tcp.port = 0
"""))
val unusedAddress = addr(daemonicSystem, "tcp").copy(port = Some(unusedPort))
val selection = daemonicSystem.actorSelection(s"${unusedAddress}/user/SomeActor")
selection ! "whatever"
Thread.sleep(2.seconds.dilated.toMillis)
// get new non daemonic threads running
val newNonDaemons: Set[Thread] = Thread.getAllStackTraces().keySet().asScala.seq.
filter(t !origThreads(t) && t.isDaemon == false).to[Set]
newNonDaemons must be(Set.empty[Thread])
shutdown(daemonicSystem)
}
}
}