Merge pull request #1037 from akka/wip-2862-leaking-netty-threads-ban

Leaking Netty threads causes file handle exhaustion
This commit is contained in:
Björn Antonsson 2013-01-18 02:19:05 -08:00
commit 398840c204
6 changed files with 35 additions and 5 deletions

View file

@ -448,6 +448,10 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP
case GetNodes sender ! nodes.keys case GetNodes sender ! nodes.keys
case GetSockAddr sender ! connection.getLocalAddress case GetSockAddr sender ! connection.getLocalAddress
} }
override def postStop() {
RemoteConnection.shutdown(connection)
}
} }
/** /**

View file

@ -313,6 +313,7 @@ private[akka] class PlayerHandler(
val channel = event.getChannel val channel = event.getChannel
log.debug("disconnected from {}", getAddrString(channel)) log.debug("disconnected from {}", getAddrString(channel))
fsm ! PoisonPill fsm ! PoisonPill
RemoteConnection.shutdown(channel)
} }
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {

View file

@ -70,4 +70,8 @@ private[akka] object RemoteConnection {
case i: InetSocketAddress i.toString case i: InetSocketAddress i.toString
case _ "[unknown]" case _ "[unknown]"
} }
def shutdown(channel: Channel) = {
channel.getFactory.releaseExternalResources()
}
} }

View file

@ -5,7 +5,7 @@ package akka.remote.testconductor
import language.postfixOps import language.postfixOps
import akka.actor.{ Props, AddressFromURIString, ActorRef, Actor, OneForOneStrategy, SupervisorStrategy } import akka.actor.{ Props, AddressFromURIString, ActorRef, Actor, OneForOneStrategy, SupervisorStrategy, PoisonPill }
import akka.testkit.{ AkkaSpec, ImplicitSender, EventFilter, TestProbe, TimingTest } import akka.testkit.{ AkkaSpec, ImplicitSender, EventFilter, TestProbe, TimingTest }
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.event.Logging import akka.event.Logging
@ -243,6 +243,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
EventFilter[BarrierEmpty](occurrences = 1) intercept { EventFilter[BarrierEmpty](occurrences = 1) intercept {
b ! Remove(A) b ! Remove(A)
} }
b ! PoisonPill // clean up so network connections don't accumulate during test run
} }
"register clients and disconnect them" taggedAs TimingTest in { "register clients and disconnect them" taggedAs TimingTest in {
@ -253,12 +254,14 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
expectNoMsg(1 second) expectNoMsg(1 second)
b ! ClientDisconnected(A) b ! ClientDisconnected(A)
expectNoMsg(1 second) expectNoMsg(1 second)
b ! PoisonPill // clean up so network connections don't accumulate during test run
} }
"fail entering barrier when nobody registered" taggedAs TimingTest in { "fail entering barrier when nobody registered" taggedAs TimingTest in {
val b = getController(0) val b = getController(0)
b ! EnterBarrier("b", None) b ! EnterBarrier("b", None)
expectMsg(ToClient(BarrierResult("b", false))) expectMsg(ToClient(BarrierResult("b", false)))
b ! PoisonPill // clean up so network connections don't accumulate during test run
} }
"enter barrier" taggedAs TimingTest in { "enter barrier" taggedAs TimingTest in {
@ -275,6 +278,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
a.expectMsg(ToClient(BarrierResult("bar11", true))) a.expectMsg(ToClient(BarrierResult("bar11", true)))
b.expectMsg(ToClient(BarrierResult("bar11", true))) b.expectMsg(ToClient(BarrierResult("bar11", true)))
} }
barrier ! PoisonPill // clean up so network connections don't accumulate during test run
} }
"enter barrier with joining node" taggedAs TimingTest in { "enter barrier with joining node" taggedAs TimingTest in {
@ -295,6 +299,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
b.expectMsg(ToClient(BarrierResult("bar12", true))) b.expectMsg(ToClient(BarrierResult("bar12", true)))
c.expectMsg(ToClient(BarrierResult("bar12", true))) c.expectMsg(ToClient(BarrierResult("bar12", true)))
} }
barrier ! PoisonPill // clean up so network connections don't accumulate during test run
} }
"enter barrier with leaving node" taggedAs TimingTest in { "enter barrier with leaving node" taggedAs TimingTest in {
@ -317,6 +322,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
} }
barrier ! ClientDisconnected(C) barrier ! ClientDisconnected(C)
expectNoMsg(1 second) expectNoMsg(1 second)
barrier ! PoisonPill // clean up so network connections don't accumulate during test run
} }
"leave barrier when last “arrived” is removed" taggedAs TimingTest in { "leave barrier when last “arrived” is removed" taggedAs TimingTest in {
@ -330,6 +336,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
barrier ! Remove(A) barrier ! Remove(A)
b.send(barrier, EnterBarrier("foo", None)) b.send(barrier, EnterBarrier("foo", None))
b.expectMsg(ToClient(BarrierResult("foo", true))) b.expectMsg(ToClient(BarrierResult("foo", true)))
barrier ! PoisonPill // clean up so network connections don't accumulate during test run
} }
"fail barrier with disconnecing node" taggedAs TimingTest in { "fail barrier with disconnecing node" taggedAs TimingTest in {
@ -347,6 +354,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
barrier ! ClientDisconnected(B) barrier ! ClientDisconnected(B)
} }
a.expectMsg(ToClient(BarrierResult("bar15", false))) a.expectMsg(ToClient(BarrierResult("bar15", false)))
barrier ! PoisonPill // clean up so network connections don't accumulate during test run
} }
"fail barrier with disconnecing node who already arrived" taggedAs TimingTest in { "fail barrier with disconnecing node who already arrived" taggedAs TimingTest in {
@ -366,6 +374,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
barrier ! ClientDisconnected(B) barrier ! ClientDisconnected(B)
} }
a.expectMsg(ToClient(BarrierResult("bar16", false))) a.expectMsg(ToClient(BarrierResult("bar16", false)))
barrier ! PoisonPill // clean up so network connections don't accumulate during test run
} }
"fail when entering wrong barrier" taggedAs TimingTest in { "fail when entering wrong barrier" taggedAs TimingTest in {
@ -383,6 +392,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
} }
a.expectMsg(ToClient(BarrierResult("bar17", false))) a.expectMsg(ToClient(BarrierResult("bar17", false)))
b.expectMsg(ToClient(BarrierResult("foo", false))) b.expectMsg(ToClient(BarrierResult("foo", false)))
barrier ! PoisonPill // clean up so network connections don't accumulate during test run
} }
"fail after barrier timeout" taggedAs TimingTest in { "fail after barrier timeout" taggedAs TimingTest in {
@ -401,6 +411,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
b.send(barrier, EnterBarrier("bar18", None)) b.send(barrier, EnterBarrier("bar18", None))
a.expectMsg(ToClient(BarrierResult("bar18", false))) a.expectMsg(ToClient(BarrierResult("bar18", false)))
b.expectMsg(ToClient(BarrierResult("bar18", false))) b.expectMsg(ToClient(BarrierResult("bar18", false)))
barrier ! PoisonPill // clean up so network connections don't accumulate during test run
} }
"fail if a node registers twice" taggedAs TimingTest in { "fail if a node registers twice" taggedAs TimingTest in {
@ -414,6 +425,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
} }
a.expectMsg(ToClient(BarrierResult("initial startup", false))) a.expectMsg(ToClient(BarrierResult("initial startup", false)))
b.expectMsg(ToClient(BarrierResult("initial startup", false))) b.expectMsg(ToClient(BarrierResult("initial startup", false)))
controller ! PoisonPill // clean up so network connections don't accumulate during test run
} }
"fail subsequent barriers if a node registers twice" taggedAs TimingTest in { "fail subsequent barriers if a node registers twice" taggedAs TimingTest in {
@ -429,6 +441,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
} }
a.send(controller, EnterBarrier("bar19", None)) a.send(controller, EnterBarrier("bar19", None))
a.expectMsg(ToClient(BarrierResult("bar19", false))) a.expectMsg(ToClient(BarrierResult("bar19", false)))
controller ! PoisonPill // clean up so network connections don't accumulate during test run
} }
"fail subsequent barriers after foreced failure" taggedAs TimingTest in { "fail subsequent barriers after foreced failure" taggedAs TimingTest in {
@ -450,6 +463,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
b.send(barrier, EnterBarrier("bar21", None)) b.send(barrier, EnterBarrier("bar21", None))
a.expectMsg(ToClient(BarrierResult("bar21", false))) a.expectMsg(ToClient(BarrierResult("bar21", false)))
b.expectMsg(ToClient(BarrierResult("bar21", false))) b.expectMsg(ToClient(BarrierResult("bar21", false)))
barrier ! PoisonPill // clean up so network connections don't accumulate during test run
} }
"timeout within the shortest timeout if the new timeout is shorter" taggedAs TimingTest in { "timeout within the shortest timeout if the new timeout is shorter" taggedAs TimingTest in {
@ -473,6 +487,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
a.expectMsg(ToClient(BarrierResult("bar22", false))) a.expectMsg(ToClient(BarrierResult("bar22", false)))
b.expectMsg(ToClient(BarrierResult("bar22", false))) b.expectMsg(ToClient(BarrierResult("bar22", false)))
c.expectMsg(ToClient(BarrierResult("bar22", false))) c.expectMsg(ToClient(BarrierResult("bar22", false)))
barrier ! PoisonPill // clean up so network connections don't accumulate during test run
} }
"timeout within the shortest timeout if the new timeout is longer" taggedAs TimingTest in { "timeout within the shortest timeout if the new timeout is longer" taggedAs TimingTest in {
@ -496,6 +511,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
a.expectMsg(ToClient(BarrierResult("bar23", false))) a.expectMsg(ToClient(BarrierResult("bar23", false)))
b.expectMsg(ToClient(BarrierResult("bar23", false))) b.expectMsg(ToClient(BarrierResult("bar23", false)))
c.expectMsg(ToClient(BarrierResult("bar23", false))) c.expectMsg(ToClient(BarrierResult("bar23", false)))
barrier ! PoisonPill // clean up so network connections don't accumulate during test run
} }
"finally have no failure messages left" taggedAs TimingTest in { "finally have no failure messages left" taggedAs TimingTest in {

View file

@ -4,10 +4,9 @@
package akka.remote.testconductor package akka.remote.testconductor
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.actor.Props import akka.actor.{ PoisonPill, Props, AddressFromURIString }
import akka.testkit.ImplicitSender import akka.testkit.ImplicitSender
import akka.remote.testconductor.Controller.NodeInfo import akka.remote.testconductor.Controller.NodeInfo
import akka.actor.AddressFromURIString
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.net.InetAddress import java.net.InetAddress
@ -35,6 +34,7 @@ class ControllerSpec extends AkkaSpec(ControllerSpec.config) with ImplicitSender
expectMsg(ToClient(Done)) expectMsg(ToClient(Done))
c ! Controller.GetNodes c ! Controller.GetNodes
expectMsgType[Iterable[RoleName]].toSet must be(Set(A, B)) expectMsgType[Iterable[RoleName]].toSet must be(Set(A, B))
c ! PoisonPill // clean up so network connections don't accumulate during test run
} }
} }

View file

@ -381,8 +381,13 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
_ always(channelGroup.close()) _ always(channelGroup.close())
} { } {
// Release the selectors, but don't try to kill the dispatcher // Release the selectors, but don't try to kill the dispatcher
if (UseDispatcherForIo.isDefined) inboundBootstrap.shutdown() if (UseDispatcherForIo.isDefined) {
else inboundBootstrap.releaseExternalResources() clientChannelFactory.shutdown()
serverChannelFactory.shutdown()
} else {
clientChannelFactory.releaseExternalResources()
serverChannelFactory.releaseExternalResources()
}
} }
} }