Merge pull request #20660 from akka/wip-20659-leaking-tests-patriknw

fix memory leaks in tests, #20659
This commit is contained in:
Patrik Nordwall 2016-05-31 19:34:55 +02:00
commit f07041091f
38 changed files with 218 additions and 173 deletions

View file

@ -3,6 +3,7 @@ package akka.event;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.JavaTestKit; import akka.testkit.JavaTestKit;
import akka.event.Logging.Error; import akka.event.Logging.Error;
import akka.event.ActorWithMDC.Log; import akka.event.ActorWithMDC.Log;
@ -10,6 +11,8 @@ import static akka.event.Logging.*;
import com.typesafe.config.Config; import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigFactory;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.scalatest.junit.JUnitSuite; import org.scalatest.junit.JUnitSuite;
import scala.concurrent.duration.Duration; import scala.concurrent.duration.Duration;
@ -30,9 +33,19 @@ public class LoggingAdapterTest extends JUnitSuite {
"akka.actor.serialize-messages = off" "akka.actor.serialize-messages = off"
); );
@Rule
public AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("LoggingAdapterTest", config);
private ActorSystem system = null;
@Before
public void beforeEach() {
system = actorSystemResource.getSystem();
}
@Test @Test
public void mustFormatMessage() { public void mustFormatMessage() {
final ActorSystem system = ActorSystem.create("test-system", config);
final LoggingAdapter log = Logging.getLogger(system, this); final LoggingAdapter log = Logging.getLogger(system, this);
new LogJavaTestKit(system) {{ new LogJavaTestKit(system) {{
system.eventStream().subscribe(getRef(), LogEvent.class); system.eventStream().subscribe(getRef(), LogEvent.class);
@ -66,7 +79,6 @@ public class LoggingAdapterTest extends JUnitSuite {
@Test @Test
public void mustCallMdcForEveryLog() throws Exception { public void mustCallMdcForEveryLog() throws Exception {
final ActorSystem system = ActorSystem.create("test-system", config);
new LogJavaTestKit(system) {{ new LogJavaTestKit(system) {{
system.eventStream().subscribe(getRef(), LogEvent.class); system.eventStream().subscribe(getRef(), LogEvent.class);
ActorRef ref = system.actorOf(Props.create(ActorWithMDC.class)); ActorRef ref = system.actorOf(Props.create(ActorWithMDC.class));
@ -86,7 +98,6 @@ public class LoggingAdapterTest extends JUnitSuite {
@Test @Test
public void mustSupportMdcNull() throws Exception { public void mustSupportMdcNull() throws Exception {
final ActorSystem system = ActorSystem.create("test-system", config);
new LogJavaTestKit(system) {{ new LogJavaTestKit(system) {{
system.eventStream().subscribe(getRef(), LogEvent.class); system.eventStream().subscribe(getRef(), LogEvent.class);
ActorRef ref = system.actorOf(Props.create(ActorWithMDC.class)); ActorRef ref = system.actorOf(Props.create(ActorWithMDC.class));
@ -131,6 +142,7 @@ public class LoggingAdapterTest extends JUnitSuite {
void expectLog(final Object level, final String message, final Throwable cause, final String mdc) { void expectLog(final Object level, final String message, final Throwable cause, final String mdc) {
new ExpectMsg<Void>(Duration.create(3, TimeUnit.SECONDS), "LogEvent") { new ExpectMsg<Void>(Duration.create(3, TimeUnit.SECONDS), "LogEvent") {
@Override
protected Void match(Object event) { protected Void match(Object event) {
LogEvent log = (LogEvent) event; LogEvent log = (LogEvent) event;
assertEquals(message, log.message()); assertEquals(message, log.message());

View file

@ -360,11 +360,10 @@ class TcpConnectionSpec extends AkkaSpec("""
"respect pull mode" in new EstablishedConnectionTest(pullMode = true) { "respect pull mode" in new EstablishedConnectionTest(pullMode = true) {
// override config to decrease default buffer size // override config to decrease default buffer size
val config = def config =
ConfigFactory.load( ConfigFactory.parseString("akka.io.tcp.direct-buffer-size = 1k")
ConfigFactory.parseString("akka.io.tcp.direct-buffer-size = 1k") .withFallback(AkkaSpec.testConf)
.withFallback(AkkaSpec.testConf)) override implicit lazy val system: ActorSystem = ActorSystem("respectPullModeTest", config)
override implicit def system: ActorSystem = ActorSystem("respectPullModeTest", config)
try run { try run {
val maxBufferSize = 1 * 1024 val maxBufferSize = 1 * 1024
@ -402,7 +401,7 @@ class TcpConnectionSpec extends AkkaSpec("""
connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should ===(vs) connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should ===(vs)
} }
finally system.terminate() finally shutdown(system)
} }
"close the connection and reply with `Closed` upon reception of a `Close` command" in "close the connection and reply with `Closed` upon reception of a `Close` command" in

View file

@ -23,16 +23,21 @@ import java.util.concurrent.TimeUnit;
public class CustomRouteTest extends JUnitSuite { public class CustomRouteTest extends JUnitSuite {
@Rule @Rule
public AkkaJUnitActorSystemResource actorSystemResource = public AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("CustomRouteTest");
new AkkaJUnitActorSystemResource("CustomRouteTest");
private final ActorSystem system = actorSystemResource.getSystem(); private ActorSystem system = null;
private Camel camel = (Camel) CamelExtension.get(system); private Camel camel = null;
public static class MyActor extends UntypedActor { public static class MyActor extends UntypedActor {
@Override public void onReceive(Object o) {} @Override public void onReceive(Object o) {}
} }
@Before
public void beforeEach() {
system = actorSystemResource.getSystem();
camel = (Camel) CamelExtension.get(system);
}
@Test @Test
public void testCustomProducerRoute() throws Exception { public void testCustomProducerRoute() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockProducer", MockEndpoint.class); MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockProducer", MockEndpoint.class);
@ -118,6 +123,7 @@ public class CustomRouteTest extends JUnitSuite {
private void assertMockEndpoint(MockEndpoint mockEndpoint) throws InterruptedException { private void assertMockEndpoint(MockEndpoint mockEndpoint) throws InterruptedException {
mockEndpoint.expectedMessageCount(1); mockEndpoint.expectedMessageCount(1);
mockEndpoint.expectedMessagesMatches(new Predicate() { mockEndpoint.expectedMessagesMatches(new Predicate() {
@Override
public boolean matches(Exchange exchange) { public boolean matches(Exchange exchange) {
return exchange.getIn().getBody().equals("test"); return exchange.getIn().getBody().equals("test");
} }
@ -126,8 +132,8 @@ public class CustomRouteTest extends JUnitSuite {
} }
public static class CustomRouteBuilder extends RouteBuilder { public static class CustomRouteBuilder extends RouteBuilder {
private String uri; private final String uri;
private String fromUri; private final String fromUri;
public CustomRouteBuilder(String from, String to) { public CustomRouteBuilder(String from, String to) {
fromUri = from; fromUri = from;
@ -164,7 +170,7 @@ public class CustomRouteTest extends JUnitSuite {
} }
public static class EndpointProducer extends UntypedProducerActor { public static class EndpointProducer extends UntypedProducerActor {
private String uri; private final String uri;
public EndpointProducer(String uri) { public EndpointProducer(String uri) {
this.uri = uri; this.uri = uri;
@ -192,8 +198,8 @@ public class CustomRouteTest extends JUnitSuite {
} }
public static class TestAckConsumer extends UntypedConsumerActor { public static class TestAckConsumer extends UntypedConsumerActor {
private String myuri; private final String myuri;
private String to; private final String to;
public TestAckConsumer(String uri, String to){ public TestAckConsumer(String uri, String to){
myuri = uri; myuri = uri;

View file

@ -48,7 +48,7 @@ private[camel] object TestSupport {
} }
trait SharedCamelSystem extends BeforeAndAfterAll { this: Suite trait SharedCamelSystem extends BeforeAndAfterAll { this: Suite
implicit lazy val system = ActorSystem("test", AkkaSpec.testConf) implicit lazy val system = ActorSystem("SharedCamelSystem", AkkaSpec.testConf)
implicit lazy val camel = CamelExtension(system) implicit lazy val camel = CamelExtension(system)
abstract override protected def afterAll() { abstract override protected def afterAll() {
@ -63,7 +63,7 @@ private[camel] object TestSupport {
override protected def beforeEach() { override protected def beforeEach() {
super.beforeEach() super.beforeEach()
system = ActorSystem("test", AkkaSpec.testConf) system = ActorSystem("NonSharedCamelSystem", AkkaSpec.testConf)
camel = CamelExtension(system) camel = CamelExtension(system)
} }

View file

@ -7,7 +7,7 @@ import akka.actor.{ Props, ActorSystem }
import akka.testkit.{ TimingTest, TestProbe, TestKit } import akka.testkit.{ TimingTest, TestProbe, TestKit }
import akka.camel.internal.ActivationProtocol._ import akka.camel.internal.ActivationProtocol._
class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfterEach with GivenWhenThen { class ActivationTrackerTest extends TestKit(ActorSystem("ActivationTrackerTest")) with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfterEach with GivenWhenThen {
override protected def afterAll() { shutdown() } override protected def afterAll() { shutdown() }

View file

@ -30,7 +30,7 @@ import akka.util.Timeout
import akka.actor._ import akka.actor._
import akka.testkit._ import akka.testkit._
class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with ActorProducerFixture { class ActorProducerTest extends TestKit(ActorSystem("ActorProducerTest")) with WordSpecLike with Matchers with ActorProducerFixture {
implicit val timeout = Timeout(10 seconds) implicit val timeout = Timeout(10 seconds)
"ActorProducer" when { "ActorProducer" when {

View file

@ -7,13 +7,11 @@ import akka.testkit.{ ImplicitSender, TestKit }
import org.scalatest.FunSuiteLike import org.scalatest.FunSuiteLike
import org.scalatest.Matchers import org.scalatest.Matchers
import scala.annotation.tailrec import scala.annotation.tailrec
//#demo-code
import scala.collection._ import scala.collection._
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.math.BigDecimal.int2bigDecimal import scala.math.BigDecimal.int2bigDecimal
import akka.actor._ import akka.actor._
import org.scalatest.BeforeAndAfterAll
/** /**
* Sample and test code for the aggregator patter. * Sample and test code for the aggregator patter.
* This is based on Jamie Allen's tutorial at * This is based on Jamie Allen's tutorial at
@ -187,7 +185,11 @@ class ChainingSample extends Actor with Aggregator {
} }
//#chain-sample //#chain-sample
class AggregatorSpec extends TestKit(ActorSystem("test")) with ImplicitSender with FunSuiteLike with Matchers { class AggregatorSpec extends TestKit(ActorSystem("AggregatorSpec")) with ImplicitSender with FunSuiteLike with Matchers with BeforeAndAfterAll {
override def afterAll(): Unit = {
shutdown()
}
test("Test request 1 account type") { test("Test request 1 account type") {
system.actorOf(Props[AccountBalanceRetriever]) ! GetCustomerAccountBalances(1, Set(Savings)) system.actorOf(Props[AccountBalanceRetriever]) ! GetCustomerAccountBalances(1, Set(Savings))
@ -384,4 +386,4 @@ class WorkListSpec extends FunSuiteLike {
processed processed
} }
} }
} }

View file

@ -151,7 +151,7 @@ public class FaultHandlingTest extends AbstractJavaTest {
@BeforeClass @BeforeClass
public static void start() { public static void start() {
system = ActorSystem.create("test"); system = ActorSystem.create("FaultHandlingTest");
} }
@AfterClass @AfterClass

View file

@ -22,10 +22,10 @@ import akka.testkit.AkkaJUnitActorSystemResource;
import org.junit.*; import org.junit.*;
public class SchedulerDocTest extends AbstractJavaTest { public class SchedulerDocTest extends AbstractJavaTest {
@Rule @ClassRule
public AkkaJUnitActorSystemResource actorSystemResource = public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("SchedulerDocTest",
new AkkaJUnitActorSystemResource("SchedulerDocTest", AkkaSpec.testConf()); AkkaSpec.testConf());
private final ActorSystem system = actorSystemResource.getSystem(); private final ActorSystem system = actorSystemResource.getSystem();
private ActorRef testActor = system.actorOf(Props.create(MyUntypedActor.class)); private ActorRef testActor = system.actorOf(Props.create(MyUntypedActor.class));

View file

@ -11,11 +11,14 @@ public class CustomRouteTestBase {
public void customRoute() throws Exception{ public void customRoute() throws Exception{
//#CustomRoute //#CustomRoute
ActorSystem system = ActorSystem.create("some-system"); ActorSystem system = ActorSystem.create("some-system");
Camel camel = CamelExtension.get(system); try {
ActorRef responder = system.actorOf(Props.create(Responder.class), "TestResponder"); Camel camel = CamelExtension.get(system);
camel.context().addRoutes(new CustomRouteBuilder(responder)); ActorRef responder = system.actorOf(Props.create(Responder.class), "TestResponder");
//#CustomRoute camel.context().addRoutes(new CustomRouteBuilder(responder));
system.stop(responder); //#CustomRoute
JavaTestKit.shutdownActorSystem(system); system.stop(responder);
} finally {
JavaTestKit.shutdownActorSystem(system);
}
} }
} }

View file

@ -130,6 +130,11 @@ public class SchedulerPatternTest extends AbstractJavaTest {
testSchedule(probe, props, duration("3000 millis"), duration("2500 millis")); testSchedule(probe, props, duration("3000 millis"), duration("2500 millis"));
}}; }};
} }
@Test
public void doNothing() {
// actorSystemResource.after is not called when all tests are ignored
}
public static class TestSchedule extends JavaTestKit { public static class TestSchedule extends JavaTestKit {
private ActorSystem system; private ActorSystem system;

View file

@ -6,16 +6,15 @@ package docs.io
import java.net.{ Inet6Address, InetSocketAddress, NetworkInterface, StandardProtocolFamily } import java.net.{ Inet6Address, InetSocketAddress, NetworkInterface, StandardProtocolFamily }
import java.nio.channels.DatagramChannel import java.nio.channels.DatagramChannel
import scala.util.Random import scala.util.Random
import akka.actor.{ ActorSystem, Props } import akka.actor.{ ActorSystem, Props }
import akka.io.Udp import akka.io.Udp
import akka.testkit.TestKit import akka.testkit.TestKit
import org.scalatest.{ BeforeAndAfter, WordSpecLike } import org.scalatest.{ BeforeAndAfter, WordSpecLike }
import scala.collection.JavaConversions.enumerationAsScalaIterator import scala.collection.JavaConversions.enumerationAsScalaIterator
import org.scalatest.BeforeAndAfterAll
class ScalaUdpMulticastSpec extends TestKit(ActorSystem("ScalaUdpMulticastSpec")) with WordSpecLike with BeforeAndAfter { class ScalaUdpMulticastSpec extends TestKit(ActorSystem("ScalaUdpMulticastSpec")) with WordSpecLike with BeforeAndAfterAll {
"listener" should { "listener" should {
"send message back to sink" in { "send message back to sink" in {
@ -65,7 +64,7 @@ class ScalaUdpMulticastSpec extends TestKit(ActorSystem("ScalaUdpMulticastSpec")
} }
} }
def afterAll(): Unit = { override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system) TestKit.shutdownActorSystem(system)
} }

View file

@ -11,6 +11,8 @@ import akka.testkit.ImplicitSender
import akka.testkit.TestProbe import akka.testkit.TestProbe
import akka.testkit.TestActorRef import akka.testkit.TestActorRef
import akka.actor.ActorRefFactory import akka.actor.ActorRefFactory
import akka.testkit.TestKit
import org.scalatest.BeforeAndAfterAll
/** /**
* Parent-Child examples * Parent-Child examples
@ -74,8 +76,12 @@ class MockedChild extends Actor {
} }
} }
class ParentChildSpec extends WordSpec with Matchers with TestKitBase { class ParentChildSpec extends WordSpec with Matchers with TestKitBase with BeforeAndAfterAll {
implicit lazy val system = ActorSystem() implicit lazy val system = ActorSystem("ParentChildSpec")
override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}
"A DependentChild" should { "A DependentChild" should {
"be tested without its parent" in { "be tested without its parent" in {
@ -132,4 +138,4 @@ class ParentChildSpec extends WordSpec with Matchers with TestKitBase {
} }
} }
//#test-fabricated-parent //#test-fabricated-parent
} }

View file

@ -10,11 +10,12 @@ import akka.http.scaladsl.model.HttpMethods._
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ Matchers, WordSpec } import org.scalatest.{ Matchers, WordSpec }
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.{ Await } import scala.concurrent.{ Await }
import org.scalatest.BeforeAndAfterAll
import akka.testkit.TestKit
class ClientSpec extends WordSpec with Matchers { class ClientSpec extends WordSpec with Matchers with BeforeAndAfterAll {
val testConf: Config = ConfigFactory.parseString(""" val testConf: Config = ConfigFactory.parseString("""
akka.loggers = ["akka.testkit.TestEventListener"] akka.loggers = ["akka.testkit.TestEventListener"]
akka.loglevel = ERROR akka.loglevel = ERROR
@ -25,6 +26,10 @@ class ClientSpec extends WordSpec with Matchers {
implicit val system = ActorSystem(getClass.getSimpleName, testConf) implicit val system = ActorSystem(getClass.getSimpleName, testConf)
implicit val materializer = ActorMaterializer() implicit val materializer = ActorMaterializer()
override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}
"HTTP Client" should { "HTTP Client" should {
"reuse connection pool" in { "reuse connection pool" in {

View file

@ -7,7 +7,6 @@ package akka.http.scaladsl
import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter } import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter }
import java.net.{ BindException, Socket } import java.net.{ BindException, Socket }
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.event.Logging import akka.event.Logging
import akka.event.Logging.LogEvent import akka.event.Logging.LogEvent
@ -26,11 +25,11 @@ import akka.util.ByteString
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.concurrent.ScalaFutures import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.{ Await, Future, Promise }
import scala.util.{ Success, Try } import scala.util.{ Success, Try }
import akka.testkit.TestKit
class TightRequestTimeoutSpec extends WordSpec with Matchers with BeforeAndAfterAll with ScalaFutures { class TightRequestTimeoutSpec extends WordSpec with Matchers with BeforeAndAfterAll with ScalaFutures {
val testConf: Config = ConfigFactory.parseString(""" val testConf: Config = ConfigFactory.parseString("""
@ -46,6 +45,10 @@ class TightRequestTimeoutSpec extends WordSpec with Matchers with BeforeAndAfter
implicit val materializer = ActorMaterializer() implicit val materializer = ActorMaterializer()
implicit val patience = PatienceConfig(3.seconds) implicit val patience = PatienceConfig(3.seconds)
override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}
"Tight request timeout" should { "Tight request timeout" should {
"not cause double push error caused by the late response attemting to push" in { "not cause double push error caused by the late response attemting to push" in {
@ -65,4 +68,4 @@ class TightRequestTimeoutSpec extends WordSpec with Matchers with BeforeAndAfter
} }
} }
} }

View file

@ -7,7 +7,7 @@ package akka.stream;
import org.scalatest.junit.JUnitSuite; import org.scalatest.junit.JUnitSuite;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.testkit.AkkaJUnitActorSystemResource;
public abstract class StreamTest extends JUnitSuite { public abstract class StreamTest extends JUnitSuite {
final protected ActorSystem system; final protected ActorSystem system;

View file

@ -3,7 +3,7 @@ package akka.stream.actor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import akka.stream.StreamTest; import akka.stream.StreamTest;
import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.testkit.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Source; import akka.stream.javadsl.Source;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;
import akka.testkit.JavaTestKit; import akka.testkit.JavaTestKit;

View file

@ -3,7 +3,7 @@ package akka.stream.actor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import akka.stream.StreamTest; import akka.stream.StreamTest;
import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.testkit.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Sink; import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source; import akka.stream.javadsl.Source;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;

View file

@ -5,7 +5,7 @@ package akka.stream.io;
import akka.japi.Pair; import akka.japi.Pair;
import akka.stream.StreamTest; import akka.stream.StreamTest;
import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.testkit.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Sink; import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source; import akka.stream.javadsl.Source;
import akka.stream.javadsl.StreamConverters; import akka.stream.javadsl.StreamConverters;

View file

@ -5,7 +5,7 @@ package akka.stream.io;
import akka.stream.IOResult; import akka.stream.IOResult;
import akka.stream.StreamTest; import akka.stream.StreamTest;
import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.testkit.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Source; import akka.stream.javadsl.Source;
import akka.stream.javadsl.StreamConverters; import akka.stream.javadsl.StreamConverters;
import akka.stream.testkit.Utils; import akka.stream.testkit.Utils;
@ -29,7 +29,7 @@ public class OutputStreamSinkTest extends StreamTest {
} }
@ClassRule @ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("OutputStreamSink", public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("OutputStreamSinkTest",
Utils.UnboundedMailboxConfig()); Utils.UnboundedMailboxConfig());
@Test @Test
public void mustSignalFailureViaIoResult() throws Exception { public void mustSignalFailureViaIoResult() throws Exception {
@ -44,7 +44,7 @@ public class OutputStreamSinkTest extends StreamTest {
} }
}; };
final CompletionStage<IOResult> resultFuture = Source.single(ByteString.fromString("123456")).runWith(StreamConverters.fromOutputStream(() -> os), materializer); final CompletionStage<IOResult> resultFuture = Source.single(ByteString.fromString("123456")).runWith(StreamConverters.fromOutputStream(() -> os), materializer);
final IOResult result = resultFuture.toCompletableFuture().get(300, TimeUnit.MILLISECONDS); final IOResult result = resultFuture.toCompletableFuture().get(3000, TimeUnit.MILLISECONDS);
assertFalse(result.wasSuccessful()); assertFalse(result.wasSuccessful());
assertTrue(result.getError().getMessage().equals("Can't accept more data.")); assertTrue(result.getError().getMessage().equals("Can't accept more data."));

View file

@ -14,7 +14,7 @@ import org.junit.Test;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.japi.function.Procedure; import akka.japi.function.Procedure;
import akka.stream.StreamTest; import akka.stream.StreamTest;
import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.testkit.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Sink; import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source; import akka.stream.javadsl.Source;
import akka.stream.javadsl.StreamConverters; import akka.stream.javadsl.StreamConverters;
@ -29,11 +29,11 @@ public class OutputStreamSourceTest extends StreamTest {
} }
@ClassRule @ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("OutputStreamSource", public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("OutputStreamSourceTest2",
Utils.UnboundedMailboxConfig()); Utils.UnboundedMailboxConfig());
@Test @Test
public void mustSendEventsViaOutputStream() throws Exception { public void mustSendEventsViaOutputStream() throws Exception {
final FiniteDuration timeout = FiniteDuration.create(300, TimeUnit.MILLISECONDS); final FiniteDuration timeout = FiniteDuration.create(3000, TimeUnit.MILLISECONDS);
final JavaTestKit probe = new JavaTestKit(system); final JavaTestKit probe = new JavaTestKit(system);
final Source<ByteString, OutputStream> source = StreamConverters.asOutputStream(timeout); final Source<ByteString, OutputStream> source = StreamConverters.asOutputStream(timeout);
@ -45,6 +45,8 @@ public class OutputStreamSourceTest extends StreamTest {
})).run(materializer); })).run(materializer);
s.write("a".getBytes()); s.write("a".getBytes());
assertEquals(ByteString.fromString("a"), probe.receiveOne(timeout)); assertEquals(ByteString.fromString("a"), probe.receiveOne(timeout));
s.close(); s.close();

View file

@ -5,7 +5,7 @@ package akka.stream.io;
import akka.stream.StreamTest; import akka.stream.StreamTest;
import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.testkit.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Sink; import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source; import akka.stream.javadsl.Source;
import akka.stream.javadsl.StreamConverters; import akka.stream.javadsl.StreamConverters;

View file

@ -1,84 +0,0 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.javadsl;
import org.junit.rules.ExternalResource;
import akka.actor.ActorSystem;
import akka.testkit.AkkaSpec;
import akka.testkit.JavaTestKit;
import com.typesafe.config.Config;
// FIXME remove this copy, and use akka.testkit.AkkaJUnitActorSystemResource when
// akka-stream-experimental becomes a normal build project
/**
* This is a resource for creating an actor system before test start and shut it
* down afterwards.
*
* To use it on a class level add this to your test class:
*
* @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource =
* new AkkaJUnitActorSystemResource(name, config);
*
* private final ActorSystem system =
* actorSystemResource.getSystem();
*
*
* To use it on a per test level add this to your test class:
*
* @Rule public AkkaJUnitActorSystemResource actorSystemResource = new
* AkkaJUnitActorSystemResource(name, config);
*
* private final ActorSystem system = actorSystemResource.getSystem();
*/
public class AkkaJUnitActorSystemResource extends ExternalResource {
private ActorSystem system = null;
private final String name;
private final Config config;
private ActorSystem createSystem(String name, Config config) {
try {
if (config == null)
return ActorSystem.create(name);
else
return ActorSystem.create(name, config);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public AkkaJUnitActorSystemResource(String name, Config config) {
this.name = name;
this.config = config;
system = createSystem(name, config);
}
public AkkaJUnitActorSystemResource(String name) {
this(name, AkkaSpec.testConf());
}
@Override
protected void before() throws Throwable {
// Sometimes the ExternalResource seems to be reused, and
// we don't run the constructor again, so if that's the case
// then create the system here
if (system == null) {
system = createSystem(name, config);
}
}
@Override
protected void after() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}
public ActorSystem getSystem() {
return system;
}
}

View file

@ -14,6 +14,7 @@ import org.junit.Test;
import akka.stream.Attributes; import akka.stream.Attributes;
import akka.stream.StreamTest; import akka.stream.StreamTest;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;
public class AttributesTest extends StreamTest { public class AttributesTest extends StreamTest {

View file

@ -25,6 +25,7 @@ import akka.japi.function.*;
import akka.util.ByteString; import akka.util.ByteString;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import akka.testkit.AkkaJUnitActorSystemResource;
public class BidiFlowTest extends StreamTest { public class BidiFlowTest extends StreamTest {
public BidiFlowTest() { public BidiFlowTest() {

View file

@ -26,6 +26,7 @@ import scala.concurrent.Await;
import scala.concurrent.Future; import scala.concurrent.Future;
import scala.concurrent.duration.Duration; import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
import akka.testkit.AkkaJUnitActorSystemResource;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;

View file

@ -16,6 +16,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;
import akka.testkit.AkkaJUnitActorSystemResource;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;

View file

@ -9,6 +9,7 @@ import akka.testkit.AkkaSpec;
import akka.util.ByteString; import akka.util.ByteString;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import akka.testkit.AkkaJUnitActorSystemResource;
public class FramingTest extends StreamTest { public class FramingTest extends StreamTest {
public FramingTest() { public FramingTest() {

View file

@ -14,6 +14,7 @@ import akka.japi.function.*;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;
import akka.testkit.JavaTestKit; import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe; import akka.testkit.TestProbe;
import akka.testkit.AkkaJUnitActorSystemResource;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;

View file

@ -11,6 +11,7 @@ import org.junit.Test;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import akka.testkit.AkkaJUnitActorSystemResource;
import static org.junit.Assert.*; import static org.junit.Assert.*;

View file

@ -24,6 +24,7 @@ import scala.concurrent.duration.Duration;
import akka.japi.function.Function2; import akka.japi.function.Function2;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;
import akka.testkit.JavaTestKit; import akka.testkit.JavaTestKit;
import akka.testkit.AkkaJUnitActorSystemResource;
import static org.junit.Assert.*; import static org.junit.Assert.*;

View file

@ -28,6 +28,7 @@ import scala.concurrent.Future;
import scala.concurrent.duration.Duration; import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
import scala.util.Try; import scala.util.Try;
import akka.testkit.AkkaJUnitActorSystemResource;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;

View file

@ -29,6 +29,7 @@ import akka.testkit.AkkaSpec;
import akka.stream.testkit.TestUtils; import akka.stream.testkit.TestUtils;
import akka.util.ByteString; import akka.util.ByteString;
import akka.testkit.JavaTestKit; import akka.testkit.JavaTestKit;
import akka.testkit.AkkaJUnitActorSystemResource;
public class TcpTest extends StreamTest { public class TcpTest extends StreamTest {
public TcpTest() { public TcpTest() {

View file

@ -5,7 +5,7 @@ package akka.stream.stage;
import akka.NotUsed; import akka.NotUsed;
import akka.stream.StreamTest; import akka.stream.StreamTest;
import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.testkit.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Sink; import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source; import akka.stream.javadsl.Source;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;

View file

@ -31,7 +31,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher")
implicit val materializer = ActorMaterializer(settings) implicit val materializer = ActorMaterializer(settings)
val timeout = 300.milliseconds val timeout = 3.seconds
val bytesArray = Array.fill[Byte](3)(Random.nextInt(1024).asInstanceOf[Byte]) val bytesArray = Array.fill[Byte](3)(Random.nextInt(1024).asInstanceOf[Byte])
val byteString = ByteString(bytesArray) val byteString = ByteString(bytesArray)
@ -41,6 +41,16 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
def expectSuccess[T](f: Future[T], value: T) = def expectSuccess[T](f: Future[T], value: T) =
Await.result(f, remainingOrDefault) should be(value) Await.result(f, remainingOrDefault) should be(value)
def assertNoBlockedThreads(): Unit = {
def threadsBlocked =
ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).toSeq
.filter(t t.getThreadName.startsWith("OutputStreamSourceSpec") &&
t.getLockName != null &&
t.getLockName.startsWith("java.util.concurrent.locks.AbstractQueuedSynchronizer"))
awaitAssert(threadsBlocked should ===(Seq()), 3.seconds)
}
"OutputStreamSource" must { "OutputStreamSource" must {
"read bytes from OutputStream" in assertAllStagesStopped { "read bytes from OutputStream" in assertAllStagesStopped {
val (outputStream, probe) = StreamConverters.asOutputStream().toMat(TestSink.probe[ByteString])(Keep.both).run val (outputStream, probe) = StreamConverters.asOutputStream().toMat(TestSink.probe[ByteString])(Keep.both).run
@ -156,11 +166,11 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
.withAttributes(inputBuffer(0, 0)) .withAttributes(inputBuffer(0, 0))
.runWith(Sink.head) .runWith(Sink.head)
/* /*
With Sink.head we test the code path in which the source With Sink.head we test the code path in which the source
itself throws an exception when being materialized. If itself throws an exception when being materialized. If
Sink.ignore is used, the same exception is thrown by Sink.ignore is used, the same exception is thrown by
Materializer. Materializer.
*/ */
} }
} }
@ -175,13 +185,22 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
sub.request(1) sub.request(1)
sub.cancel() sub.cancel()
def threadsBlocked = assertNoBlockedThreads()
ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).toSeq }
.filter(t t.getThreadName.startsWith("OutputStreamSourceSpec") &&
t.getLockName != null &&
t.getLockName.startsWith("java.util.concurrent.locks.AbstractQueuedSynchronizer"))
awaitAssert(threadsBlocked should ===(Seq()), 3.seconds) "not leave blocked threads when materializer shutdown" in {
val materializer2 = ActorMaterializer(settings)
val (outputStream, probe) = StreamConverters.asOutputStream(timeout)
.toMat(TestSink.probe[ByteString])(Keep.both).run()(materializer2)
val sub = probe.expectSubscription()
// triggers a blocking read on the queue
// and then shutdown the materializer before we got anything
sub.request(1)
materializer2.shutdown()
assertNoBlockedThreads()
} }
} }
} }

View file

@ -6,18 +6,21 @@ package akka.stream.impl.io
import java.io.{ IOException, OutputStream } import java.io.{ IOException, OutputStream }
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ BlockingQueue, LinkedBlockingQueue } import java.util.concurrent.{ BlockingQueue, LinkedBlockingQueue }
import akka.stream.{ Outlet, SourceShape, Attributes } import akka.stream.{ Outlet, SourceShape, Attributes }
import akka.stream.Attributes.InputBuffer import akka.stream.Attributes.InputBuffer
import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.io.OutputStreamSourceStage._ import akka.stream.impl.io.OutputStreamSourceStage._
import akka.stream.stage._ import akka.stream.stage._
import akka.util.ByteString import akka.util.ByteString
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.{ Await, Future, Promise }
import scala.util.control.NonFatal import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try } import scala.util.{ Failure, Success, Try }
import akka.stream.ActorAttributes
import akka.stream.impl.Stages.DefaultAttributes.IODispatcher
import akka.stream.ActorAttributes.Dispatcher
import scala.concurrent.ExecutionContext
import akka.stream.ActorMaterializer
private[stream] object OutputStreamSourceStage { private[stream] object OutputStreamSourceStage {
sealed trait AdapterToStageMessage sealed trait AdapterToStageMessage
@ -40,6 +43,9 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, OutputStream) = { override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, OutputStream) = {
val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
val dispatcherId = inheritedAttributes.get[Dispatcher](IODispatcher).dispatcher
require(maxBuffer > 0, "Buffer size must be greater than 0") require(maxBuffer > 0, "Buffer size must be greater than 0")
val dataQueue = new LinkedBlockingQueue[ByteString](maxBuffer) val dataQueue = new LinkedBlockingQueue[ByteString](maxBuffer)
@ -49,6 +55,9 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration
var flush: Option[Promise[Unit]] = None var flush: Option[Promise[Unit]] = None
var close: Option[Promise[Unit]] = None var close: Option[Promise[Unit]] = None
private var dispatcher: ExecutionContext = null // set in preStart
private var blockingThread: Thread = null // for postStop interrupt
private val downstreamCallback: AsyncCallback[Try[ByteString]] = private val downstreamCallback: AsyncCallback[Try[ByteString]] =
getAsyncCallback { getAsyncCallback {
case Success(elem) onPush(elem) case Success(elem) onPush(elem)
@ -102,6 +111,11 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration
sendResponseIfNeed() sendResponseIfNeed()
} }
override def preStart(): Unit = {
dispatcher = ActorMaterializer.downcast(materializer).system.dispatchers.lookup(dispatcherId)
super.preStart()
}
setHandler(out, new OutHandler { setHandler(out, new OutHandler {
override def onDownstreamFinish(): Unit = { override def onDownstreamFinish(): Unit = {
//assuming there can be no further in messages //assuming there can be no further in messages
@ -112,10 +126,29 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration
completeStage() completeStage()
} }
override def onPull(): Unit = { override def onPull(): Unit = {
implicit val ex = interpreter.materializer.executionContext implicit val ec = dispatcher
Future(dataQueue.take()).onComplete(downstreamCallback.invoke) Future {
// keep track of the thread for postStop interrupt
blockingThread = Thread.currentThread()
try {
dataQueue.take()
} catch {
case _: InterruptedException
Thread.interrupted()
ByteString()
} finally {
blockingThread = null
}
}.onComplete(downstreamCallback.invoke)
} }
}) })
override def postStop(): Unit = {
// interrupt any pending blocking take
if (blockingThread != null)
blockingThread.interrupt()
super.postStop()
}
} }
(logic, new OutputStreamAdapter(dataQueue, downstreamStatus, logic.wakeUp, writeTimeout)) (logic, new OutputStreamAdapter(dataQueue, downstreamStatus, logic.wakeUp, writeTimeout))
} }

View file

@ -4,28 +4,44 @@
package akka.testkit; package akka.testkit;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import com.typesafe.config.Config; import com.typesafe.config.Config;
import org.junit.rules.ExternalResource; import org.junit.rules.ExternalResource;
/** /**
* This is a resource for creating an actor system before test start and shut it down afterwards. * This is a resource for creating an actor system before test start and shut it
* down afterwards.
* *
* To use it on a class level add this to your test class: * To use it on a class level add this to your test class:
* *
* @ClassRule * <code>
* &#64;ClassRule
* public static AkkaJUnitActorSystemResource actorSystemResource = * public static AkkaJUnitActorSystemResource actorSystemResource =
* new AkkaJUnitActorSystemResource(name, config); * new AkkaJUnitActorSystemResource(name, config);
* *
* private final ActorSystem system = actorSystemResource.getSystem(); * private final ActorSystem system = actorSystemResource.getSystem();
* </code>
* *
* *
* To use it on a per test level add this to your test class: * To use it on a per test level add this to your test class:
* *
* @Rule * <code>
* &#64;Rule
* public AkkaJUnitActorSystemResource actorSystemResource = * public AkkaJUnitActorSystemResource actorSystemResource =
* new AkkaJUnitActorSystemResource(name, config); * new AkkaJUnitActorSystemResource(name, config);
* *
* private final ActorSystem system = actorSystemResource.getSystem(); * private ActorSystem system = null;
*
* &#64;Before
* public void beforeEach() {
* system = actorSystemResource.getSystem();
* }
* </code>
*
* Note that it is important to not use <code>getSystem</code> from the
* constructor of the test, becuase some test runners may create an
* instance of the class without actually using it later, resulting in
* memory leaks because of not shutting down the actor system.
*/ */
public class AkkaJUnitActorSystemResource extends ExternalResource { public class AkkaJUnitActorSystemResource extends ExternalResource {
@ -49,7 +65,6 @@ public class AkkaJUnitActorSystemResource extends ExternalResource {
public AkkaJUnitActorSystemResource(String name, Config config) { public AkkaJUnitActorSystemResource(String name, Config config) {
this.name = name; this.name = name;
this.config = config; this.config = config;
system = createSystem(name, config);
} }
public AkkaJUnitActorSystemResource(String name) { public AkkaJUnitActorSystemResource(String name) {
@ -58,9 +73,6 @@ public class AkkaJUnitActorSystemResource extends ExternalResource {
@Override @Override
protected void before() throws Throwable { protected void before() throws Throwable {
// Sometimes the ExternalResource seems to be reused, and
// we don't run the constructor again, so if that's the case
// then create the system here
if (system == null) { if (system == null) {
system = createSystem(name, config); system = createSystem(name, config);
} }
@ -73,6 +85,9 @@ public class AkkaJUnitActorSystemResource extends ExternalResource {
} }
public ActorSystem getSystem() { public ActorSystem getSystem() {
if (system == null) {
system = createSystem(name, config);
}
return system; return system;
} }
} }

View file

@ -25,13 +25,13 @@ import org.scalactic.Constraint
/** /**
* Helper class for writing tests for typed Actors with ScalaTest. * Helper class for writing tests for typed Actors with ScalaTest.
*/ */
class TypedSpec(config: Config) extends Spec with Matchers with BeforeAndAfterAll with ScalaFutures with ConversionCheckedTripleEquals { abstract class TypedSpec(config: Config) extends Spec with Matchers with BeforeAndAfterAll with ScalaFutures with ConversionCheckedTripleEquals {
import TypedSpec._ import TypedSpec._
import AskPattern._ import AskPattern._
def this() = this(ConfigFactory.empty) def this() = this(ConfigFactory.empty)
implicit val system = ActorSystem(AkkaSpec.getCallerName(classOf[TypedSpec]), Props(guardian()), Some(config withFallback AkkaSpec.testConf)) implicit val system = ActorSystem(TypedSpec.getCallerName(classOf[TypedSpec]), Props(guardian()), Some(config withFallback AkkaSpec.testConf))
implicit val timeout = Timeout(1.minute) implicit val timeout = Timeout(1.minute)
implicit val patience = PatienceConfig(3.seconds) implicit val patience = PatienceConfig(3.seconds)
@ -146,4 +146,14 @@ object TypedSpec {
c.replyTo ! ctx.spawn(c.props, c.name) c.replyTo ! ctx.spawn(c.props, c.name)
Same Same
} }
def getCallerName(clazz: Class[_]): String = {
val s = (Thread.currentThread.getStackTrace map (_.getClassName) drop 1)
.dropWhile(_ matches "(java.lang.Thread|.*TypedSpec.?$)")
val reduced = s.lastIndexWhere(_ == clazz.getName) match {
case -1 s
case z s drop (z + 1)
}
reduced.head.replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_")
}
} }