diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala index d2f9f2fa80..6cb4cac268 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala @@ -27,7 +27,8 @@ abstract class AkkaPublisherVerification[T](val env: TestEnvironment, publisherS def this() = this(false) - implicit lazy val materializer = ActorFlowMaterializer(ActorFlowMaterializerSettings(system).copy(maxInputBufferSize = 512))(system) + implicit lazy val materializer = ActorFlowMaterializer( + ActorFlowMaterializerSettings(system).withInputBuffer(initialSize = 512, maxSize = 512))(system) override def createFailedPublisher(): Publisher[T] = TestPublisher.error(new Exception("Unable to serve subscribers right now!")) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 0f1b05744a..43ab2cbe62 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -315,7 +315,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "A Flow with multiple subscribers (FanOutBox)" must { "adapt speed to the currently slowest subscriber" in { - new ChainSetup(identity, settings.copy(initialInputBufferSize = 1), + new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1), toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) { val downstream2 = TestSubscriber.manualProbe[Any]() publisher.subscribe(downstream2) @@ -342,7 +342,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "support slow subscriber with fan-out 2" in { - new ChainSetup(identity, settings.copy(initialInputBufferSize = 1), + new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1), toFanoutPublisher(initialBufferSize = 2, maximumBufferSize = 2)) { val downstream2 = TestSubscriber.manualProbe[Any]() publisher.subscribe(downstream2) @@ -382,7 +382,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "support incoming subscriber while elements were requested before" in { - new ChainSetup(identity, settings.copy(initialInputBufferSize = 1), + new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1), toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) { downstreamSubscription.request(5) upstream.expectRequest(upstreamSubscription, 1) @@ -420,7 +420,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "be unblocked when blocking subscriber cancels subscription" in { - new ChainSetup(identity, settings.copy(initialInputBufferSize = 1), + new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1), toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) { val downstream2 = TestSubscriber.manualProbe[Any]() publisher.subscribe(downstream2) @@ -457,7 +457,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "call future subscribers' onError after onSubscribe if initial upstream was completed" in { - new ChainSetup(identity, settings.copy(initialInputBufferSize = 1), + new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1), toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) { val downstream2 = TestSubscriber.manualProbe[Any]() // don't link it just yet @@ -496,7 +496,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "call future subscribers' onError should be called instead of onSubscribed after initial upstream reported an error" in { - new ChainSetup[Int, String](_.map(_ ⇒ throw TestException), settings.copy(initialInputBufferSize = 1), + new ChainSetup[Int, String](_.map(_ ⇒ throw TestException), settings.withInputBuffer(initialSize = 1, maxSize = 1), toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) { downstreamSubscription.request(1) upstreamSubscription.expectRequest(1) @@ -513,7 +513,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "call future subscribers' onError when all subscriptions were cancelled" in { - new ChainSetup(identity, settings.copy(initialInputBufferSize = 1), + new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1), toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 16)) { upstreamSubscription.expectRequest(1) downstreamSubscription.cancel() @@ -529,7 +529,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "A broken Flow" must { "cancel upstream and call onError on current and future downstream subscribers if an internal error occurs" in { - new ChainSetup(faultyFlow, settings.copy(initialInputBufferSize = 1), toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 16)) { + new ChainSetup(faultyFlow, settings.withInputBuffer(initialSize = 1, maxSize = 1), toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 16)) { def checkError(sprobe: TestSubscriber.ManualProbe[Any]): Unit = { val error = sprobe.expectError() diff --git a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala index 5c113d9fb8..472c8ebb43 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala @@ -164,6 +164,20 @@ abstract class ActorFlowMaterializer extends FlowMaterializer { class MaterializationException(msg: String, cause: Throwable = null) extends RuntimeException(msg, cause) object ActorFlowMaterializerSettings { + + def apply( + initialInputBufferSize: Int, + maxInputBufferSize: Int, + dispatcher: String, + supervisionDecider: Supervision.Decider, + subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, + debugLogging: Boolean, + outputBurstLimit: Int, + optimizations: Optimizations) = + new ActorFlowMaterializerSettings( + initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, + outputBurstLimit, optimizations) + /** * Create [[ActorFlowMaterializerSettings]]. * @@ -215,21 +229,34 @@ object ActorFlowMaterializerSettings { * * This will likely be replaced in the future by auto-tuning these values at runtime. */ -final case class ActorFlowMaterializerSettings( - initialInputBufferSize: Int, - maxInputBufferSize: Int, - dispatcher: String, - supervisionDecider: Supervision.Decider, - subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, - debugLogging: Boolean, - outputBurstLimit: Int, - optimizations: Optimizations) { +final class ActorFlowMaterializerSettings( + val initialInputBufferSize: Int, + val maxInputBufferSize: Int, + val dispatcher: String, + val supervisionDecider: Supervision.Decider, + val subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, + val debugLogging: Boolean, + val outputBurstLimit: Int, + val optimizations: Optimizations) { require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") requirePowerOfTwo(maxInputBufferSize, "maxInputBufferSize") require(initialInputBufferSize <= maxInputBufferSize, s"initialInputBufferSize($initialInputBufferSize) must be <= maxInputBufferSize($maxInputBufferSize)") + private def copy( + initialInputBufferSize: Int = this.initialInputBufferSize, + maxInputBufferSize: Int = this.maxInputBufferSize, + dispatcher: String = this.dispatcher, + supervisionDecider: Supervision.Decider = this.supervisionDecider, + subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings = this.subscriptionTimeoutSettings, + debugLogging: Boolean = this.debugLogging, + outputBurstLimit: Int = this.outputBurstLimit, + optimizations: Optimizations = this.optimizations) = + new ActorFlowMaterializerSettings( + initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, + outputBurstLimit, optimizations) + def withInputBuffer(initialSize: Int, maxSize: Int): ActorFlowMaterializerSettings = copy(initialInputBufferSize = initialSize, maxInputBufferSize = maxSize) @@ -274,6 +301,9 @@ final case class ActorFlowMaterializerSettings( object StreamSubscriptionTimeoutSettings { import akka.stream.StreamSubscriptionTimeoutTerminationMode._ + def apply(mode: StreamSubscriptionTimeoutTerminationMode, timeout: FiniteDuration): StreamSubscriptionTimeoutSettings = + new StreamSubscriptionTimeoutSettings(mode, timeout) + /** Java API */ def create(config: Config): StreamSubscriptionTimeoutSettings = apply(config) @@ -289,7 +319,8 @@ object StreamSubscriptionTimeoutSettings { timeout = c.getDuration("timeout", TimeUnit.MILLISECONDS).millis) } } -final case class StreamSubscriptionTimeoutSettings(mode: StreamSubscriptionTimeoutTerminationMode, timeout: FiniteDuration) + +final class StreamSubscriptionTimeoutSettings(val mode: StreamSubscriptionTimeoutTerminationMode, val timeout: FiniteDuration) sealed abstract class StreamSubscriptionTimeoutTerminationMode