From cc4fd5ca2ca1350e90cf2885e530d9027fc3fae7 Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Tue, 26 Aug 2014 09:03:48 +0200 Subject: [PATCH] !str #15121 MaterializerSettings now uses a config object Resolves #15121 --- .../docs/http/HttpServerExampleSpec.scala | 29 ++- .../main/java/akka/http/model/japi/Http.java | 5 + .../src/main/resources/reference.conf | 2 +- .../src/main/scala/akka/http/Http.scala | 8 +- .../main/scala/akka/http/HttpManager.scala | 32 +-- .../akka/http/model/japi/Accessors.scala | 12 +- .../akka/http/model/japi/JavaTestServer.java | 3 +- .../src/test/resources/reference.conf | 2 +- .../scala/akka/http/ClientServerSpec.scala | 44 ++-- .../src/test/scala/akka/http/TestClient.scala | 25 +-- .../src/test/scala/akka/http/TestServer.scala | 2 +- .../akka/http/model/HttpEntitySpec.scala | 2 +- .../akka/http/parsing/RequestParserSpec.scala | 2 +- .../http/parsing/ResponseParserSpec.scala | 2 +- .../http/rendering/RequestRendererSpec.scala | 2 +- .../http/rendering/ResponseRendererSpec.scala | 2 +- .../http/server/HttpServerPipelineSpec.scala | 14 +- .../scala/akka/http/MarshallingSpec.scala | 24 ++- .../scala/akka/http/UnmarshallingSpec.scala | 25 ++- akka-stream/src/main/resources/reference.conf | 27 +++ .../scala/akka/stream/FlowMaterializer.scala | 192 +++++++++++++----- .../scala/akka/stream/impl/FlowImpl.scala | 4 +- .../akka/stream/impl/IterablePublisher.scala | 2 +- .../main/scala/akka/stream/io/StreamIO.scala | 43 ++-- .../akka/stream/io/TcpListenStreamActor.scala | 7 +- .../scaladsl/ImplicitFlowMaterializer.scala | 7 +- .../scala/akka/stream/scaladsl2/Flow.scala | 2 +- .../akka/stream/actor/ActorPublisherTest.java | 2 +- .../stream/actor/ActorSubscriberTest.java | 2 +- .../java/akka/stream/javadsl/DuctTest.java | 2 +- .../java/akka/stream/javadsl/FlowTest.java | 2 +- akka-stream/src/test/resources/reference.conf | 15 +- .../stream/PersistentPublisherExample.scala | 3 +- .../stream/PersistentPublisherSpec.scala | 2 +- .../src/test/scala/akka/stream/DuctSpec.scala | 2 +- ...wTeeSpec.scala => FlowBroadcastSpec.scala} | 11 +- .../scala/akka/stream/FlowBufferSpec.scala | 15 +- .../scala/akka/stream/FlowCollectSpec.scala | 6 +- .../scala/akka/stream/FlowConcatAllSpec.scala | 11 +- .../scala/akka/stream/FlowConflateSpec.scala | 11 +- .../akka/stream/FlowDispatcherSpec.scala | 4 +- .../test/scala/akka/stream/FlowDropSpec.scala | 13 +- .../akka/stream/FlowDropWithinSpec.scala | 3 +- .../scala/akka/stream/FlowExpandSpec.scala | 11 +- .../scala/akka/stream/FlowFilterSpec.scala | 26 +-- .../test/scala/akka/stream/FlowFoldSpec.scala | 13 +- .../scala/akka/stream/FlowForeachSpec.scala | 9 +- .../akka/stream/FlowFromFutureSpec.scala | 4 +- .../scala/akka/stream/FlowGroupBySpec.scala | 11 +- .../scala/akka/stream/FlowGroupedSpec.scala | 13 +- .../akka/stream/FlowGroupedWithinSpec.scala | 5 +- .../scala/akka/stream/FlowIterableSpec.scala | 7 +- .../scala/akka/stream/FlowIteratorSpec.scala | 11 +- .../scala/akka/stream/FlowMapConcatSpec.scala | 12 +- .../scala/akka/stream/FlowMapFutureSpec.scala | 3 +- .../test/scala/akka/stream/FlowMapSpec.scala | 16 +- .../akka/stream/FlowOnCompleteSpec.scala | 25 +-- .../akka/stream/FlowPrefixAndTailSpec.scala | 11 +- .../stream/FlowProduceToSubscriberSpec.scala | 6 +- .../src/test/scala/akka/stream/FlowSpec.scala | 64 ++++-- .../scala/akka/stream/FlowSplitWhenSpec.scala | 11 +- .../test/scala/akka/stream/FlowTakeSpec.scala | 19 +- .../akka/stream/FlowTakeWithinSpec.scala | 3 +- .../stream/FlowTimerTransformerSpec.scala | 2 +- .../scala/akka/stream/FlowToFutureSpec.scala | 20 +- .../stream/FlowTransformRecoverSpec.scala | 11 +- .../scala/akka/stream/FlowTransformSpec.scala | 11 +- .../stream/ImplicitFlowMaterializerSpec.scala | 16 +- .../scala/akka/stream/TickPublisherSpec.scala | 3 +- .../scala/akka/stream/TwoStreamsSetup.scala | 11 +- .../stream/actor/ActorPublisherSpec.scala | 23 +-- .../stream/actor/ActorSubscriberSpec.scala | 18 +- .../akka/stream/extra/FlowTimedSpec.scala | 19 +- .../scala/akka/stream/io/TcpFlowSpec.scala | 15 +- .../akka/stream/scaladsl2/FlowSpec.scala | 8 +- .../stream/scaladsl2/FlowTransformSpec.scala | 11 +- 76 files changed, 597 insertions(+), 476 deletions(-) create mode 100644 akka-stream/src/main/resources/reference.conf rename akka-stream/src/test/scala/akka/stream/{FlowTeeSpec.scala => FlowBroadcastSpec.scala} (89%) diff --git a/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala index b9b4d3f137..07b0e16ff8 100644 --- a/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala @@ -4,11 +4,12 @@ package docs.http -import akka.stream.testkit.AkkaSpec import akka.actor.ActorSystem -import akka.util.Timeout -import scala.concurrent.duration._ import akka.http.model._ +import akka.stream.testkit.AkkaSpec +import akka.util.Timeout + +import scala.concurrent.duration._ class HttpServerExampleSpec extends AkkaSpec("akka.actor.default-mailbox.mailbox-type = akka.dispatch.UnboundedMailbox") { @@ -16,17 +17,15 @@ class HttpServerExampleSpec "binding example" in { //#bind-example - import akka.pattern.ask - - import akka.io.IO import akka.http.Http - + import akka.io.IO + import akka.pattern.ask + import akka.stream.FlowMaterializer import akka.stream.scaladsl.Flow - import akka.stream.{ MaterializerSettings, FlowMaterializer } implicit val system = ActorSystem() import system.dispatcher - implicit val materializer = FlowMaterializer(MaterializerSettings()) + implicit val materializer = FlowMaterializer() implicit val askTimeout: Timeout = 500.millis val bindingFuture = IO(Http) ? Http.Bind(interface = "localhost", port = 8080) @@ -42,23 +41,21 @@ class HttpServerExampleSpec //#bind-example } "full-server-example" in { - import akka.pattern.ask - - import akka.io.IO import akka.http.Http - + import akka.io.IO + import akka.pattern.ask + import akka.stream.FlowMaterializer import akka.stream.scaladsl.Flow - import akka.stream.{ MaterializerSettings, FlowMaterializer } implicit val system = ActorSystem() import system.dispatcher - implicit val materializer = FlowMaterializer(MaterializerSettings()) + implicit val materializer = FlowMaterializer() implicit val askTimeout: Timeout = 500.millis val bindingFuture = IO(Http) ? Http.Bind(interface = "localhost", port = 8080) //#full-server-example - import HttpMethods._ + import akka.http.model.HttpMethods._ val requestHandler: HttpRequest ⇒ HttpResponse = { case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒ diff --git a/akka-http-core/src/main/java/akka/http/model/japi/Http.java b/akka-http-core/src/main/java/akka/http/model/japi/Http.java index 6a96163352..3f4b327f47 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/Http.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/Http.java @@ -6,6 +6,7 @@ package akka.http.model.japi; import akka.actor.ActorSystem; import akka.http.HttpExt; +import akka.stream.MaterializerSettings; public final class Http { private Http(){} @@ -18,4 +19,8 @@ public final class Http { public static Object bind(String host, int port) { return Accessors$.MODULE$.Bind(host, port); } + /** Create a Bind message to send to the Http Manager */ + public static Object bind(String host, int port, MaterializerSettings materializerSettings) { + return Accessors$.MODULE$.Bind(host, port, materializerSettings); + } } diff --git a/akka-http-core/src/main/resources/reference.conf b/akka-http-core/src/main/resources/reference.conf index adf6a0d272..8fe044046f 100644 --- a/akka-http-core/src/main/resources/reference.conf +++ b/akka-http-core/src/main/resources/reference.conf @@ -166,4 +166,4 @@ akka.http { # Fully qualified config path which holds the dispatcher configuration # to be used for the HttpManager. manager-dispatcher = "akka.actor.default-dispatcher" -} \ No newline at end of file +} diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index 491f101046..4d38289902 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -31,13 +31,13 @@ object Http extends ExtensionKey[HttpExt] { localAddress: Option[InetSocketAddress], options: immutable.Traversable[Inet.SocketOption], settings: Option[ClientConnectionSettings], - materializerSettings: MaterializerSettings) extends SetupOutgoingChannel + materializerSettings: Option[MaterializerSettings]) extends SetupOutgoingChannel object Connect { def apply(host: String, port: Int = 80, localAddress: Option[InetSocketAddress] = None, options: immutable.Traversable[Inet.SocketOption] = Nil, settings: Option[ClientConnectionSettings] = None, - materializerSettings: MaterializerSettings = MaterializerSettings()): Connect = + materializerSettings: Option[MaterializerSettings] = None): Connect = apply(new InetSocketAddress(host, port), localAddress, options, settings, materializerSettings) } @@ -100,12 +100,12 @@ object Http extends ExtensionKey[HttpExt] { backlog: Int, options: immutable.Traversable[Inet.SocketOption], serverSettings: Option[ServerSettings], - materializerSettings: MaterializerSettings) + materializerSettings: Option[MaterializerSettings]) object Bind { def apply(interface: String, port: Int = 80, backlog: Int = 100, options: immutable.Traversable[Inet.SocketOption] = Nil, serverSettings: Option[ServerSettings] = None, - materializerSettings: MaterializerSettings = MaterializerSettings()): Bind = + materializerSettings: Option[MaterializerSettings] = None): Bind = apply(new InetSocketAddress(interface, port), backlog, options, serverSettings, materializerSettings) } diff --git a/akka-http-core/src/main/scala/akka/http/HttpManager.scala b/akka-http-core/src/main/scala/akka/http/HttpManager.scala index 501d53dec4..d8f3f6d99c 100644 --- a/akka-http-core/src/main/scala/akka/http/HttpManager.scala +++ b/akka-http-core/src/main/scala/akka/http/HttpManager.scala @@ -4,17 +4,18 @@ package akka.http -import scala.util.{ Failure, Success } -import scala.concurrent.duration._ -import akka.io.IO -import akka.util.Timeout -import akka.stream.io.StreamTcp -import akka.stream.FlowMaterializer -import akka.http.client._ import akka.actor._ +import akka.http.client._ import akka.http.server.{ HttpServerPipeline, ServerSettings } +import akka.io.IO import akka.pattern.ask +import akka.stream.FlowMaterializer +import akka.stream.io.StreamTcp import akka.stream.scaladsl.Flow +import akka.util.Timeout + +import scala.concurrent.duration._ +import scala.util.{ Failure, Success } /** * INTERNAL API @@ -27,18 +28,19 @@ private[http] class HttpManager(httpSettings: HttpExt#Settings) extends Actor wi private[this] var clientPipelines = Map.empty[ClientConnectionSettings, HttpClientPipeline] def receive = { - case connect @ Http.Connect(remoteAddress, localAddress, options, settings, materializerSettings) ⇒ + case connect @ Http.Connect(remoteAddress, localAddress, options, clientConnectionSettings, materializerSettings) ⇒ log.debug("Attempting connection to {}", remoteAddress) val commander = sender() - val effectiveSettings = ClientConnectionSettings(settings) - val tcpConnect = StreamTcp.Connect(materializerSettings, remoteAddress, localAddress, options, + val effectiveSettings = ClientConnectionSettings(clientConnectionSettings) + + val tcpConnect = StreamTcp.Connect(remoteAddress, localAddress, materializerSettings, options, effectiveSettings.connectingTimeout, effectiveSettings.idleTimeout) val askTimeout = Timeout(effectiveSettings.connectingTimeout + 5.seconds) // FIXME: how can we improve this? val tcpConnectionFuture = IO(StreamTcp)(context.system).ask(tcpConnect)(askTimeout) tcpConnectionFuture onComplete { case Success(tcpConn: StreamTcp.OutgoingTcpConnection) ⇒ val pipeline = clientPipelines.getOrElse(effectiveSettings, { - val pl = new HttpClientPipeline(effectiveSettings, FlowMaterializer(materializerSettings), log) + val pl = new HttpClientPipeline(effectiveSettings, FlowMaterializer(), log) clientPipelines = clientPipelines.updated(effectiveSettings, pl) pl }) @@ -51,17 +53,17 @@ private[http] class HttpManager(httpSettings: HttpExt#Settings) extends Actor wi case x ⇒ throw new IllegalStateException("Unexpected response to `Connect` from StreamTcp: " + x) } - case Http.Bind(endpoint, backlog, options, settings, materializerSettings) ⇒ + case Http.Bind(endpoint, backlog, options, serverSettings, materializerSettings) ⇒ log.debug("Binding to {}", endpoint) val commander = sender() - val effectiveSettings = ServerSettings(settings) - val tcpBind = StreamTcp.Bind(materializerSettings, endpoint, backlog, options) + val effectiveSettings = ServerSettings(serverSettings) + val tcpBind = StreamTcp.Bind(endpoint, materializerSettings, backlog, options) val askTimeout = Timeout(effectiveSettings.bindTimeout + 5.seconds) // FIXME: how can we improve this? val tcpServerBindingFuture = IO(StreamTcp)(context.system).ask(tcpBind)(askTimeout) tcpServerBindingFuture onComplete { case Success(StreamTcp.TcpServerBinding(localAddress, connectionStream)) ⇒ log.info("Bound to {}", endpoint) - implicit val materializer = FlowMaterializer(materializerSettings) + implicit val materializer = FlowMaterializer() val httpServerPipeline = new HttpServerPipeline(effectiveSettings, materializer, log) val httpConnectionStream = Flow(connectionStream) .map(httpServerPipeline) diff --git a/akka-http-core/src/main/scala/akka/http/model/japi/Accessors.scala b/akka-http-core/src/main/scala/akka/http/model/japi/Accessors.scala index 2a720755c8..d9cf25fd7b 100644 --- a/akka-http-core/src/main/scala/akka/http/model/japi/Accessors.scala +++ b/akka-http-core/src/main/scala/akka/http/model/japi/Accessors.scala @@ -4,9 +4,8 @@ package akka.http.model.japi -import akka.http.{ HttpExt, model } -import akka.actor.ActorSystem -import java.net.InetSocketAddress +import akka.http.model +import akka.stream.MaterializerSettings /** * INTERNAL API @@ -22,5 +21,10 @@ private[http] object Accessors { /** INTERNAL API */ private[http] def Uri(uri: model.Uri): Uri = JavaUri(uri) /** INTERNAL API */ - private[http] def Bind(host: String, port: Int): AnyRef = akka.http.Http.Bind(host, port) + private[http] def Bind(host: String, port: Int): AnyRef = + akka.http.Http.Bind(host, port, materializerSettings = None) + + /** INTERNAL API */ + private[http] def Bind(host: String, port: Int, materializerSettings: MaterializerSettings): AnyRef = + akka.http.Http.Bind(host, port, materializerSettings = Some(materializerSettings)) } diff --git a/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java b/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java index 408172bbfd..82f58a6b6d 100644 --- a/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java +++ b/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java @@ -24,8 +24,7 @@ public abstract class JavaTestServer { public static void main(String[] args) throws IOException, InterruptedException { ActorSystem system = ActorSystem.create(); - MaterializerSettings settings = MaterializerSettings.create(); - final FlowMaterializer materializer = FlowMaterializer.create(settings, system); + final FlowMaterializer materializer = FlowMaterializer.create(system); ActorRef httpManager = Http.get(system).manager(); Future binding = ask(httpManager, Http.bind("localhost", 8080), 1000); diff --git a/akka-http-core/src/test/resources/reference.conf b/akka-http-core/src/test/resources/reference.conf index 994e7c97f2..85f3c7ab62 100644 --- a/akka-http-core/src/test/resources/reference.conf +++ b/akka-http-core/src/test/resources/reference.conf @@ -1,2 +1,2 @@ # override strange reference.conf setting in akka-stream test scope -akka.actor.default-mailbox.mailbox-type = akka.dispatch.UnboundedMailbox \ No newline at end of file +akka.actor.default-mailbox.mailbox-type = akka.dispatch.UnboundedMailbox diff --git a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala index 83db54a9ef..1fb92a84db 100644 --- a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala @@ -4,29 +4,31 @@ package akka.http +import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter } import java.net.Socket -import java.io.{ InputStreamReader, BufferedReader, OutputStreamWriter, BufferedWriter } -import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe } -import com.typesafe.config.{ ConfigFactory, Config } -import scala.annotation.tailrec -import scala.concurrent.duration._ -import scala.concurrent.Await -import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } + import akka.actor.ActorSystem -import akka.testkit.TestProbe -import akka.io.IO -import akka.stream.{ FlowMaterializer, MaterializerSettings } -import akka.stream.testkit.StreamTestKit -import akka.stream.impl.SynchronousPublisherFromIterable -import akka.stream.scaladsl.Flow -import akka.http.server.ServerSettings import akka.http.client.ClientConnectionSettings import akka.http.model._ -import akka.http.util._ -import headers._ -import HttpMethods._ import HttpEntity._ +import HttpMethods._ import TestUtils._ +import akka.http.server.ServerSettings +import akka.http.util._ +import akka.io.IO +import akka.stream.{ MaterializerSettings, FlowMaterializer } +import akka.stream.impl.SynchronousPublisherFromIterable +import akka.stream.scaladsl.Flow +import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.StreamTestKit.{ PublisherProbe, SubscriberProbe } +import akka.testkit.TestProbe +import com.typesafe.config.{ Config, ConfigFactory } +import headers._ +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } + +import scala.annotation.tailrec +import scala.concurrent.Await +import scala.concurrent.duration._ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val testConf: Config = ConfigFactory.parseString(""" @@ -35,7 +37,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { implicit val system = ActorSystem(getClass.getSimpleName, testConf) import system.dispatcher - val materializerSettings = MaterializerSettings(dispatcher = "akka.test.stream-dispatcher") + val materializerSettings = MaterializerSettings(system) val materializer = FlowMaterializer(materializerSettings) "The server-side HTTP infrastructure" should { @@ -43,7 +45,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { "properly bind and unbind a server" in { val (hostname, port) = temporaryServerHostnameAndPort() val commander = TestProbe() - commander.send(IO(Http), Http.Bind(hostname, port, materializerSettings = materializerSettings)) + commander.send(IO(Http), Http.Bind(hostname, port, materializerSettings = Some(materializerSettings))) val Http.ServerBinding(localAddress, connectionStream) = commander.expectMsgType[Http.ServerBinding] localAddress.getHostName shouldEqual hostname @@ -118,7 +120,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val connectionStream: SubscriberProbe[Http.IncomingConnection] = { val commander = TestProbe() val settings = configOverrides.toOption.map(ServerSettings.apply) - commander.send(IO(Http), Http.Bind(hostname, port, serverSettings = settings, materializerSettings = materializerSettings)) + commander.send(IO(Http), Http.Bind(hostname, port, serverSettings = settings, materializerSettings = Some(materializerSettings))) val probe = StreamTestKit.SubscriberProbe[Http.IncomingConnection] commander.expectMsgType[Http.ServerBinding].connectionStream.subscribe(probe) probe @@ -127,7 +129,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { def openNewClientConnection[T](settings: Option[ClientConnectionSettings] = None): (PublisherProbe[(HttpRequest, T)], SubscriberProbe[(HttpResponse, T)]) = { val commander = TestProbe() - commander.send(IO(Http), Http.Connect(hostname, port, settings = settings, materializerSettings = materializerSettings)) + commander.send(IO(Http), Http.Connect(hostname, port, settings = settings, materializerSettings = Some(materializerSettings))) val connection = commander.expectMsgType[Http.OutgoingConnection] connection.remoteAddress.getPort shouldEqual port connection.remoteAddress.getHostName shouldEqual hostname diff --git a/akka-http-core/src/test/scala/akka/http/TestClient.scala b/akka-http-core/src/test/scala/akka/http/TestClient.scala index c9922b50d2..dee95473f2 100644 --- a/akka-http-core/src/test/scala/akka/http/TestClient.scala +++ b/akka-http-core/src/test/scala/akka/http/TestClient.scala @@ -4,18 +4,19 @@ package akka.http -import com.typesafe.config.{ ConfigFactory, Config } -import scala.concurrent.Future -import scala.util.{ Failure, Success } -import scala.concurrent.duration._ -import akka.util.Timeout -import akka.stream.{ MaterializerSettings, FlowMaterializer } -import akka.stream.scaladsl.Flow -import akka.io.IO import akka.actor.ActorSystem -import akka.pattern.ask +import akka.http.model.HttpMethods._ import akka.http.model._ -import HttpMethods._ +import akka.io.IO +import akka.pattern.ask +import akka.stream.FlowMaterializer +import akka.stream.scaladsl.Flow +import akka.util.Timeout +import com.typesafe.config.{ Config, ConfigFactory } + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.{ Failure, Success } object TestClient extends App { val testConf: Config = ConfigFactory.parseString(""" @@ -23,9 +24,9 @@ object TestClient extends App { akka.log-dead-letters = off """) implicit val system = ActorSystem("ServerTest", testConf) - import system.dispatcher + import akka.http.TestClient.system.dispatcher - implicit val materializer = FlowMaterializer(MaterializerSettings()) + implicit val materializer = FlowMaterializer() implicit val askTimeout: Timeout = 500.millis val host = "spray.io" diff --git a/akka-http-core/src/test/scala/akka/http/TestServer.scala b/akka-http-core/src/test/scala/akka/http/TestServer.scala index 8f26eae6ad..87e1d0a17a 100644 --- a/akka-http-core/src/test/scala/akka/http/TestServer.scala +++ b/akka-http-core/src/test/scala/akka/http/TestServer.scala @@ -30,7 +30,7 @@ object TestServer extends App { case _: HttpRequest ⇒ HttpResponse(404, entity = "Unknown resource!") } - implicit val materializer = FlowMaterializer(MaterializerSettings()) + implicit val materializer = FlowMaterializer() implicit val askTimeout: Timeout = 500.millis val bindingFuture = IO(Http) ? Http.Bind(interface = "localhost", port = 8080) diff --git a/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala b/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala index 9e94749e3f..65627051a2 100644 --- a/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala +++ b/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala @@ -31,7 +31,7 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { implicit val system = ActorSystem(getClass.getSimpleName, testConf) import system.dispatcher - val materializer = FlowMaterializer(MaterializerSettings()) + val materializer = FlowMaterializer() override def afterAll() = system.shutdown() "HttpEntity" - { diff --git a/akka-http-core/src/test/scala/akka/http/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/parsing/RequestParserSpec.scala index 215ad311a8..2c407d1999 100644 --- a/akka-http-core/src/test/scala/akka/http/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/parsing/RequestParserSpec.scala @@ -37,7 +37,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { import system.dispatcher val BOLT = HttpMethods.register(HttpMethod.custom("BOLT", safe = false, idempotent = true, entityAccepted = true)) - val materializer = FlowMaterializer(MaterializerSettings()) + val materializer = FlowMaterializer() "The request parsing logic should" - { "properly parse a request" - { diff --git a/akka-http-core/src/test/scala/akka/http/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/parsing/ResponseParserSpec.scala index 5e62951dd5..a1d7e32f3e 100644 --- a/akka-http-core/src/test/scala/akka/http/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/parsing/ResponseParserSpec.scala @@ -34,7 +34,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { implicit val system = ActorSystem(getClass.getSimpleName, testConf) import system.dispatcher - val materializer = FlowMaterializer(MaterializerSettings()) + val materializer = FlowMaterializer() val ServerOnTheMove = StatusCodes.registerCustom(331, "Server on the move") "The response parsing logic should" - { diff --git a/akka-http-core/src/test/scala/akka/http/rendering/RequestRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/rendering/RequestRendererSpec.scala index a7d11454dc..b30298babf 100644 --- a/akka-http-core/src/test/scala/akka/http/rendering/RequestRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/rendering/RequestRendererSpec.scala @@ -28,7 +28,7 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll implicit val system = ActorSystem(getClass.getSimpleName, testConf) import system.dispatcher - val materializer = FlowMaterializer(MaterializerSettings()) + val materializer = FlowMaterializer() "The request preparation logic should" - { "properly render an unchunked" - { diff --git a/akka-http-core/src/test/scala/akka/http/rendering/ResponseRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/rendering/ResponseRendererSpec.scala index 958ff59264..8ba0b338a8 100644 --- a/akka-http-core/src/test/scala/akka/http/rendering/ResponseRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/rendering/ResponseRendererSpec.scala @@ -28,7 +28,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll import system.dispatcher val ServerOnTheMove = StatusCodes.registerCustom(330, "Server on the move") - val materializer = FlowMaterializer(MaterializerSettings()) + val materializer = FlowMaterializer() "The response preparation logic should properly render" - { "a response with no body" - { diff --git a/akka-http-core/src/test/scala/akka/http/server/HttpServerPipelineSpec.scala b/akka-http-core/src/test/scala/akka/http/server/HttpServerPipelineSpec.scala index 23b8c6ca0d..b8afb6c25e 100644 --- a/akka-http-core/src/test/scala/akka/http/server/HttpServerPipelineSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/server/HttpServerPipelineSpec.scala @@ -5,25 +5,21 @@ package akka.http package server -import akka.http.model.HttpEntity.{ LastChunk, Chunk, ChunkStreamPart } - -import scala.concurrent.duration._ - import akka.event.NoLogging -import akka.http.model.headers.Host +import akka.http.model.HttpEntity.{ Chunk, ChunkStreamPart, LastChunk } import akka.http.model._ +import akka.http.model.headers.Host import akka.http.util._ +import akka.stream.FlowMaterializer import akka.stream.io.StreamTcp import akka.stream.testkit.{ AkkaSpec, StreamTestKit } -import akka.stream.{ FlowMaterializer, MaterializerSettings } import akka.util.ByteString import org.scalatest._ -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterAll with Inside { - val materializerSettings = MaterializerSettings(dispatcher = "akka.test.stream-dispatcher") - val materializer = FlowMaterializer(materializerSettings) + val materializer = FlowMaterializer() "The server implementation should" should { "deliver an empty request as soon as all headers are received" in new TestSetup { diff --git a/akka-http/src/test/scala/akka/http/MarshallingSpec.scala b/akka-http/src/test/scala/akka/http/MarshallingSpec.scala index 76a161dea4..1874dfeb71 100644 --- a/akka-http/src/test/scala/akka/http/MarshallingSpec.scala +++ b/akka-http/src/test/scala/akka/http/MarshallingSpec.scala @@ -4,23 +4,27 @@ package akka.http +import akka.actor.ActorSystem +import akka.http.marshalling.{ MultipartMarshallers, ToEntityMarshallers } +import akka.http.model._ +import HttpCharsets._ +import MediaTypes._ +import akka.http.util._ +import akka.stream.{ FlowMaterializer, MaterializerSettings } +import headers._ +import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } + import scala.collection.immutable.ListMap import scala.concurrent.Await import scala.concurrent.duration._ -import org.scalatest.{ BeforeAndAfterAll, Matchers, FreeSpec } -import akka.actor.ActorSystem -import akka.http.marshalling.{ ToEntityMarshallers, MultipartMarshallers } -import akka.stream.{ FlowMaterializer, MaterializerSettings } -import akka.http.util._ -import akka.http.model._ -import headers._ -import MediaTypes._ -import HttpCharsets._ class MarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll with MultipartMarshallers { implicit val system = ActorSystem(getClass.getSimpleName) import system.dispatcher - val materializerSettings = MaterializerSettings(dispatcher = "akka.test.stream-dispatcher") + + val materializerSettings = MaterializerSettings(system) + .withDispatcher("akka.test.stream-dispatcher") + implicit val materializer = FlowMaterializer(materializerSettings) "The PredefinedToEntityMarshallers." - { diff --git a/akka-http/src/test/scala/akka/http/UnmarshallingSpec.scala b/akka-http/src/test/scala/akka/http/UnmarshallingSpec.scala index ee33cd8779..5afc636ff2 100644 --- a/akka-http/src/test/scala/akka/http/UnmarshallingSpec.scala +++ b/akka-http/src/test/scala/akka/http/UnmarshallingSpec.scala @@ -4,24 +4,29 @@ package akka.http -import scala.xml.NodeSeq -import scala.concurrent.duration._ -import scala.concurrent.{ Future, Await } -import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } -import org.scalatest.matchers.Matcher import akka.actor.ActorSystem -import akka.stream.scaladsl.Flow -import akka.stream.{ FlowMaterializer, MaterializerSettings } -import akka.http.unmarshalling.Unmarshalling -import akka.http.util._ import akka.http.model._ import MediaTypes._ +import akka.http.unmarshalling.Unmarshalling +import akka.http.util._ +import akka.stream.scaladsl.Flow +import akka.stream.{ FlowMaterializer, MaterializerSettings } import headers._ +import org.scalatest.matchers.Matcher +import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } + +import scala.concurrent.duration._ +import scala.concurrent.{ Await, Future } +import scala.xml.NodeSeq class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { implicit val system = ActorSystem(getClass.getSimpleName) - val materializerSettings = MaterializerSettings(dispatcher = "akka.test.stream-dispatcher") + import system.dispatcher + + val materializerSettings = MaterializerSettings(system) + .withDispatcher("akka.test.stream-dispatcher") + implicit val materializer = FlowMaterializer(materializerSettings) "The PredefinedFromEntityUnmarshallers." - { diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf new file mode 100644 index 0000000000..ea10b33255 --- /dev/null +++ b/akka-stream/src/main/resources/reference.conf @@ -0,0 +1,27 @@ +##################################### +# Akka Stream Reference Config File # +##################################### + +akka { + stream { + + # Default flow materializer settings + materializer { + + # Initial size of buffers used in stream elements + initial-input-buffer-size = 4 + # Maximum size of buffers used in stream elements + max-input-buffer-size = 16 + + # Initial size of fan-out buffers used in stream elements + initial-fan-out-buffer-size = 4 + # Maximum size of fan-out buffers used in stream elements + max-fan-out-buffer-size = 16 + + # Fully qualified config path which holds the dispatcher configuration + # to be used by FlowMaterialiser when creating Actors. + # When this value is left empty, the default-dispatcher will be used. + dispatcher = "" + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index c73b68d4b0..5272ca50d8 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -3,20 +3,34 @@ */ package akka.stream -import scala.concurrent.duration.FiniteDuration -import akka.actor.ActorRefFactory -import akka.stream.impl.ActorBasedFlowMaterializer -import akka.stream.impl.Ast +import akka.actor._ +import akka.stream.impl.{ ActorBasedFlowMaterializer, Ast, FlowNameCounter, StreamSupervisor } +import com.typesafe.config.Config import org.reactivestreams.{ Publisher, Subscriber } -import scala.concurrent.duration._ -import akka.actor.Deploy -import akka.actor.ExtendedActorSystem -import akka.actor.ActorContext -import akka.stream.impl.StreamSupervisor -import akka.stream.impl.FlowNameCounter object FlowMaterializer { + /** + * Scala API: Creates a FlowMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + * + * The materializer's [[akka.stream.MaterializerSettings]] will be obtained from the + * configuration of the `context`'s underlying [[akka.actor.ActorSystem]]. + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def apply(materializerSettings: Option[MaterializerSettings] = None, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = { + val system = actorSystemOf(context) + + val settings = materializerSettings getOrElse MaterializerSettings(system) + apply(settings, namePrefix.getOrElse("flow"))(context) + } + /** * Scala API: Creates a FlowMaterializer which will execute every step of a transformation * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] @@ -28,22 +42,43 @@ object FlowMaterializer { * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of * `namePrefix-flowNumber-flowStepNumber-stepName`. */ - def apply(settings: MaterializerSettings, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = { - val system = context match { - case s: ExtendedActorSystem ⇒ s - case c: ActorContext ⇒ c.system - case null ⇒ throw new IllegalArgumentException("ActorRefFactory context must be defined") - case _ ⇒ throw new IllegalArgumentException(s"ActorRefFactory context must be a ActorSystem or ActorContext, " + - "got [${_contex.getClass.getName}]") - } + def apply(materializerSettings: MaterializerSettings, namePrefix: String)(implicit context: ActorRefFactory): FlowMaterializer = { + val system = actorSystemOf(context) new ActorBasedFlowMaterializer( - settings, - context.actorOf(StreamSupervisor.props(settings).withDispatcher(settings.dispatcher)), + materializerSettings, + context.actorOf(StreamSupervisor.props(materializerSettings).withDispatcher(materializerSettings.dispatcher)), FlowNameCounter(system).counter, - namePrefix.getOrElse("flow")) + namePrefix) } + /** + * Scala API: Creates a FlowMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def apply(materializerSettings: MaterializerSettings)(implicit context: ActorRefFactory): FlowMaterializer = + apply(Some(materializerSettings), None) + + /** + * Java API: Creates a FlowMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + * + * Defaults the actor name prefix used to name actors running the processing steps to `"flow"`. + * The actor names are built up of `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def create(context: ActorRefFactory): FlowMaterializer = + apply()(context) + /** * Java API: Creates a FlowMaterializer which will execute every step of a transformation * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] @@ -52,7 +87,33 @@ object FlowMaterializer { * to another actor if the factory is an ActorContext. */ def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer = - apply(settings)(context) + apply(Option(settings), None)(context) + + /** + * Java API: Creates a FlowMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def create(settings: MaterializerSettings, context: ActorRefFactory, namePrefix: String): FlowMaterializer = + apply(Option(settings), Option(namePrefix))(context) + + private def actorSystemOf(context: ActorRefFactory): ActorSystem = { + val system = context match { + case s: ExtendedActorSystem ⇒ s + case c: ActorContext ⇒ c.system + case null ⇒ throw new IllegalArgumentException("ActorRefFactory context must be defined") + case _ ⇒ + throw new IllegalArgumentException(s"ActorRefFactory context must be a ActorSystem or ActorContext, got [${context.getClass.getName}]") + } + system + } + } /** @@ -89,13 +150,46 @@ abstract class FlowMaterializer(val settings: MaterializerSettings) { } object MaterializerSettings { - private val defaultSettings = new MaterializerSettings /** - * Java API: Default settings. - * Refine the settings using [[MaterializerSettings#withBuffer]], + * Create [[MaterializerSettings]]. + * + * You can refine the configuration based settings using [[MaterializerSettings#withBuffer]], * [[MaterializerSettings#withFanOut]], [[MaterializerSettings#withSubscriptionTimeout]] */ - def create(): MaterializerSettings = defaultSettings + def apply(system: ActorSystem): MaterializerSettings = + apply(system.settings.config.getConfig("akka.stream.materializer")) + + /** + * Create [[MaterializerSettings]]. + * + * You can refine the configuration based settings using [[MaterializerSettings#withBuffer]], + * [[MaterializerSettings#withFanOut]], [[MaterializerSettings#withSubscriptionTimeout]] + */ + def apply(config: Config): MaterializerSettings = + MaterializerSettings( + config.getInt("initial-input-buffer-size"), + config.getInt("max-input-buffer-size"), + config.getInt("initial-fan-out-buffer-size"), + config.getInt("max-fan-out-buffer-size"), + config.getString("dispatcher")) + + /** + * Java API + * + * You can refine the configuration based settings using [[MaterializerSettings#withBuffer]], + * [[MaterializerSettings#withFanOut]], [[MaterializerSettings#withSubscriptionTimeout]] + */ + def create(system: ActorSystem): MaterializerSettings = + apply(system) + + /** + * Java API + * + * You can refine the configuration based settings using [[MaterializerSettings#withBuffer]], + * [[MaterializerSettings#withFanOut]], [[MaterializerSettings#withSubscriptionTimeout]] + */ + def create(config: Config): MaterializerSettings = + apply(config) } /** @@ -104,33 +198,33 @@ object MaterializerSettings { * * This will likely be replaced in the future by auto-tuning these values at runtime. */ -case class MaterializerSettings( - initialFanOutBufferSize: Int = 4, - maxFanOutBufferSize: Int = 16, - initialInputBufferSize: Int = 4, - maximumInputBufferSize: Int = 16, - dispatcher: String = Deploy.NoDispatcherGiven) { - - private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0 - require(initialFanOutBufferSize > 0, "initialFanOutBufferSize must be > 0") - require(maxFanOutBufferSize > 0, "maxFanOutBufferSize must be > 0") - require(initialFanOutBufferSize <= maxFanOutBufferSize, - s"initialFanOutBufferSize($initialFanOutBufferSize) must be <= maxFanOutBufferSize($maxFanOutBufferSize)") +final case class MaterializerSettings( + initialInputBufferSize: Int, + maxInputBufferSize: Int, + initialFanOutBufferSize: Int, + maxFanOutBufferSize: Int, + dispatcher: String) { require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") - require(isPowerOfTwo(initialInputBufferSize), "initialInputBufferSize must be a power of two") - require(maximumInputBufferSize > 0, "maximumInputBufferSize must be > 0") - require(isPowerOfTwo(maximumInputBufferSize), "initialInputBufferSize must be a power of two") - require(initialInputBufferSize <= maximumInputBufferSize, - s"initialInputBufferSize($initialInputBufferSize) must be <= maximumInputBufferSize($maximumInputBufferSize)") - def withBuffer(initialInputBufferSize: Int, maximumInputBufferSize: Int): MaterializerSettings = - copy(initialInputBufferSize = initialInputBufferSize, maximumInputBufferSize = maximumInputBufferSize) + require(maxInputBufferSize > 0, "maxInputBufferSize must be > 0") + require(isPowerOfTwo(maxInputBufferSize), "maxInputBufferSize must be a power of two") + require(initialInputBufferSize <= maxInputBufferSize, s"initialInputBufferSize($initialInputBufferSize) must be <= maxInputBufferSize($maxInputBufferSize)") - def withFanOut(initialFanOutBufferSize: Int, maxFanOutBufferSize: Int): MaterializerSettings = - copy(initialFanOutBufferSize = initialFanOutBufferSize, maxFanOutBufferSize = maxFanOutBufferSize) + require(initialFanOutBufferSize > 0, "initialFanOutBufferSize must be > 0") - def withDispatcher(dispatcher: String): MaterializerSettings = copy(dispatcher = dispatcher) + require(maxFanOutBufferSize > 0, "maxFanOutBufferSize must be > 0") + require(isPowerOfTwo(maxFanOutBufferSize), "maxFanOutBufferSize must be a power of two") + require(initialFanOutBufferSize <= maxFanOutBufferSize, s"initialFanOutBufferSize($initialFanOutBufferSize) must be <= maxFanOutBufferSize($maxFanOutBufferSize)") + def withInputBuffer(initialSize: Int, maxSize: Int): MaterializerSettings = + copy(initialInputBufferSize = initialSize, maxInputBufferSize = maxSize) + + def withFanOutBuffer(initialSize: Int, maxSize: Int): MaterializerSettings = + copy(initialFanOutBufferSize = initialSize, maxFanOutBufferSize = maxSize) + + def withDispatcher(dispatcher: String): MaterializerSettings = + copy(dispatcher = dispatcher) + + private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0 } - diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index f0d813036d..56dc9dbb80 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -49,7 +49,7 @@ private[akka] case class FlowImpl[I, O](publisherNode: Ast.PublisherNode[I], ops } override def consume()(implicit materializer: FlowMaterializer): Unit = - produceTo(new BlackholeSubscriber[Any](materializer.settings.maximumInputBufferSize)) + produceTo(new BlackholeSubscriber[Any](materializer.settings.maxInputBufferSize)) override def onComplete(callback: Try[Unit] ⇒ Unit)(implicit materializer: FlowMaterializer): Unit = transform("onComplete", () ⇒ new Transformer[O, Unit] { @@ -93,7 +93,7 @@ private[akka] case class DuctImpl[In, Out](ops: List[Ast.AstNode]) extends Duct[ materializer.ductProduceTo(subscriber, ops) override def consume()(implicit materializer: FlowMaterializer): Subscriber[In] = - produceTo(new BlackholeSubscriber[Any](materializer.settings.maximumInputBufferSize)) + produceTo(new BlackholeSubscriber[Any](materializer.settings.maxInputBufferSize)) override def onComplete(callback: Try[Unit] ⇒ Unit)(implicit materializer: FlowMaterializer): Subscriber[In] = transform("onComplete", () ⇒ new Transformer[Out, Unit] { diff --git a/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala index b5587217a2..50b90a7322 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala @@ -95,7 +95,7 @@ private[akka] class IterablePublisher(iterable: immutable.Iterable[Any], setting else { val iterator = iterable.iterator val worker = context.watch(context.actorOf(IterablePublisherWorker.props(iterator, subscriber, - settings.maximumInputBufferSize).withDispatcher(context.props.dispatcher))) + settings.maxInputBufferSize).withDispatcher(context.props.dispatcher))) val subscription = new BasicActorSubscription(worker) subscribers += subscriber workers = workers.updated(worker, subscriber) diff --git a/akka-stream/src/main/scala/akka/stream/io/StreamIO.scala b/akka-stream/src/main/scala/akka/stream/io/StreamIO.scala index 63e04cf72e..51d16487ae 100644 --- a/akka-stream/src/main/scala/akka/stream/io/StreamIO.scala +++ b/akka-stream/src/main/scala/akka/stream/io/StreamIO.scala @@ -48,17 +48,24 @@ object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider { * message. * * @param remoteAddress the address to connect to + * @param settings if Some the passed [[MaterializerSettings]] will be used during stream actor creation, + * otherwise the ActorSystem's default settings will be used * @param localAddress optionally specifies a specific address to bind to * @param options Please refer to [[akka.io.TcpSO]] for a list of all supported options. * @param connectTimeout the desired timeout for connection establishment, infinite means "no timeout" * @param idleTimeout the desired idle timeout on the connection, infinite means "no timeout" */ - case class Connect(settings: MaterializerSettings, - remoteAddress: InetSocketAddress, + case class Connect(remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress] = None, + materializerSettings: Option[MaterializerSettings] = None, options: immutable.Traversable[SocketOption] = Nil, connectTimeout: Duration = Duration.Inf, idleTimeout: Duration = Duration.Inf) { + /** + * Java API + */ + def withMaterializerSettings(materializerSettings: MaterializerSettings): Connect = + copy(materializerSettings = Option(materializerSettings)) /** * Java API @@ -92,14 +99,16 @@ object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider { * the Bind message, then the [[StreamTcp.TcpServerBinding]] message should be inspected to find * the actual port which was bound to. * + * @param settings if Some, these materializer settings will be used for stream actors, + * else the ActorSystem's default materializer settings will be used. * @param localAddress the socket address to bind to; use port zero for automatic assignment (i.e. an ephemeral port) * @param backlog the number of unaccepted connections the O/S * kernel will hold for this port before refusing connections. * @param options Please refer to [[akka.io.TcpSO]] for a list of all supported options. * @param idleTimeout the desired idle timeout on the accepted connections, infinite means "no timeout" */ - case class Bind(settings: MaterializerSettings, - localAddress: InetSocketAddress, + case class Bind(localAddress: InetSocketAddress, + settings: Option[MaterializerSettings] = None, backlog: Int = 100, options: immutable.Traversable[SocketOption] = Nil, idleTimeout: Duration = Duration.Inf) { @@ -146,15 +155,14 @@ object StreamTcpMessage { options: java.lang.Iterable[SocketOption], connectTimeout: Duration, idleTimeout: Duration): StreamTcp.Connect = - StreamTcp.Connect(settings, remoteAddress, Option(localAddress), Util.immutableSeq(options), - connectTimeout, idleTimeout) + StreamTcp.Connect(remoteAddress, Option(localAddress), Option(settings), Util.immutableSeq(options), connectTimeout, idleTimeout) /** * Java API: Message to Connect to the given `remoteAddress` without binding to a local address and without * specifying options. */ def connect(settings: MaterializerSettings, remoteAddress: InetSocketAddress): StreamTcp.Connect = - StreamTcp.Connect(settings, remoteAddress) + StreamTcp.Connect(remoteAddress, materializerSettings = Option(settings)) /** * Java API: The Bind message is send to the StreamTcp manager actor, which is obtained via @@ -163,6 +171,8 @@ object StreamTcpMessage { * the Bind message, then the [[StreamTcp.TcpServerBinding]] message should be inspected to find * the actual port which was bound to. * + * @param settings if Some, these materializer settings will be used for stream actors, + * else the ActorSystem's default materializer settings will be used. * @param localAddress the socket address to bind to; use port zero for automatic assignment (i.e. an ephemeral port) * @param backlog the number of unaccepted connections the O/S * kernel will hold for this port before refusing connections. @@ -174,14 +184,13 @@ object StreamTcpMessage { backlog: Int, options: java.lang.Iterable[SocketOption], idleTimeout: Duration): StreamTcp.Bind = - StreamTcp.Bind(settings, localAddress, backlog, Util.immutableSeq(options), idleTimeout) + StreamTcp.Bind(localAddress, Option(settings), backlog, Util.immutableSeq(options), idleTimeout) /** * Java API: Message to open a listening socket without specifying options. */ - def bind(settings: MaterializerSettings, - localAddress: InetSocketAddress): StreamTcp.Bind = - StreamTcp.Bind(settings, localAddress) + def bind(settings: MaterializerSettings, localAddress: InetSocketAddress): StreamTcp.Bind = + StreamTcp.Bind(localAddress, Option(settings)) } /** @@ -211,22 +220,26 @@ private[akka] class StreamTcpManager extends Actor { } def receive: Receive = { - case StreamTcp.Connect(settings, remoteAddress, localAddress, options, connectTimeout, idleTimeout) ⇒ + case StreamTcp.Connect(remoteAddress, localAddress, maybeMaterializerSettings, options, connectTimeout, idleTimeout) ⇒ val connTimeout = connectTimeout match { case x: FiniteDuration ⇒ Some(x) case _ ⇒ None } + val materializerSettings = maybeMaterializerSettings getOrElse MaterializerSettings(context.system) + val processorActor = context.actorOf(TcpStreamActor.outboundProps( Tcp.Connect(remoteAddress, localAddress, options, connTimeout, pullMode = true), requester = sender(), - settings), name = encName("client", remoteAddress)) + settings = materializerSettings), name = encName("client", remoteAddress)) processorActor ! ExposedProcessor(ActorProcessor[ByteString, ByteString](processorActor)) - case StreamTcp.Bind(settings, localAddress, backlog, options, idleTimeout) ⇒ + case StreamTcp.Bind(localAddress, maybeMaterializerSettings, backlog, options, idleTimeout) ⇒ + val materializerSettings = maybeMaterializerSettings getOrElse MaterializerSettings(context.system) + val publisherActor = context.actorOf(TcpListenStreamActor.props( Tcp.Bind(context.system.deadLetters, localAddress, backlog, options, pullMode = true), requester = sender(), - settings), name = encName("server", localAddress)) + materializerSettings), name = encName("server", localAddress)) publisherActor ! ExposedPublisher(ActorPublisher[Any](publisherActor)) } } diff --git a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala index 8e7adef27d..c851e7ac8a 100644 --- a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala @@ -19,15 +19,16 @@ import scala.util.control.NoStackTrace private[akka] object TcpListenStreamActor { class TcpListenStreamException(msg: String) extends RuntimeException(msg) with NoStackTrace - def props(bindCmd: Tcp.Bind, requester: ActorRef, settings: MaterializerSettings): Props = - Props(new TcpListenStreamActor(bindCmd, requester, settings)).withDispatcher(settings.dispatcher) + def props(bindCmd: Tcp.Bind, requester: ActorRef, settings: MaterializerSettings): Props = { + Props(new TcpListenStreamActor(bindCmd, requester, settings)) + } } /** * INTERNAL API */ -private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, val settings: MaterializerSettings) extends Actor +private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, settings: MaterializerSettings) extends Actor with Pump with Stash { import akka.stream.io.TcpListenStreamActor._ import context.system diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ImplicitFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ImplicitFlowMaterializer.scala index 5e5645d6de..97c0efc6da 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ImplicitFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ImplicitFlowMaterializer.scala @@ -15,11 +15,12 @@ import akka.stream.MaterializerSettings * [[akka.stream.MaterializerSettings]] for the `FlowMaterializer`. */ trait ImplicitFlowMaterializer { this: Actor ⇒ + /** * Subclass may override this to define custom * [[akka.stream.MaterializerSettings]] for the `FlowMaterializer`. */ - def flowMaterializerSettings: MaterializerSettings = MaterializerSettings() + def flowMaterializerSettings: MaterializerSettings = MaterializerSettings(context.system) - final implicit val flowMaterializer: FlowMaterializer = FlowMaterializer(flowMaterializerSettings) -} \ No newline at end of file + final implicit val flowMaterializer: FlowMaterializer = FlowMaterializer(Some(flowMaterializerSettings)) +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index 832bc9d17b..e6e4a90208 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -201,7 +201,7 @@ final case class RunnableFlow[-In, +Out](input: Input[In], output: Output[Out], // FIXME def run()(implicit materializer: FlowMaterializer): Unit = - produceTo(new BlackholeSubscriber[Any](materializer.settings.maximumInputBufferSize)) + produceTo(new BlackholeSubscriber[Any](materializer.settings.maxInputBufferSize)) // FIXME replace with run and input/output factories def toPublisher[U >: Out]()(implicit materializer: FlowMaterializer): Publisher[U] = diff --git a/akka-stream/src/test/java/akka/stream/actor/ActorPublisherTest.java b/akka-stream/src/test/java/akka/stream/actor/ActorPublisherTest.java index dc29397c6e..beb6ceafbf 100644 --- a/akka-stream/src/test/java/akka/stream/actor/ActorPublisherTest.java +++ b/akka-stream/src/test/java/akka/stream/actor/ActorPublisherTest.java @@ -41,7 +41,7 @@ public class ActorPublisherTest { final ActorSystem system = actorSystemResource.getSystem(); - final MaterializerSettings settings = MaterializerSettings.create().withDispatcher("akka.test.stream-dispatcher"); + final MaterializerSettings settings = MaterializerSettings.create(system.settings().config()).withDispatcher("akka.test.stream-dispatcher"); final FlowMaterializer materializer = FlowMaterializer.create(settings, system); @Test diff --git a/akka-stream/src/test/java/akka/stream/actor/ActorSubscriberTest.java b/akka-stream/src/test/java/akka/stream/actor/ActorSubscriberTest.java index a8fc39ae1f..7f98b37abc 100644 --- a/akka-stream/src/test/java/akka/stream/actor/ActorSubscriberTest.java +++ b/akka-stream/src/test/java/akka/stream/actor/ActorSubscriberTest.java @@ -58,7 +58,7 @@ public class ActorSubscriberTest { final ActorSystem system = actorSystemResource.getSystem(); - final MaterializerSettings settings = MaterializerSettings.create().withDispatcher("akka.test.stream-dispatcher"); + final MaterializerSettings settings = MaterializerSettings.create(system.settings().config()).withDispatcher("akka.test.stream-dispatcher"); final FlowMaterializer materializer = FlowMaterializer.create(settings, system); @Test diff --git a/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java b/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java index 7536c75016..ce15cca961 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java @@ -29,7 +29,7 @@ public class DuctTest { final ActorSystem system = actorSystemResource.getSystem(); - final MaterializerSettings settings = MaterializerSettings.create().withDispatcher("akka.test.stream-dispatcher"); + final MaterializerSettings settings = MaterializerSettings.create(system.settings().config()).withDispatcher("akka.test.stream-dispatcher"); final FlowMaterializer materializer = FlowMaterializer.create(settings, system); @Test diff --git a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java index 6bde628812..74baf70681 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java @@ -30,7 +30,7 @@ public class FlowTest { final ActorSystem system = actorSystemResource.getSystem(); - final MaterializerSettings settings = MaterializerSettings.create().withDispatcher("akka.test.stream-dispatcher"); + final MaterializerSettings settings = MaterializerSettings.create(system.settings().config()).withDispatcher("akka.test.stream-dispatcher"); final FlowMaterializer materializer = FlowMaterializer.create(settings, system); @Test diff --git a/akka-stream/src/test/resources/reference.conf b/akka-stream/src/test/resources/reference.conf index 80fc5fb4ab..7421093be9 100644 --- a/akka-stream/src/test/resources/reference.conf +++ b/akka-stream/src/test/resources/reference.conf @@ -1,7 +1,6 @@ -# The StreamTestDefaultMailbox verifies that stream actors are using -# the dispatcher defined in MaterializerSettings. All tests should use -# MaterializerSettings(dispatcher = "akka.test.stream-dispatcher") -# or disable this check by defining +# The StreamTestDefaultMailbox verifies that stream actors are using the dispatcher defined in MaterializerSettings. +# +# All stream tests should use the dedicated `akka.test.stream-dispatcher` or disable this validation by defining: # akka.actor.default-mailbox.mailbox-type = "akka.dispatch.UnboundedMailbox" akka.actor.default-mailbox.mailbox-type = "akka.stream.testkit.StreamTestDefaultMailbox" @@ -15,4 +14,10 @@ akka.test.stream-dispatcher { parallelism-max = 8 } mailbox-requirement = "akka.dispatch.UnboundedMessageQueueSemantics" -} \ No newline at end of file +} + +akka.stream { + materializer { + dispatcher = "akka.test.stream-dispatcher" + } +} diff --git a/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherExample.scala b/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherExample.scala index 1ef8d78061..dba4cc873f 100644 --- a/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherExample.scala +++ b/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherExample.scala @@ -7,7 +7,6 @@ import org.reactivestreams.Publisher import akka.actor._ import akka.persistence.PersistentActor -import akka.persistence.stream.PersistentFlow import akka.stream._ import akka.stream.scaladsl._ @@ -37,7 +36,7 @@ object PersistentPublisherExample extends App { val p1 = system.actorOf(Props(classOf[ExamplePersistentActor], "p1")) val p2 = system.actorOf(Props(classOf[ExamplePersistentActor], "p2")) - implicit val materializer = FlowMaterializer(MaterializerSettings()) + implicit val materializer = FlowMaterializer() // 1 view-backed publisher and 2 subscribers: val publisher1: Publisher[Any] = PersistentFlow.fromPersistentActor("p1").toPublisher() diff --git a/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala b/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala index 25aefbc85c..16224935dc 100644 --- a/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala +++ b/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala @@ -34,7 +34,7 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", val numMessages = 10 val publisherSettings = PersistentPublisherSettings(idle = Some(100.millis)) - implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) + implicit val materializer = FlowMaterializer() var persistentActor1: ActorRef = _ var persistentActor2: ActorRef = _ diff --git a/akka-stream/src/test/scala/akka/stream/DuctSpec.scala b/akka-stream/src/test/scala/akka/stream/DuctSpec.scala index 1e32cc9e39..7c48917d57 100644 --- a/akka-stream/src/test/scala/akka/stream/DuctSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/DuctSpec.scala @@ -22,7 +22,7 @@ object DuctSpec { class DuctSpec extends AkkaSpec { import DuctSpec._ - implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) + implicit val materializer = FlowMaterializer() "A Duct" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowBroadcastSpec.scala similarity index 89% rename from akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowBroadcastSpec.scala index c0bad11047..c6b6e38c60 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowBroadcastSpec.scala @@ -11,12 +11,11 @@ import akka.stream.testkit.StreamTestKit @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowBroadcastSpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 16, - initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16, - dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) + + implicit val materializer = FlowMaterializer(settings) "A broadcast" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowBufferSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowBufferSpec.scala index a4393562b4..b58684778b 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowBufferSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowBufferSpec.scala @@ -3,20 +3,19 @@ */ package akka.stream -import akka.stream.testkit.{ StreamTestKit, AkkaSpec } import akka.stream.scaladsl.Flow +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } + import scala.concurrent.Await import scala.concurrent.duration._ -import OverflowStrategy._ class FlowBufferSpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings( - initialInputBufferSize = 1, - maximumInputBufferSize = 1, - initialFanOutBufferSize = 1, - maxFanOutBufferSize = 1, - dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 1, maxSize = 1) + .withFanOutBuffer(initialSize = 1, maxSize = 1) + + implicit val materializer = FlowMaterializer(settings) "Buffer" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowCollectSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowCollectSpec.scala index 80520ac851..474b83e0d1 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowCollectSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowCollectSpec.scala @@ -3,13 +3,13 @@ */ package akka.stream -import akka.stream.testkit.AkkaSpec -import akka.stream.testkit.{ StreamTestKit, ScriptedTest } +import akka.stream.testkit.{ AkkaSpec, ScriptedTest } + import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class FlowCollectSpec extends AkkaSpec with ScriptedTest { - val settings = MaterializerSettings(dispatcher = "akka.test.stream-dispatcher") + val settings = MaterializerSettings(system) "A Collect" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowConcatAllSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowConcatAllSpec.scala index e03a4e37b4..bee140b39f 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowConcatAllSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowConcatAllSpec.scala @@ -12,12 +12,11 @@ import scala.util.control.NoStackTrace class FlowConcatAllSpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 2, - initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2, - dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = FlowMaterializer(settings) "ConcatAll" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowConflateSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowConflateSpec.scala index 659c899217..ce0c3cc699 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowConflateSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowConflateSpec.scala @@ -11,12 +11,11 @@ import scala.concurrent.duration._ class FlowConflateSpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 2, - initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2, - dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = FlowMaterializer(settings) "Conflate" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowDispatcherSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowDispatcherSpec.scala index e2f3022b94..bfd070a2f2 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowDispatcherSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowDispatcherSpec.scala @@ -4,13 +4,13 @@ package akka.stream import akka.stream.scaladsl.Flow -import akka.stream.testkit.AkkaSpec import akka.testkit.TestProbe +import akka.stream.testkit.AkkaSpec @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowDispatcherSpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) + implicit val materializer = FlowMaterializer() "Flow with dispatcher setting" must { "use the specified dispatcher" in { diff --git a/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala index 873b76b3d4..2f477fe034 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala @@ -3,20 +3,17 @@ */ package akka.stream -import akka.stream.testkit.AkkaSpec -import akka.stream.testkit.ScriptedTest +import akka.stream.testkit.{ AkkaSpec, ScriptedTest } + import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } import akka.stream.testkit.StreamTestKit import akka.stream.scaladsl.Flow class FlowDropSpec extends AkkaSpec with ScriptedTest { - val settings = MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 16, - initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16, - dispatcher = "akka.test.stream-dispatcher") + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) implicit val materializer = FlowMaterializer(settings) diff --git a/akka-stream/src/test/scala/akka/stream/FlowDropWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowDropWithinSpec.scala index cd336c7594..611d8fb28b 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowDropWithinSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowDropWithinSpec.scala @@ -11,8 +11,7 @@ import akka.stream.testkit.StreamTestKit @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowDropWithinSpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings( - dispatcher = "akka.test.stream-dispatcher")) + implicit val materializer = FlowMaterializer() "A DropWithin" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowExpandSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowExpandSpec.scala index 2df3c392c3..f1848f7071 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowExpandSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowExpandSpec.scala @@ -11,12 +11,11 @@ import scala.concurrent.duration._ class FlowExpandSpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 2, - initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2, - dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = FlowMaterializer(settings) "Expand" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala index 11d1227176..e24d640e32 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala @@ -3,20 +3,16 @@ */ package akka.stream -import akka.stream.testkit.AkkaSpec -import akka.stream.testkit.{ StreamTestKit, ScriptedTest } -import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } import akka.stream.scaladsl.Flow -import akka.stream.impl.ActorBasedFlowMaterializer +import akka.stream.testkit.{ AkkaSpec, ScriptedTest, StreamTestKit } + +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class FlowFilterSpec extends AkkaSpec with ScriptedTest { - val settings = MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 16, - initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16, - dispatcher = "akka.test.stream-dispatcher") + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) "A Filter" must { @@ -26,12 +22,10 @@ class FlowFilterSpec extends AkkaSpec with ScriptedTest { } "not blow up with high request counts" in { - implicit val materializer = FlowMaterializer(MaterializerSettings( - initialInputBufferSize = 1, - maximumInputBufferSize = 1, - initialFanOutBufferSize = 1, - maxFanOutBufferSize = 1, - dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 1, maxSize = 1) + .withFanOutBuffer(initialSize = 1, maxSize = 1) + implicit val materializer = FlowMaterializer(settings) val probe = StreamTestKit.SubscriberProbe[Int]() Flow(Iterator.fill(1000)(0) ++ List(1)).filter(_ != 0). diff --git a/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala index f239951bf3..01eb050fd4 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala @@ -3,18 +3,15 @@ */ package akka.stream -import akka.stream.testkit.AkkaSpec -import akka.stream.testkit.ScriptedTest +import akka.stream.testkit.{ AkkaSpec, ScriptedTest } + import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class FlowFoldSpec extends AkkaSpec with ScriptedTest { - val settings = MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 16, - initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16, - dispatcher = "akka.test.stream-dispatcher") + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 2, maxSize = 16) "A Fold" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowForeachSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowForeachSpec.scala index 7da645d13a..8052ef6fa0 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowForeachSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowForeachSpec.scala @@ -3,16 +3,15 @@ */ package akka.stream -import akka.stream.testkit.AkkaSpec -import akka.stream.testkit.ScriptedTest -import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } import akka.stream.scaladsl.Flow -import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } + +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } import scala.util.control.NoStackTrace class FlowForeachSpec extends AkkaSpec { - implicit val mat = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) + implicit val mat = FlowMaterializer() import system.dispatcher "A Foreach" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowFromFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowFromFutureSpec.scala index 624f45fcb9..0e9b43f765 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowFromFutureSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowFromFutureSpec.scala @@ -12,7 +12,9 @@ import scala.concurrent.duration._ @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowFromFutureSpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + + implicit val materializer = FlowMaterializer(settings) "A Flow based on a Future" must { "produce one element from already successful Future" in { diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala index 6b599f5ea2..399f402cc9 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala @@ -12,12 +12,11 @@ import scala.util.control.NoStackTrace @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowGroupBySpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 2, - initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2, - dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = FlowMaterializer(settings) case class StreamPuppet(p: Publisher[Int]) { val probe = StreamTestKit.SubscriberProbe[Int]() diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala index a9c5a1865d..bd505ba283 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala @@ -3,19 +3,16 @@ */ package akka.stream -import akka.stream.testkit.AkkaSpec -import akka.stream.testkit.ScriptedTest +import akka.stream.testkit.{ AkkaSpec, ScriptedTest } + import scala.collection.immutable import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class FlowGroupedSpec extends AkkaSpec with ScriptedTest { - val settings = MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 16, - initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16, - dispatcher = "akka.test.stream-dispatcher") + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) "A Grouped" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupedWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupedWithinSpec.scala index ec4ac86457..0065e19733 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowGroupedWithinSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupedWithinSpec.scala @@ -14,8 +14,9 @@ import akka.stream.testkit.ScriptedTest @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { - val settings = MaterializerSettings(dispatcher = "akka.test.stream-dispatcher") - implicit val materializer = FlowMaterializer(settings) + val settings = MaterializerSettings(system) + + implicit val materializer = FlowMaterializer() "A GroupedWithin" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala index b053a1c620..8831561b32 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala @@ -12,9 +12,10 @@ import scala.concurrent.duration._ @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowIterableSpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings( - maximumInputBufferSize = 512, - dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 512) + + implicit val materializer = FlowMaterializer(settings) "A Flow based on an iterable" must { "produce elements" in { diff --git a/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala index c42f20588c..0d3fe5d080 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala @@ -14,12 +14,11 @@ import akka.stream.scaladsl.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowIteratorSpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 2, - initialFanOutBufferSize = 4, - maxFanOutBufferSize = 4, - dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 4, maxSize = 4) + + implicit val materializer = FlowMaterializer(settings) "A Flow based on an iterator" must { "produce elements" in { diff --git a/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala index aac8579af1..7a1bcb7ca9 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala @@ -3,17 +3,13 @@ */ package akka.stream -import akka.stream.testkit.AkkaSpec -import akka.stream.testkit.ScriptedTest +import akka.stream.testkit.{ AkkaSpec, ScriptedTest } class FlowMapConcatSpec extends AkkaSpec with ScriptedTest { - val settings = MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 16, - initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16, - dispatcher = "akka.test.stream-dispatcher") + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) "A MapConcat" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowMapFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMapFutureSpec.scala index 259eb78344..bad853cf64 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowMapFutureSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMapFutureSpec.scala @@ -17,8 +17,7 @@ import scala.concurrent.Await @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowMapFutureSpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings( - dispatcher = "akka.test.stream-dispatcher")) + implicit val materializer = FlowMaterializer() "A Flow with mapFuture" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala index a9ea38aa3a..8edb0c2762 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala @@ -3,20 +3,16 @@ */ package akka.stream -import akka.stream.testkit.AkkaSpec -import akka.stream.testkit.{ StreamTestKit, ScriptedTest } -import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } import akka.stream.scaladsl.Flow -import akka.stream.impl.ActorBasedFlowMaterializer +import akka.stream.testkit.{ AkkaSpec, ScriptedTest, StreamTestKit } + +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class FlowMapSpec extends AkkaSpec with ScriptedTest { - val settings = MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 16, - initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16, - dispatcher = "akka.test.stream-dispatcher") + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) implicit val materializer = FlowMaterializer(settings) diff --git a/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala index 7fd33b06df..9a50190cea 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala @@ -3,28 +3,23 @@ */ package akka.stream -import akka.stream.testkit.AkkaSpec -import akka.stream.testkit.ScriptedTest -import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } -import akka.stream.testkit.StreamTestKit -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.util.Failure import akka.stream.scaladsl.Flow +import akka.stream.testkit.{ AkkaSpec, ScriptedTest, StreamTestKit } import akka.testkit.TestProbe -import scala.util.Try -import scala.util.Success + +import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } import scala.util.control.NoStackTrace +import scala.util.{ Failure, Success } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { - implicit val materializer = FlowMaterializer(MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 16, - initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16, - dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) + + implicit val materializer = FlowMaterializer(settings) "A Flow with onComplete" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowPrefixAndTailSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowPrefixAndTailSpec.scala index 15ebb932e8..4c97da6bc4 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowPrefixAndTailSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowPrefixAndTailSpec.scala @@ -14,12 +14,11 @@ import scala.util.control.NoStackTrace class FlowPrefixAndTailSpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 2, - initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2, - dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = FlowMaterializer(settings) "PrefixAndTail" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowProduceToSubscriberSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowProduceToSubscriberSpec.scala index b0b0909c2f..57efdd25d9 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowProduceToSubscriberSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowProduceToSubscriberSpec.scala @@ -9,7 +9,11 @@ import akka.stream.testkit.StreamTestKit class FlowProduceToSubscriberSpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) + + implicit val materializer = FlowMaterializer(settings) "A Flow with toPublisher" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala index db16784bf0..863612d5c9 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala @@ -3,14 +3,15 @@ */ package akka.stream -import scala.collection.immutable -import scala.concurrent.duration._ +import akka.stream.scaladsl.Flow import akka.stream.testkit.{ AkkaSpec, ChainSetup, StreamTestKit } import akka.testkit._ import com.typesafe.config.ConfigFactory -import akka.stream.scaladsl.Flow import org.reactivestreams.Publisher +import scala.collection.immutable +import scala.concurrent.duration._ + object FlowSpec { class Fruit class Apple extends Fruit @@ -19,14 +20,11 @@ object FlowSpec { @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { import FlowSpec._ - import system.dispatcher - val settings = MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 16, - initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16, - dispatcher = "akka.test.stream-dispatcher") + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) + implicit val mat = FlowMaterializer(settings) val identity: Flow[Any] ⇒ Flow[Any] = in ⇒ in.map(e ⇒ e) @@ -36,7 +34,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece for ((name, op) ← List("identity" -> identity, "identity2" -> identity2); n ← List(1, 2, 4)) { s"requests initial elements from upstream ($name, $n)" in { - new ChainSetup(op, settings.copy(initialInputBufferSize = n)) { + new ChainSetup(op, settings.withInputBuffer(initialSize = n, maxSize = settings.maxInputBufferSize)) { upstream.expectRequest(upstreamSubscription, settings.initialInputBufferSize) } } @@ -89,7 +87,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "single subscriber cancels subscription while receiving data" in { - new ChainSetup(identity, settings.copy(initialInputBufferSize = 1)) { + new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = settings.maxInputBufferSize)) { downstreamSubscription.request(5) upstreamSubscription.expectRequest(1) upstreamSubscription.sendNext("test") @@ -109,7 +107,11 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "A Flow with multiple subscribers (FanOutBox)" must { "adapt speed to the currently slowest subscriber" in { - new ChainSetup(identity, settings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { + val tweakedSettings = settings + .withInputBuffer(initialSize = 1, maxSize = settings.maxInputBufferSize) + .withFanOutBuffer(initialSize = settings.initialFanOutBufferSize, maxSize = 1) + + new ChainSetup(identity, tweakedSettings) { val downstream2 = StreamTestKit.SubscriberProbe[Any]() publisher.subscribe(downstream2) val downstream2Subscription = downstream2.expectSubscription() @@ -135,7 +137,11 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "support slow subscriber with fan-out 2" in { - new ChainSetup(identity, settings.copy(initialInputBufferSize = 1, initialFanOutBufferSize = 2, maxFanOutBufferSize = 2)) { + val tweakedSettings = settings + .withInputBuffer(initialSize = 1, maxSize = settings.maxInputBufferSize) + .withFanOutBuffer(initialSize = 2, maxSize = 2) + + new ChainSetup(identity, tweakedSettings) { val downstream2 = StreamTestKit.SubscriberProbe[Any]() publisher.subscribe(downstream2) val downstream2Subscription = downstream2.expectSubscription() @@ -174,7 +180,11 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "incoming subscriber while elements were requested before" in { - new ChainSetup(identity, settings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { + val tweakedSettings = settings + .withInputBuffer(initialSize = 1, maxSize = settings.maxInputBufferSize) + .withFanOutBuffer(initialSize = settings.initialFanOutBufferSize, maxSize = 1) + + new ChainSetup(identity, tweakedSettings) { downstreamSubscription.request(5) upstream.expectRequest(upstreamSubscription, 1) upstreamSubscription.sendNext("a1") @@ -211,7 +221,11 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "blocking subscriber cancels subscription" in { - new ChainSetup(identity, settings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { + val tweakedSettings = settings + .withInputBuffer(initialSize = 1, maxSize = settings.maxInputBufferSize) + .withFanOutBuffer(initialSize = settings.initialFanOutBufferSize, maxSize = 1) + + new ChainSetup(identity, tweakedSettings) { val downstream2 = StreamTestKit.SubscriberProbe[Any]() publisher.subscribe(downstream2) val downstream2Subscription = downstream2.expectSubscription() @@ -246,7 +260,11 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "after initial upstream was completed future subscribers' onComplete should be called instead of onSubscribed" in { - new ChainSetup(identity, settings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { + val tweakedSettings = settings + .withInputBuffer(initialSize = 1, maxSize = settings.maxInputBufferSize) + .withFanOutBuffer(initialSize = settings.initialFanOutBufferSize, maxSize = 1) + + new ChainSetup(identity, tweakedSettings) { val downstream2 = StreamTestKit.SubscriberProbe[Any]() // don't link it just yet @@ -285,7 +303,11 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "after initial upstream reported an error future subscribers' onError should be called instead of onSubscribed" in { - new ChainSetup[Int, String](_.map(_ ⇒ throw TestException), settings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { + val tweakedSettings = settings + .withInputBuffer(initialSize = 1, maxSize = settings.maxInputBufferSize) + .withFanOutBuffer(initialSize = settings.initialFanOutBufferSize, maxSize = 1) + + new ChainSetup[Int, String](_.map(_ ⇒ throw TestException), tweakedSettings) { downstreamSubscription.request(1) upstreamSubscription.expectRequest(1) @@ -303,7 +325,11 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "when all subscriptions were cancelled future subscribers' onError should be called" in { - new ChainSetup(identity, settings.copy(initialInputBufferSize = 1)) { + val tweakedSettings = settings + .withInputBuffer(initialSize = 1, maxSize = settings.maxInputBufferSize) + .withFanOutBuffer(initialSize = settings.initialFanOutBufferSize, maxSize = settings.maxFanOutBufferSize) + + new ChainSetup(identity, tweakedSettings) { upstreamSubscription.expectRequest(1) downstreamSubscription.cancel() upstreamSubscription.expectCancellation() diff --git a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala index 8ea77c70da..a89b199edb 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala @@ -12,12 +12,11 @@ import akka.stream.scaladsl.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowSplitWhenSpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 2, - initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2, - dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = FlowMaterializer(settings) case class StreamPuppet(p: Publisher[Int]) { val probe = StreamTestKit.SubscriberProbe[Int]() diff --git a/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala index 51bc4504b5..2f6dacc8b1 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala @@ -3,22 +3,21 @@ */ package akka.stream +import akka.stream.actor.ActorSubscriberMessage.OnComplete +import akka.stream.actor.ActorSubscriberMessage.OnNext +import akka.stream.impl.RequestMore +import akka.stream.scaladsl.Flow import akka.stream.testkit.AkkaSpec import akka.stream.testkit.ScriptedTest -import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } -import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnComplete } -import akka.stream.impl.RequestMore import akka.stream.testkit.StreamTestKit -import akka.stream.scaladsl.Flow + +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class FlowTakeSpec extends AkkaSpec with ScriptedTest { - val settings = MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 16, - initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16, - dispatcher = "akka.test.stream-dispatcher") + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) implicit val materializer = FlowMaterializer(settings) diff --git a/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala index 578e0a80a1..688c000e60 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala @@ -11,8 +11,7 @@ import akka.stream.scaladsl.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowTakeWithinSpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings( - dispatcher = "akka.test.stream-dispatcher")) + implicit val materializer = FlowMaterializer() "A TakeWithin" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowTimerTransformerSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTimerTransformerSpec.scala index 781ea44482..19ff93c994 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTimerTransformerSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTimerTransformerSpec.scala @@ -12,7 +12,7 @@ import scala.util.control.NoStackTrace @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowTimerTransformerSpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) + implicit val materializer = FlowMaterializer() "A Flow with TimerTransformer operations" must { "produce scheduled ticks as expected" in { diff --git a/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala index 6c1a4c83cb..ec2097c081 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala @@ -3,23 +3,21 @@ */ package akka.stream -import akka.stream.testkit.AkkaSpec -import akka.stream.testkit.ScriptedTest -import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } -import akka.stream.testkit.StreamTestKit +import akka.stream.scaladsl.Flow +import akka.stream.testkit.{ AkkaSpec, ScriptedTest, StreamTestKit } + import scala.concurrent.Await import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } import scala.util.Failure -import akka.stream.scaladsl.Flow class FlowToFutureSpec extends AkkaSpec with ScriptedTest { - implicit val materializer = FlowMaterializer(MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 16, - initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16, - dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) + + implicit val materializer = FlowMaterializer(settings) "A Flow with toFuture" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala index f751afd90b..cd02e32a3e 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala @@ -31,12 +31,11 @@ object FlowTransformRecoverSpec { class FlowTransformRecoverSpec extends AkkaSpec { import FlowTransformRecoverSpec._ - implicit val materializer = FlowMaterializer(MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 2, - initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2, - dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = FlowMaterializer(settings) "A Flow with transformRecover operations" must { "produce one-to-one transformation as expected" in { diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala index 3cd81ac15b..9796f4e4e6 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala @@ -15,12 +15,11 @@ import scala.util.control.NoStackTrace @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { - implicit val materializer = FlowMaterializer(MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 2, - initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2, - dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = FlowMaterializer(settings) "A Flow with transform operations" must { "produce one-to-one transformation as expected" in { diff --git a/akka-stream/src/test/scala/akka/stream/ImplicitFlowMaterializerSpec.scala b/akka-stream/src/test/scala/akka/stream/ImplicitFlowMaterializerSpec.scala index d21955d19c..d281e134d0 100644 --- a/akka-stream/src/test/scala/akka/stream/ImplicitFlowMaterializerSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/ImplicitFlowMaterializerSpec.scala @@ -3,26 +3,26 @@ */ package akka.stream -import akka.actor.Actor -import akka.actor.Props +import akka.actor.{ Actor, Props } import akka.pattern.pipe -import akka.stream.scaladsl.ImplicitFlowMaterializer -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.{ Flow, ImplicitFlowMaterializer } import akka.stream.testkit.AkkaSpec import akka.testkit._ object ImplicitFlowMaterializerSpec { class SomeActor(input: List[String]) extends Actor with ImplicitFlowMaterializer { - override def flowMaterializerSettings = MaterializerSettings(dispatcher = "akka.test.stream-dispatcher") + override def flowMaterializerSettings = MaterializerSettings(context.system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) val flow = Flow(input).map(_.toUpperCase()).fold("")(_ + _) def receive = { case "run" ⇒ - // toFuture takes an implicit FlowMaterializer parameter, which is provided by ImplicitFlowMaterializer - val futureResult = flow.toFuture() + // toFuture takes an implicit FlowMaterializer parameter, which is provided by ImplicitFlowMaterializer import context.dispatcher + val futureResult = flow.toFuture() futureResult pipeTo sender() } } @@ -30,7 +30,7 @@ object ImplicitFlowMaterializerSpec { @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ImplicitFlowMaterializerSpec extends AkkaSpec with ImplicitSender { - import ImplicitFlowMaterializerSpec._ + import akka.stream.ImplicitFlowMaterializerSpec._ "An ImplicitFlowMaterializer" must { diff --git a/akka-stream/src/test/scala/akka/stream/TickPublisherSpec.scala b/akka-stream/src/test/scala/akka/stream/TickPublisherSpec.scala index b64971686a..4a2d20e64b 100644 --- a/akka-stream/src/test/scala/akka/stream/TickPublisherSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/TickPublisherSpec.scala @@ -12,8 +12,7 @@ import scala.util.control.NoStackTrace @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class TickPublisherSpec extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings( - dispatcher = "akka.test.stream-dispatcher")) + implicit val materializer = FlowMaterializer() "A Flow based on tick publisher" must { "produce ticks" in { diff --git a/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala b/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala index 934f172657..a3a7f4f891 100644 --- a/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala +++ b/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala @@ -11,12 +11,11 @@ import scala.util.control.NoStackTrace abstract class TwoStreamsSetup extends AkkaSpec { - implicit val materializer = FlowMaterializer(MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 2, - initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2, - dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = FlowMaterializer(settings) case class TE(message: String) extends RuntimeException(message) with NoStackTrace diff --git a/akka-stream/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala index f42bee3686..d9cf395f2a 100644 --- a/akka-stream/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala @@ -3,20 +3,15 @@ */ package akka.stream.actor +import akka.actor.{ ActorRef, PoisonPill, Props } +import akka.stream.{ MaterializerSettings, FlowMaterializer } +import akka.stream.scaladsl.Flow +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.testkit.TestEvent.Mute +import akka.testkit.{ EventFilter, ImplicitSender, TestProbe } + import scala.concurrent.duration._ import scala.util.control.NoStackTrace -import akka.actor.ActorRef -import akka.actor.PoisonPill -import akka.actor.Props -import akka.stream.FlowMaterializer -import akka.stream.MaterializerSettings -import akka.stream.scaladsl.Flow -import akka.stream.testkit.AkkaSpec -import akka.stream.testkit.StreamTestKit -import akka.testkit.EventFilter -import akka.testkit.ImplicitSender -import akka.testkit.TestEvent.Mute -import akka.testkit.TestProbe object ActorPublisherSpec { @@ -30,7 +25,6 @@ object ActorPublisherSpec { case object Complete class TestPublisher(probe: ActorRef) extends ActorPublisher[String] { - import ActorPublisher._ import ActorPublisherMessage._ def receive = { @@ -90,7 +84,6 @@ object ActorPublisherSpec { @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ActorPublisherSpec extends AkkaSpec with ImplicitSender { import ActorPublisherSpec._ - import ActorPublisher._ system.eventStream.publish(Mute(EventFilter[IllegalStateException]())) @@ -225,7 +218,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender { } "work together with Flow and ActorSubscriber" in { - implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) + implicit val materializer = FlowMaterializer() val probe = TestProbe() val snd = system.actorOf(senderProps) val rcv = system.actorOf(receiverProps(probe.ref)) diff --git a/akka-stream/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala b/akka-stream/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala index 515f08192d..83982c2d3f 100644 --- a/akka-stream/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala @@ -3,18 +3,14 @@ */ package akka.stream.actor -import scala.concurrent.duration._ -import akka.actor.ActorRef -import akka.actor.Props -import akka.stream.FlowMaterializer -import akka.stream.MaterializerSettings +import akka.actor.{ Actor, ActorRef, Props } +import akka.routing.{ ActorRefRoutee, RoundRobinRoutingLogic, Router } import akka.stream.scaladsl.Flow import akka.stream.testkit.AkkaSpec -import akka.actor.Actor -import akka.routing.ActorRefRoutee -import akka.routing.Router -import akka.routing.RoundRobinRoutingLogic +import akka.stream.{ FlowMaterializer, MaterializerSettings } import akka.testkit.ImplicitSender + +import scala.concurrent.duration._ import scala.util.control.NoStackTrace object ActorSubscriberSpec { @@ -96,10 +92,10 @@ object ActorSubscriberSpec { @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { - import ActorSubscriberSpec._ import ActorSubscriberMessage._ + import ActorSubscriberSpec._ - implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) + implicit val materializer = FlowMaterializer() "An ActorSubscriber" must { diff --git a/akka-stream/src/test/scala/akka/stream/extra/FlowTimedSpec.scala b/akka-stream/src/test/scala/akka/stream/extra/FlowTimedSpec.scala index fbfd613eec..5867c5763f 100644 --- a/akka-stream/src/test/scala/akka/stream/extra/FlowTimedSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/extra/FlowTimedSpec.scala @@ -3,24 +3,19 @@ */ package akka.stream.extra -import akka.stream.testkit.{ StreamTestKit, ScriptedTest, AkkaSpec } -import akka.stream.{ FlowMaterializer, MaterializerSettings } +import akka.stream.{ MaterializerSettings, FlowMaterializer } +import akka.stream.scaladsl.{ Duct, Flow } +import akka.stream.testkit.{ AkkaSpec, ScriptedTest, StreamTestKit } import akka.testkit.TestProbe -import akka.stream.scaladsl.{ Flow, Duct } -import org.reactivestreams.{ Subscriber, Publisher } +import org.reactivestreams.{ Publisher, Subscriber } class FlowTimedSpec extends AkkaSpec with ScriptedTest { import scala.concurrent.duration._ - val settings = MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 16, - initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16, - dispatcher = "akka.test.stream-dispatcher") - - lazy val metricsConfig = system.settings.config + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) implicit val materializer = FlowMaterializer(settings) diff --git a/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala b/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala index a4fe85a8d3..e9503ad402 100644 --- a/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala @@ -109,12 +109,9 @@ object TcpFlowSpec { class TcpFlowSpec extends AkkaSpec { import TcpFlowSpec._ - val settings = MaterializerSettings( - initialInputBufferSize = 4, - maximumInputBufferSize = 4, - initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2, - dispatcher = "akka.test.stream-dispatcher") + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 4, maxSize = 4) + .withFanOutBuffer(initialSize = 2, maxSize = 2) implicit val materializer = FlowMaterializer(settings) @@ -182,7 +179,7 @@ class TcpFlowSpec extends AkkaSpec { def connect(server: Server): (Processor[ByteString, ByteString], ServerConnection) = { val tcpProbe = TestProbe() - tcpProbe.send(IO(StreamTcp), StreamTcp.Connect(settings, server.address)) + tcpProbe.send(IO(StreamTcp), StreamTcp.Connect(server.address, materializerSettings = Some(settings))) val client = server.waitAccept() val outgoingConnection = tcpProbe.expectMsgType[StreamTcp.OutgoingTcpConnection] @@ -191,13 +188,13 @@ class TcpFlowSpec extends AkkaSpec { def connect(serverAddress: InetSocketAddress): StreamTcp.OutgoingTcpConnection = { val connectProbe = TestProbe() - connectProbe.send(IO(StreamTcp), StreamTcp.Connect(settings, serverAddress)) + connectProbe.send(IO(StreamTcp), StreamTcp.Connect(serverAddress, materializerSettings = Some(settings))) connectProbe.expectMsgType[StreamTcp.OutgoingTcpConnection] } def bind(serverAddress: InetSocketAddress = temporaryServerAddress): StreamTcp.TcpServerBinding = { val bindProbe = TestProbe() - bindProbe.send(IO(StreamTcp), StreamTcp.Bind(settings, serverAddress)) + bindProbe.send(IO(StreamTcp), StreamTcp.Bind(serverAddress, Some(settings))) bindProbe.expectMsgType[StreamTcp.TcpServerBinding] } diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala index e0da6678ab..ec261a4a6a 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala @@ -3,11 +3,11 @@ */ package akka.stream.scaladsl2 -import org.scalatest.{ Matchers, WordSpec } +import akka.stream.MaterializerSettings +import akka.stream.testkit.AkkaSpec + import scala.collection.immutable.Seq import scala.concurrent.Future -import akka.stream.testkit.AkkaSpec -import akka.stream.MaterializerSettings class FlowSpec extends AkkaSpec { @@ -16,7 +16,7 @@ class FlowSpec extends AkkaSpec { import scala.concurrent.ExecutionContext.Implicits.global val intFut = FutureIn(Future { 3 }) - implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) + implicit val materializer = FlowMaterializer(MaterializerSettings(system)) "ProcessorFlow" should { "go through all states" in { diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala index 6c32bda6bc..fb25ddb227 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala @@ -15,12 +15,11 @@ import akka.stream.MaterializerSettings @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { - implicit val materializer = FlowMaterializer(MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 2, - initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2, - dispatcher = "akka.test.stream-dispatcher")) + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = FlowMaterializer(settings) "A Flow with transform operations" must { "produce one-to-one transformation as expected" in {