diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index cd98cc74c7..5ea1b2e8cb 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -448,6 +448,10 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP case GetNodes ⇒ sender ! nodes.keys case GetSockAddr ⇒ sender ! connection.getLocalAddress } + + override def postStop() { + RemoteConnection.shutdown(connection) + } } /** diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala index f3b7a89c08..2a0ed93733 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala @@ -313,6 +313,7 @@ private[akka] class PlayerHandler( val channel = event.getChannel log.debug("disconnected from {}", getAddrString(channel)) fsm ! PoisonPill + RemoteConnection.shutdown(channel) } override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/RemoteConnection.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/RemoteConnection.scala index adca9518b4..2c542a2632 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/RemoteConnection.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/RemoteConnection.scala @@ -70,4 +70,8 @@ private[akka] object RemoteConnection { case i: InetSocketAddress ⇒ i.toString case _ ⇒ "[unknown]" } + + def shutdown(channel: Channel) = { + channel.getFactory.releaseExternalResources() + } } diff --git a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala index fe6a0cfebe..0e1604ff1b 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala @@ -5,7 +5,7 @@ package akka.remote.testconductor import language.postfixOps -import akka.actor.{ Props, AddressFromURIString, ActorRef, Actor, OneForOneStrategy, SupervisorStrategy } +import akka.actor.{ Props, AddressFromURIString, ActorRef, Actor, OneForOneStrategy, SupervisorStrategy, PoisonPill } import akka.testkit.{ AkkaSpec, ImplicitSender, EventFilter, TestProbe, TimingTest } import scala.concurrent.duration._ import akka.event.Logging @@ -243,6 +243,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { EventFilter[BarrierEmpty](occurrences = 1) intercept { b ! Remove(A) } + b ! PoisonPill // clean up so network connections don't accumulate during test run } "register clients and disconnect them" taggedAs TimingTest in { @@ -253,12 +254,14 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { expectNoMsg(1 second) b ! ClientDisconnected(A) expectNoMsg(1 second) + b ! PoisonPill // clean up so network connections don't accumulate during test run } "fail entering barrier when nobody registered" taggedAs TimingTest in { val b = getController(0) b ! EnterBarrier("b", None) expectMsg(ToClient(BarrierResult("b", false))) + b ! PoisonPill // clean up so network connections don't accumulate during test run } "enter barrier" taggedAs TimingTest in { @@ -275,6 +278,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { a.expectMsg(ToClient(BarrierResult("bar11", true))) b.expectMsg(ToClient(BarrierResult("bar11", true))) } + barrier ! PoisonPill // clean up so network connections don't accumulate during test run } "enter barrier with joining node" taggedAs TimingTest in { @@ -295,6 +299,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { b.expectMsg(ToClient(BarrierResult("bar12", true))) c.expectMsg(ToClient(BarrierResult("bar12", true))) } + barrier ! PoisonPill // clean up so network connections don't accumulate during test run } "enter barrier with leaving node" taggedAs TimingTest in { @@ -317,6 +322,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { } barrier ! ClientDisconnected(C) expectNoMsg(1 second) + barrier ! PoisonPill // clean up so network connections don't accumulate during test run } "leave barrier when last “arrived” is removed" taggedAs TimingTest in { @@ -330,6 +336,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { barrier ! Remove(A) b.send(barrier, EnterBarrier("foo", None)) b.expectMsg(ToClient(BarrierResult("foo", true))) + barrier ! PoisonPill // clean up so network connections don't accumulate during test run } "fail barrier with disconnecing node" taggedAs TimingTest in { @@ -347,6 +354,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { barrier ! ClientDisconnected(B) } a.expectMsg(ToClient(BarrierResult("bar15", false))) + barrier ! PoisonPill // clean up so network connections don't accumulate during test run } "fail barrier with disconnecing node who already arrived" taggedAs TimingTest in { @@ -366,6 +374,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { barrier ! ClientDisconnected(B) } a.expectMsg(ToClient(BarrierResult("bar16", false))) + barrier ! PoisonPill // clean up so network connections don't accumulate during test run } "fail when entering wrong barrier" taggedAs TimingTest in { @@ -383,6 +392,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { } a.expectMsg(ToClient(BarrierResult("bar17", false))) b.expectMsg(ToClient(BarrierResult("foo", false))) + barrier ! PoisonPill // clean up so network connections don't accumulate during test run } "fail after barrier timeout" taggedAs TimingTest in { @@ -401,6 +411,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { b.send(barrier, EnterBarrier("bar18", None)) a.expectMsg(ToClient(BarrierResult("bar18", false))) b.expectMsg(ToClient(BarrierResult("bar18", false))) + barrier ! PoisonPill // clean up so network connections don't accumulate during test run } "fail if a node registers twice" taggedAs TimingTest in { @@ -414,6 +425,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { } a.expectMsg(ToClient(BarrierResult("initial startup", false))) b.expectMsg(ToClient(BarrierResult("initial startup", false))) + controller ! PoisonPill // clean up so network connections don't accumulate during test run } "fail subsequent barriers if a node registers twice" taggedAs TimingTest in { @@ -429,6 +441,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { } a.send(controller, EnterBarrier("bar19", None)) a.expectMsg(ToClient(BarrierResult("bar19", false))) + controller ! PoisonPill // clean up so network connections don't accumulate during test run } "fail subsequent barriers after foreced failure" taggedAs TimingTest in { @@ -450,6 +463,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { b.send(barrier, EnterBarrier("bar21", None)) a.expectMsg(ToClient(BarrierResult("bar21", false))) b.expectMsg(ToClient(BarrierResult("bar21", false))) + barrier ! PoisonPill // clean up so network connections don't accumulate during test run } "timeout within the shortest timeout if the new timeout is shorter" taggedAs TimingTest in { @@ -473,6 +487,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { a.expectMsg(ToClient(BarrierResult("bar22", false))) b.expectMsg(ToClient(BarrierResult("bar22", false))) c.expectMsg(ToClient(BarrierResult("bar22", false))) + barrier ! PoisonPill // clean up so network connections don't accumulate during test run } "timeout within the shortest timeout if the new timeout is longer" taggedAs TimingTest in { @@ -496,6 +511,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { a.expectMsg(ToClient(BarrierResult("bar23", false))) b.expectMsg(ToClient(BarrierResult("bar23", false))) c.expectMsg(ToClient(BarrierResult("bar23", false))) + barrier ! PoisonPill // clean up so network connections don't accumulate during test run } "finally have no failure messages left" taggedAs TimingTest in { diff --git a/akka-remote-tests/src/test/scala/akka/remote/testconductor/ControllerSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testconductor/ControllerSpec.scala index 17d8932590..742ef36c9c 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testconductor/ControllerSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testconductor/ControllerSpec.scala @@ -4,10 +4,9 @@ package akka.remote.testconductor import akka.testkit.AkkaSpec -import akka.actor.Props +import akka.actor.{ PoisonPill, Props, AddressFromURIString } import akka.testkit.ImplicitSender import akka.remote.testconductor.Controller.NodeInfo -import akka.actor.AddressFromURIString import java.net.InetSocketAddress import java.net.InetAddress @@ -35,6 +34,7 @@ class ControllerSpec extends AkkaSpec(ControllerSpec.config) with ImplicitSender expectMsg(ToClient(Done)) c ! Controller.GetNodes expectMsgType[Iterable[RoleName]].toSet must be(Set(A, B)) + c ! PoisonPill // clean up so network connections don't accumulate during test run } } diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index 6550753214..d2bb8f1e11 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -381,8 +381,13 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s _ ← always(channelGroup.close()) } { // Release the selectors, but don't try to kill the dispatcher - if (UseDispatcherForIo.isDefined) inboundBootstrap.shutdown() - else inboundBootstrap.releaseExternalResources() + if (UseDispatcherForIo.isDefined) { + clientChannelFactory.shutdown() + serverChannelFactory.shutdown() + } else { + clientChannelFactory.releaseExternalResources() + serverChannelFactory.releaseExternalResources() + } } }