From 5d6ac4668323515cc18e4dfbe8a2f9e78ee401ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bjo=CC=88rn=20Antonsson?= Date: Fri, 20 Sep 2013 11:54:54 +0200 Subject: [PATCH] =rem #3621 Make Netty respect akka.daemonic setting --- .../transport/netty/NettyTransport.scala | 11 +++- .../test/scala/akka/remote/DaemonicSpec.scala | 54 +++++++++++++++++++ 2 files changed, 63 insertions(+), 2 deletions(-) create mode 100644 akka-remote/src/test/scala/akka/remote/DaemonicSpec.scala diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index 60322758f6..0dd2fb99da 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -19,7 +19,7 @@ import org.jboss.netty.bootstrap.{ ConnectionlessBootstrap, Bootstrap, ClientBoo import org.jboss.netty.buffer.{ ChannelBuffers, ChannelBuffer } import org.jboss.netty.channel._ import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture, ChannelGroupFutureListener } -import org.jboss.netty.channel.socket.nio.{ NioDatagramChannelFactory, NioServerSocketChannelFactory, NioClientSocketChannelFactory } +import org.jboss.netty.channel.socket.nio.{ NioWorkerPool, NioDatagramChannelFactory, NioServerSocketChannelFactory, NioClientSocketChannelFactory } import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } import org.jboss.netty.handler.ssl.SslHandler import scala.concurrent.duration.{ Duration, FiniteDuration, MILLISECONDS } @@ -29,6 +29,7 @@ import scala.util.control.{ NoStackTrace, NonFatal } import akka.util.Helpers.Requiring import akka.util.Helpers import akka.remote.RARP +import org.jboss.netty.util.HashedWheelTimer object NettyTransportSettings { sealed trait Mode @@ -276,16 +277,22 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA private val clientChannelFactory: ChannelFactory = TransportMode match { case Tcp ⇒ val boss, worker = createExecutorService() - new NioClientSocketChannelFactory(boss, worker, ClientSocketWorkerPoolSize) + // We need to create a HashedWheelTimer here since Netty creates one with a thread that + // doesn't respect the akka.daemonic setting + new NioClientSocketChannelFactory(boss, 1, new NioWorkerPool(worker, ClientSocketWorkerPoolSize), + new HashedWheelTimer(system.threadFactory)) case Udp ⇒ + // This does not create a HashedWheelTimer internally new NioDatagramChannelFactory(createExecutorService(), ClientSocketWorkerPoolSize) } private val serverChannelFactory: ChannelFactory = TransportMode match { case Tcp ⇒ val boss, worker = createExecutorService() + // This does not create a HashedWheelTimer internally new NioServerSocketChannelFactory(boss, worker, ServerSocketWorkerPoolSize) case Udp ⇒ + // This does not create a HashedWheelTimer internally new NioDatagramChannelFactory(createExecutorService(), ServerSocketWorkerPoolSize) } diff --git a/akka-remote/src/test/scala/akka/remote/DaemonicSpec.scala b/akka-remote/src/test/scala/akka/remote/DaemonicSpec.scala new file mode 100644 index 0000000000..776aab1417 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/DaemonicSpec.scala @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.remote + +import akka.testkit._ +import scala.concurrent.duration._ +import akka.actor.{ Address, ExtendedActorSystem, ActorSystem } +import com.typesafe.config.ConfigFactory +import java.nio.channels.ServerSocketChannel +import java.net.InetSocketAddress +import scala.collection.JavaConverters._ + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class DaemonicSpec extends AkkaSpec { + + def addr(sys: ActorSystem, proto: String) = + sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get + + def unusedPort = { + val ss = ServerSocketChannel.open().socket() + ss.bind(new InetSocketAddress("localhost", 0)) + val port = ss.getLocalPort + ss.close() + port + } + + "Remoting configured with daemonic = on" must { + + "shut down correctly after getting connection refused" in { + // get all threads running before actor system i started + val origThreads: Set[Thread] = Thread.getAllStackTraces().keySet().asScala.to[Set] + // create a separate actor system that we can check the threads for + val daemonicSystem = ActorSystem("daemonic", ConfigFactory.parseString(""" + akka.daemonic = on + akka.actor.provider = "akka.remote.RemoteActorRefProvider" + akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" + akka.remote.netty.tcp.port = 0 + """)) + + val unusedAddress = addr(daemonicSystem, "tcp").copy(port = Some(unusedPort)) + val selection = daemonicSystem.actorSelection(s"${unusedAddress}/user/SomeActor") + selection ! "whatever" + Thread.sleep(2.seconds.dilated.toMillis) + + // get new non daemonic threads running + val newNonDaemons: Set[Thread] = Thread.getAllStackTraces().keySet().asScala.seq. + filter(t ⇒ !origThreads(t) && t.isDaemon == false).to[Set] + + newNonDaemons must be(Set.empty[Thread]) + shutdown(daemonicSystem) + } + } +}