From fca3e1e485644c7d6f01be8a646b0ee09b3e17f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andre=CC=81n?= Date: Tue, 29 Nov 2016 11:51:31 +0100 Subject: [PATCH] Add flight recording and dump on failure for more artery tests --- .../remote/artery/ArteryMultiNodeSpec.scala | 48 +---------- .../remote/artery/ArterySpecSupport.scala | 81 +++++++++++++++++++ .../remote/artery/FlushOnShutdownSpec.scala | 16 +--- .../remote/artery/HandshakeDenySpec.scala | 12 +-- .../remote/artery/HandshakeFailureSpec.scala | 14 +--- .../remote/artery/HandshakeRetrySpec.scala | 18 ++--- .../akka/remote/artery/LateConnectSpec.scala | 17 ++-- .../artery/OutboundControlJunctionSpec.scala | 13 +-- .../remote/artery/RemoteDeathWatchSpec.scala | 8 +- .../remote/artery/RemoteDeployerSpec.scala | 8 +- .../remote/artery/RemoteDeploymentSpec.scala | 8 +- .../akka/remote/artery/RemoteRouterSpec.scala | 9 +-- .../artery/RemoteSendConsistencySpec.scala | 35 +++----- .../remote/artery/RemoteWatcherSpec.scala | 11 +-- .../artery/SerializationErrorSpec.scala | 15 +--- .../artery/SystemMessageDeliverySpec.scala | 19 ++--- .../akka/remote/artery/UntrustedSpec.scala | 26 +++--- .../scala/akka/stream/javadsl/Source.scala | 2 +- .../scala/akka/stream/scaladsl/Source.scala | 2 +- 19 files changed, 149 insertions(+), 213 deletions(-) create mode 100644 akka-remote/src/test/scala/akka/remote/artery/ArterySpecSupport.scala diff --git a/akka-remote/src/test/scala/akka/remote/artery/ArteryMultiNodeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/ArteryMultiNodeSpec.scala index 32afc155a7..e1557522ca 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/ArteryMultiNodeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/ArteryMultiNodeSpec.scala @@ -12,33 +12,12 @@ import akka.testkit.AkkaSpec import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.Outcome -object ArteryMultiNodeSpec { - - def defaultConfig = - ConfigFactory.parseString(s""" - akka { - actor.provider = remote - actor.warn-about-java-serializer-usage = off - remote.artery { - enabled = on - canonical { - hostname = localhost - port = 0 - } - advanced.flight-recorder { - enabled=on - destination=target/flight-recorder-${UUID.randomUUID().toString}.afr - } - } - } - """) -} - /** * Base class for remoting tests what needs to test interaction between a "local" actor system * which is always created (the usual AkkaSpec system), and multiple additional actor systems over artery */ -abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withFallback(ArteryMultiNodeSpec.defaultConfig)) { +abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withFallback(ArterySpecSupport.defaultConfig)) + with FlightRecorderSpecIntegration { def this() = this(ConfigFactory.empty()) def this(extraConfig: String) = this(ConfigFactory.parseString(extraConfig)) @@ -50,8 +29,6 @@ abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withF def address(sys: ActorSystem) = RARP(sys).provider.getDefaultAddress def rootActorPath(sys: ActorSystem) = RootActorPath(address(sys)) def nextGeneratedSystemName = s"${localSystem.name}-remote-${remoteSystems.size}" - private val flightRecorderFile: Path = - FileSystems.getDefault.getPath(RARP(system).provider.remoteSettings.Artery.Advanced.FlightRecorderDestination) private var remoteSystems: Vector[ActorSystem] = Vector.empty @@ -73,29 +50,10 @@ abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withF remoteSystem } - // keep track of failure so that we can print flight recorder output on failures - private var failed = false - override protected def withFixture(test: NoArgTest): Outcome = { - val out = super.withFixture(test) - if (!out.isSucceeded) failed = true - out - } - override def afterTermination(): Unit = { remoteSystems.foreach(sys ⇒ shutdown(sys)) remoteSystems = Vector.empty - handleFlightRecorderFile() - } - - private def handleFlightRecorderFile(): Unit = { - if (Files.exists(flightRecorderFile)) { - if (failed) { - // logger may not be alive anymore so we have to use stdout here - println("Flight recorder dump:") - FlightRecorderReader.dumpToStdout(flightRecorderFile) - } - Files.delete(flightRecorderFile) - } + super.afterTermination() } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/ArterySpecSupport.scala b/akka-remote/src/test/scala/akka/remote/artery/ArterySpecSupport.scala new file mode 100644 index 0000000000..69d7e3040c --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/ArterySpecSupport.scala @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.nio.file.{ FileSystems, Files, Path } +import java.util.UUID + +import akka.actor.ActorSystem +import akka.remote.RARP +import akka.testkit.AkkaSpec +import com.typesafe.config.ConfigFactory +import org.scalatest.Outcome + +object ArterySpecSupport { + // same for all artery enabled remoting tests + private val staticArteryRemotingConfig = ConfigFactory.parseString(s""" + akka { + actor { + provider = remote + warn-about-java-serializer-usage = off + serialize-creators = off + } + remote.artery { + enabled = on + canonical { + hostname = localhost + port = 0 + } + } + }""") + + /** artery enabled, flight recorder enabled, dynamic selection of port on localhost */ + def defaultConfig = + ConfigFactory.parseString(s""" + akka { + remote.artery { + advanced.flight-recorder { + enabled=on + destination=target/flight-recorder-${UUID.randomUUID().toString}.afr + } + } + } + """).withFallback(staticArteryRemotingConfig) + +} + +/** + * Dumps flight recorder data on test failure if artery flight recorder is enabled + */ +trait FlightRecorderSpecIntegration { self: AkkaSpec ⇒ + + def system: ActorSystem + + private val flightRecorderFile: Path = + FileSystems.getDefault.getPath(RARP(system).provider.remoteSettings.Artery.Advanced.FlightRecorderDestination) + + // keep track of failure so that we can print flight recorder output on failures + private var failed = false + override protected def withFixture(test: NoArgTest): Outcome = { + val out = test() + if (!out.isSucceeded) failed = true + out + } + + override def afterTermination(): Unit = { + self.afterTermination() + handleFlightRecorderFile() + } + + protected def handleFlightRecorderFile(): Unit = { + if (Files.exists(flightRecorderFile)) { + if (failed) { + // logger may not be alive anymore so we have to use stdout here + println("Flight recorder dump:") + FlightRecorderReader.dumpToStdout(flightRecorderFile) + } + Files.delete(flightRecorderFile) + } + } +} \ No newline at end of file diff --git a/akka-remote/src/test/scala/akka/remote/artery/FlushOnShutdownSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/FlushOnShutdownSpec.scala index 918241ed77..816ba20a2a 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/FlushOnShutdownSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/FlushOnShutdownSpec.scala @@ -11,21 +11,7 @@ import com.typesafe.config.ConfigFactory import scala.concurrent.Await import scala.concurrent.duration._ -object FlushOnShutdownSpec { - - val config = ConfigFactory.parseString(s""" - akka { - actor.provider = remote - actor.serialize-creators = off - remote.artery.enabled = on - remote.artery.canonical.hostname = localhost - remote.artery.canonical.port = 0 - } - """) - -} - -class FlushOnShutdownSpec extends ArteryMultiNodeSpec(FlushOnShutdownSpec.config) { +class FlushOnShutdownSpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultConfig) { val remoteSystem = newRemoteSystem() diff --git a/akka-remote/src/test/scala/akka/remote/artery/HandshakeDenySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/HandshakeDenySpec.scala index fe6e9993ca..7640ec72a8 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/HandshakeDenySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/HandshakeDenySpec.scala @@ -14,15 +14,9 @@ object HandshakeDenySpec { val commonConfig = ConfigFactory.parseString(s""" akka.loglevel = WARNING - akka { - actor.provider = remote - remote.artery.enabled = on - remote.artery.canonical.hostname = localhost - remote.artery.canonical.port = 0 - remote.artery.advanced.handshake-timeout = 2s - remote.artery.advanced.image-liveness-timeout = 1.9s - } - """) + akka.remote.artery.advanced.handshake-timeout = 2s + akka.remote.artery.advanced.image-liveness-timeout = 1.9s + """).withFallback(ArterySpecSupport.defaultConfig) } diff --git a/akka-remote/src/test/scala/akka/remote/artery/HandshakeFailureSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/HandshakeFailureSpec.scala index ec72c0bd4a..9faed684ef 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/HandshakeFailureSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/HandshakeFailureSpec.scala @@ -18,22 +18,16 @@ object HandshakeFailureSpec { val portB = SocketUtil.temporaryServerAddress("localhost", udp = true).getPort val commonConfig = ConfigFactory.parseString(s""" - akka { - actor.provider = remote - remote.artery.enabled = on - remote.artery.canonical.hostname = localhost - remote.artery.canonical.port = 0 - remote.artery.advanced.handshake-timeout = 2s - remote.artery.advanced.image-liveness-timeout = 1.9s - } - """) + akka.remote.artery.advanced.handshake-timeout = 2s + akka.remote.artery.advanced.image-liveness-timeout = 1.9s + """).withFallback(ArterySpecSupport.defaultConfig) val configB = ConfigFactory.parseString(s"akka.remote.artery.canonical.port = $portB") .withFallback(commonConfig) } -class HandshakeFailureSpec extends AkkaSpec(HandshakeFailureSpec.commonConfig) with ImplicitSender { +class HandshakeFailureSpec extends AkkaSpec(HandshakeFailureSpec.commonConfig) with ImplicitSender with FlightRecorderSpecIntegration { import HandshakeFailureSpec._ var systemB: ActorSystem = null diff --git a/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala index 8573ad34f1..71049a0c3d 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala @@ -17,22 +17,16 @@ object HandshakeRetrySpec { val portB = SocketUtil.temporaryServerAddress("localhost", udp = true).getPort val commonConfig = ConfigFactory.parseString(s""" - akka { - actor.provider = remote - remote.artery.enabled = on - remote.artery.canonical.hostname = localhost - remote.artery.canonical.port = 0 - remote.artery.advanced.handshake-timeout = 10s - remote.artery.advanced.image-liveness-timeout = 7s - } - """) + akka.remote.artery.advanced.handshake-timeout = 10s + akka.remote.artery.advanced.image-liveness-timeout = 7s + """).withFallback(ArterySpecSupport.defaultConfig) val configB = ConfigFactory.parseString(s"akka.remote.artery.canonical.port = $portB") .withFallback(commonConfig) } -class HandshakeRetrySpec extends AkkaSpec(HandshakeRetrySpec.commonConfig) with ImplicitSender { +class HandshakeRetrySpec extends AkkaSpec(HandshakeRetrySpec.commonConfig) with ImplicitSender with FlightRecorderSpecIntegration { import HandshakeRetrySpec._ var systemB: ActorSystem = null @@ -58,7 +52,9 @@ class HandshakeRetrySpec extends AkkaSpec(HandshakeRetrySpec.commonConfig) with } - override def afterTermination(): Unit = + override def afterTermination(): Unit = { if (systemB != null) shutdown(systemB) + super.afterTermination() + } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/LateConnectSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/LateConnectSpec.scala index e50b71d91a..6b87251301 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/LateConnectSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/LateConnectSpec.scala @@ -18,15 +18,9 @@ import com.typesafe.config.ConfigFactory object LateConnectSpec { val config = ConfigFactory.parseString(s""" - akka { - actor.provider = remote - remote.artery.enabled = on - remote.artery.canonical.hostname = localhost - remote.artery.canonical.port = 0 - remote.artery.advanced.handshake-timeout = 3s - remote.artery.advanced.image-liveness-timeout = 2.9s - } - """) + akka.remote.artery.advanced.handshake-timeout = 3s + akka.remote.artery.advanced.image-liveness-timeout = 2.9s + """).withFallback(ArterySpecSupport.defaultConfig) } @@ -61,6 +55,9 @@ class LateConnectSpec extends AkkaSpec(LateConnectSpec.config) with ImplicitSend } } - override def afterTermination(): Unit = shutdown(systemB) + override def afterTermination(): Unit = { + shutdown(systemB) + super.afterTermination() + } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundControlJunctionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/OutboundControlJunctionSpec.scala index 58192b5099..66745ecc67 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/OutboundControlJunctionSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundControlJunctionSpec.scala @@ -3,19 +3,12 @@ */ package akka.remote.artery -import scala.concurrent.duration._ import akka.actor.Address -import akka.remote.EndpointManager.Send -import akka.remote.RemoteActorRef import akka.remote.UniqueAddress -import akka.remote.artery.SystemMessageDelivery._ -import akka.stream.ActorMaterializer -import akka.stream.ActorMaterializerSettings +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.scaladsl.Keep -import akka.stream.testkit.scaladsl.TestSink -import akka.stream.testkit.scaladsl.TestSource -import akka.testkit.AkkaSpec -import akka.testkit.ImplicitSender +import akka.stream.testkit.scaladsl.{ TestSink, TestSource } +import akka.testkit.{ AkkaSpec, ImplicitSender } import akka.util.OptionVal object OutboundControlJunctionSpec { diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala index 635b73dc8e..288de197e2 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala @@ -26,14 +26,11 @@ object RemoteDeathWatchSpec { } } remote.watch-failure-detector.acceptable-heartbeat-pause = 3s - remote.artery.enabled = on - remote.artery.canonical.hostname = localhost - remote.artery.canonical.port = 0 } - """) + """).withFallback(ArterySpecSupport.defaultConfig) } -class RemoteDeathWatchSpec extends AkkaSpec(RemoteDeathWatchSpec.config) with ImplicitSender with DefaultTimeout with DeathWatchSpec { +class RemoteDeathWatchSpec extends AkkaSpec(RemoteDeathWatchSpec.config) with ImplicitSender with DefaultTimeout with DeathWatchSpec with FlightRecorderSpecIntegration { import RemoteDeathWatchSpec._ system.eventStream.publish(TestEvent.Mute( @@ -44,6 +41,7 @@ class RemoteDeathWatchSpec extends AkkaSpec(RemoteDeathWatchSpec.config) with Im override def afterTermination() { shutdown(other) + super.afterTermination() } override def expectedTestDuration: FiniteDuration = 120.seconds diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeployerSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeployerSpec.scala index b4152ae111..67e6182d5c 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeployerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeployerSpec.scala @@ -12,7 +12,6 @@ import akka.remote.RemoteScope object RemoteDeployerSpec { val deployerConf = ConfigFactory.parseString(""" - akka.actor.provider = remote akka.actor.deployment { /service2 { router = round-robin-pool @@ -21,10 +20,7 @@ object RemoteDeployerSpec { dispatcher = mydispatcher } } - akka.remote.artery.enabled = on - akka.remote.artery.canonical.hostname = localhost - akka.remote.artery.canonical.port = 0 - """, ConfigParseOptions.defaults) + """).withFallback(ArterySpecSupport.defaultConfig) class RecipeActor extends Actor { def receive = { case _ ⇒ } @@ -32,7 +28,7 @@ object RemoteDeployerSpec { } -class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) { +class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) with FlightRecorderSpecIntegration { "A RemoteDeployer" must { diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeploymentSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeploymentSpec.scala index 90f89c3147..b08db3cfce 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeploymentSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeploymentSpec.scala @@ -32,13 +32,7 @@ object RemoteDeploymentSpec { } } -class RemoteDeploymentSpec extends AkkaSpec(""" - #akka.loglevel=DEBUG - akka.actor.provider = remote - akka.remote.artery.enabled = on - akka.remote.artery.canonical.hostname = localhost - akka.remote.artery.canonical.port = 0 - """) { +class RemoteDeploymentSpec extends AkkaSpec(ArterySpecSupport.defaultConfig) { import RemoteDeploymentSpec._ diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala index 80f133b7cc..afb9b56e81 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala @@ -21,11 +21,7 @@ object RemoteRouterSpec { } } -class RemoteRouterSpec extends AkkaSpec(""" - akka.actor.provider = remote - akka.remote.artery.enabled = on - akka.remote.artery.canonical.hostname = localhost - akka.remote.artery.canonical.port = 0 +class RemoteRouterSpec extends AkkaSpec(ConfigFactory.parseString(""" akka.actor.deployment { /remote-override { router = round-robin-pool @@ -39,7 +35,7 @@ class RemoteRouterSpec extends AkkaSpec(""" router = round-robin-pool nr-of-instances = 6 } - }""") { + }""").withFallback(ArterySpecSupport.defaultConfig)) with FlightRecorderSpecIntegration { import RemoteRouterSpec._ @@ -85,6 +81,7 @@ class RemoteRouterSpec extends AkkaSpec(""" override def afterTermination(): Unit = { shutdown(masterSystem) + super.afterTermination() } def collectRouteePaths(probe: TestProbe, router: ActorRef, n: Int): immutable.Seq[ActorPath] = { diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala index 2b6100be6a..e9e7e95b8b 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala @@ -3,38 +3,20 @@ */ package akka.remote.artery -import scala.concurrent.duration._ -import akka.actor.{ Actor, ActorIdentity, ActorSystem, Deploy, ExtendedActorSystem, Identify, Props, RootActorPath } -import akka.testkit.{ AkkaSpec, ImplicitSender } -import com.typesafe.config.ConfigFactory -import akka.actor.Actor.Receive +import akka.actor.{ Actor, ActorIdentity, ActorRef, ActorSystem, Deploy, Identify, PoisonPill, Props, RootActorPath } import akka.remote.RARP -import akka.testkit.TestActors -import akka.actor.PoisonPill -import akka.testkit.TestProbe -import akka.actor.ActorRef -import com.typesafe.config.Config +import akka.testkit.{ AkkaSpec, ImplicitSender, TestActors, TestProbe } +import com.typesafe.config.{ Config, ConfigFactory } -object RemoteSendConsistencySpec { +import scala.concurrent.duration._ - val config = ConfigFactory.parseString(s""" - akka { - actor.provider = remote - remote.artery.enabled = on - remote.artery.canonical.hostname = localhost - remote.artery.canonical.port = 0 - } - """) - -} - -class RemoteSendConsistencySpec extends AbstractRemoteSendConsistencySpec(RemoteSendConsistencySpec.config) +class RemoteSendConsistencySpec extends AbstractRemoteSendConsistencySpec(ArterySpecSupport.defaultConfig) class RemoteSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsistencySpec( ConfigFactory.parseString(""" akka.remote.artery.advanced.outbound-lanes = 3 akka.remote.artery.advanced.inbound-lanes = 3 - """).withFallback(RemoteSendConsistencySpec.config)) + """).withFallback(ArterySpecSupport.defaultConfig)) with FlightRecorderSpecIntegration abstract class AbstractRemoteSendConsistencySpec(config: Config) extends AkkaSpec(config) with ImplicitSender { @@ -137,6 +119,9 @@ abstract class AbstractRemoteSendConsistencySpec(config: Config) extends AkkaSpe } - override def afterTermination(): Unit = shutdown(systemB) + override def afterTermination(): Unit = { + shutdown(systemB) + super.shutdown(systemB) + } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala index dc236b272f..a47096a3af 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala @@ -63,15 +63,7 @@ object RemoteWatcherSpec { } -class RemoteWatcherSpec extends AkkaSpec( - """akka { - loglevel = INFO - log-dead-letters-during-shutdown = false - actor.provider = remote - remote.artery.enabled = on - remote.artery.canonical.hostname = localhost - remote.artery.canonical.port = 0 - }""") with ImplicitSender { +class RemoteWatcherSpec extends AkkaSpec(ArterySpecSupport.defaultConfig) with ImplicitSender with FlightRecorderSpecIntegration { import RemoteWatcherSpec._ import RemoteWatcher._ @@ -88,6 +80,7 @@ class RemoteWatcherSpec extends AkkaSpec( override def afterTermination() { shutdown(remoteSystem) + super.afterTermination() } val heartbeatRspB = ArteryHeartbeatRsp(remoteAddressUid) diff --git a/akka-remote/src/test/scala/akka/remote/artery/SerializationErrorSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SerializationErrorSpec.scala index 63b19320d4..995cff9555 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SerializationErrorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SerializationErrorSpec.scala @@ -13,24 +13,11 @@ import akka.testkit.EventFilter object SerializationErrorSpec { - val config = ConfigFactory.parseString(s""" - akka { - actor.provider = remote - remote.artery.enabled = on - remote.artery.canonical.hostname = localhost - remote.artery.canonical.port = 0 - actor { - serialize-creators = false - serialize-messages = false - } - } - """) - object NotSerializableMsg } -class SerializationErrorSpec extends AkkaSpec(SerializationErrorSpec.config) with ImplicitSender { +class SerializationErrorSpec extends AkkaSpec(ArterySpecSupport.defaultConfig) with ImplicitSender { import SerializationErrorSpec._ val configB = ConfigFactory.parseString(""" diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index a702272148..3cbff738ec 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -31,23 +31,11 @@ import akka.util.OptionVal object SystemMessageDeliverySpec { - val config = ConfigFactory.parseString(s""" - akka.loglevel=INFO - akka { - actor.provider = remote - remote.artery.enabled = on - remote.artery.canonical.hostname = localhost - remote.artery.canonical.port = 0 - } - akka.actor.serialize-creators = off - akka.actor.serialize-messages = off - """) - case class TestSysMsg(s: String) extends SystemMessageDelivery.AckedDeliveryMessage } -class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.config) with ImplicitSender { +class SystemMessageDeliverySpec extends AkkaSpec(ArterySpecSupport.defaultConfig) with ImplicitSender with FlightRecorderSpecIntegration { import SystemMessageDeliverySpec._ val addressA = UniqueAddress( @@ -63,7 +51,10 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 16) - override def afterTermination(): Unit = shutdown(systemB) + override def afterTermination(): Unit = { + shutdown(systemB) + super.afterTermination() + } private def send(sendCount: Int, resendInterval: FiniteDuration, outboundContext: OutboundContext): Source[OutboundEnvelope, NotUsed] = { val deadLetters = TestProbe().ref diff --git a/akka-remote/src/test/scala/akka/remote/artery/UntrustedSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/UntrustedSpec.scala index 2d901f492e..df2db63dca 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/UntrustedSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/UntrustedSpec.scala @@ -59,26 +59,21 @@ object UntrustedSpec { } } + val config = ConfigFactory.parseString( + """ + akka.remote.artery.untrusted-mode = on + akka.remote.artery.trusted-selection-paths = ["/user/receptionist", ] + akka.loglevel = DEBUG # the test is verifying some Debug logging + """ + ).withFallback(ArterySpecSupport.defaultConfig) + } -class UntrustedSpec extends AkkaSpec(""" - akka.actor.provider = remote - akka.remote.artery.untrusted-mode = on - akka.remote.artery.trusted-selection-paths = ["/user/receptionist", ] - akka.remote.artery.enabled = on - akka.remote.artery.canonical.hostname = localhost - akka.remote.artery.canonical.port = 0 - akka.loglevel = DEBUG # the test is verifying some Debug logging - """) with ImplicitSender { +class UntrustedSpec extends AkkaSpec(UntrustedSpec.config) with ImplicitSender with FlightRecorderSpecIntegration { import UntrustedSpec._ - val client = ActorSystem("UntrustedSpec-client", ConfigFactory.parseString(""" - akka.actor.provider = remote - akka.remote.artery.enabled = on - akka.remote.artery.canonical.hostname = localhost - akka.remote.artery.canonical.port = 0 - """)) + val client = ActorSystem("UntrustedSpec-client", ArterySpecSupport.defaultConfig) val addr = RARP(system).provider.getDefaultAddress val receptionist = system.actorOf(Props(classOf[Receptionist], testActor), "receptionist") @@ -100,6 +95,7 @@ class UntrustedSpec extends AkkaSpec(""" override def afterTermination() { shutdown(client) + super.afterTermination() } // need to enable debug log-level without actually printing those messages diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 501e1dec44..aae573eb12 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -227,7 +227,7 @@ object Source { * `create` factory is never called and the materialized `CompletionStage` is failed. */ def lazily[T, M](create: function.Creator[Source[T, M]]): Source[T, CompletionStage[M]] = - scaladsl.Source.lazily[T, M](() => create.create().asScala).mapMaterializedValue(_.toJava).asJava + scaladsl.Source.lazily[T, M](() ⇒ create.create().asScala).mapMaterializedValue(_.toJava).asJava /** * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index a7adcfb691..34e09ec9f3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -360,7 +360,7 @@ object Source { * the materialized future is completed with its value, if downstream cancels or fails without any demand the * create factory is never called and the materialized `Future` is failed. */ - def lazily[T, M](create: () => Source[T, M]): Source[T, Future[M]] = + def lazily[T, M](create: () ⇒ Source[T, M]): Source[T, Future[M]] = Source.fromGraph(new LazySource[T, M](create)) /**