!str #17355: Don't use case classes for settings
This commit is contained in:
parent
035037dd24
commit
e0d078c676
3 changed files with 51 additions and 19 deletions
|
|
@ -27,7 +27,8 @@ abstract class AkkaPublisherVerification[T](val env: TestEnvironment, publisherS
|
||||||
|
|
||||||
def this() = this(false)
|
def this() = this(false)
|
||||||
|
|
||||||
implicit lazy val materializer = ActorFlowMaterializer(ActorFlowMaterializerSettings(system).copy(maxInputBufferSize = 512))(system)
|
implicit lazy val materializer = ActorFlowMaterializer(
|
||||||
|
ActorFlowMaterializerSettings(system).withInputBuffer(initialSize = 512, maxSize = 512))(system)
|
||||||
|
|
||||||
override def createFailedPublisher(): Publisher[T] =
|
override def createFailedPublisher(): Publisher[T] =
|
||||||
TestPublisher.error(new Exception("Unable to serve subscribers right now!"))
|
TestPublisher.error(new Exception("Unable to serve subscribers right now!"))
|
||||||
|
|
|
||||||
|
|
@ -315,7 +315,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
|
||||||
|
|
||||||
"A Flow with multiple subscribers (FanOutBox)" must {
|
"A Flow with multiple subscribers (FanOutBox)" must {
|
||||||
"adapt speed to the currently slowest subscriber" in {
|
"adapt speed to the currently slowest subscriber" in {
|
||||||
new ChainSetup(identity, settings.copy(initialInputBufferSize = 1),
|
new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1),
|
||||||
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) {
|
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) {
|
||||||
val downstream2 = TestSubscriber.manualProbe[Any]()
|
val downstream2 = TestSubscriber.manualProbe[Any]()
|
||||||
publisher.subscribe(downstream2)
|
publisher.subscribe(downstream2)
|
||||||
|
|
@ -342,7 +342,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
|
||||||
}
|
}
|
||||||
|
|
||||||
"support slow subscriber with fan-out 2" in {
|
"support slow subscriber with fan-out 2" in {
|
||||||
new ChainSetup(identity, settings.copy(initialInputBufferSize = 1),
|
new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1),
|
||||||
toFanoutPublisher(initialBufferSize = 2, maximumBufferSize = 2)) {
|
toFanoutPublisher(initialBufferSize = 2, maximumBufferSize = 2)) {
|
||||||
val downstream2 = TestSubscriber.manualProbe[Any]()
|
val downstream2 = TestSubscriber.manualProbe[Any]()
|
||||||
publisher.subscribe(downstream2)
|
publisher.subscribe(downstream2)
|
||||||
|
|
@ -382,7 +382,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
|
||||||
}
|
}
|
||||||
|
|
||||||
"support incoming subscriber while elements were requested before" in {
|
"support incoming subscriber while elements were requested before" in {
|
||||||
new ChainSetup(identity, settings.copy(initialInputBufferSize = 1),
|
new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1),
|
||||||
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) {
|
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) {
|
||||||
downstreamSubscription.request(5)
|
downstreamSubscription.request(5)
|
||||||
upstream.expectRequest(upstreamSubscription, 1)
|
upstream.expectRequest(upstreamSubscription, 1)
|
||||||
|
|
@ -420,7 +420,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
|
||||||
}
|
}
|
||||||
|
|
||||||
"be unblocked when blocking subscriber cancels subscription" in {
|
"be unblocked when blocking subscriber cancels subscription" in {
|
||||||
new ChainSetup(identity, settings.copy(initialInputBufferSize = 1),
|
new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1),
|
||||||
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) {
|
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) {
|
||||||
val downstream2 = TestSubscriber.manualProbe[Any]()
|
val downstream2 = TestSubscriber.manualProbe[Any]()
|
||||||
publisher.subscribe(downstream2)
|
publisher.subscribe(downstream2)
|
||||||
|
|
@ -457,7 +457,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
|
||||||
}
|
}
|
||||||
|
|
||||||
"call future subscribers' onError after onSubscribe if initial upstream was completed" in {
|
"call future subscribers' onError after onSubscribe if initial upstream was completed" in {
|
||||||
new ChainSetup(identity, settings.copy(initialInputBufferSize = 1),
|
new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1),
|
||||||
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) {
|
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) {
|
||||||
val downstream2 = TestSubscriber.manualProbe[Any]()
|
val downstream2 = TestSubscriber.manualProbe[Any]()
|
||||||
// don't link it just yet
|
// don't link it just yet
|
||||||
|
|
@ -496,7 +496,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
|
||||||
}
|
}
|
||||||
|
|
||||||
"call future subscribers' onError should be called instead of onSubscribed after initial upstream reported an error" in {
|
"call future subscribers' onError should be called instead of onSubscribed after initial upstream reported an error" in {
|
||||||
new ChainSetup[Int, String](_.map(_ ⇒ throw TestException), settings.copy(initialInputBufferSize = 1),
|
new ChainSetup[Int, String](_.map(_ ⇒ throw TestException), settings.withInputBuffer(initialSize = 1, maxSize = 1),
|
||||||
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) {
|
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) {
|
||||||
downstreamSubscription.request(1)
|
downstreamSubscription.request(1)
|
||||||
upstreamSubscription.expectRequest(1)
|
upstreamSubscription.expectRequest(1)
|
||||||
|
|
@ -513,7 +513,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
|
||||||
}
|
}
|
||||||
|
|
||||||
"call future subscribers' onError when all subscriptions were cancelled" in {
|
"call future subscribers' onError when all subscriptions were cancelled" in {
|
||||||
new ChainSetup(identity, settings.copy(initialInputBufferSize = 1),
|
new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1),
|
||||||
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 16)) {
|
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 16)) {
|
||||||
upstreamSubscription.expectRequest(1)
|
upstreamSubscription.expectRequest(1)
|
||||||
downstreamSubscription.cancel()
|
downstreamSubscription.cancel()
|
||||||
|
|
@ -529,7 +529,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
|
||||||
|
|
||||||
"A broken Flow" must {
|
"A broken Flow" must {
|
||||||
"cancel upstream and call onError on current and future downstream subscribers if an internal error occurs" in {
|
"cancel upstream and call onError on current and future downstream subscribers if an internal error occurs" in {
|
||||||
new ChainSetup(faultyFlow, settings.copy(initialInputBufferSize = 1), toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 16)) {
|
new ChainSetup(faultyFlow, settings.withInputBuffer(initialSize = 1, maxSize = 1), toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 16)) {
|
||||||
|
|
||||||
def checkError(sprobe: TestSubscriber.ManualProbe[Any]): Unit = {
|
def checkError(sprobe: TestSubscriber.ManualProbe[Any]): Unit = {
|
||||||
val error = sprobe.expectError()
|
val error = sprobe.expectError()
|
||||||
|
|
|
||||||
|
|
@ -164,6 +164,20 @@ abstract class ActorFlowMaterializer extends FlowMaterializer {
|
||||||
class MaterializationException(msg: String, cause: Throwable = null) extends RuntimeException(msg, cause)
|
class MaterializationException(msg: String, cause: Throwable = null) extends RuntimeException(msg, cause)
|
||||||
|
|
||||||
object ActorFlowMaterializerSettings {
|
object ActorFlowMaterializerSettings {
|
||||||
|
|
||||||
|
def apply(
|
||||||
|
initialInputBufferSize: Int,
|
||||||
|
maxInputBufferSize: Int,
|
||||||
|
dispatcher: String,
|
||||||
|
supervisionDecider: Supervision.Decider,
|
||||||
|
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
|
||||||
|
debugLogging: Boolean,
|
||||||
|
outputBurstLimit: Int,
|
||||||
|
optimizations: Optimizations) =
|
||||||
|
new ActorFlowMaterializerSettings(
|
||||||
|
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
|
||||||
|
outputBurstLimit, optimizations)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create [[ActorFlowMaterializerSettings]].
|
* Create [[ActorFlowMaterializerSettings]].
|
||||||
*
|
*
|
||||||
|
|
@ -215,21 +229,34 @@ object ActorFlowMaterializerSettings {
|
||||||
*
|
*
|
||||||
* This will likely be replaced in the future by auto-tuning these values at runtime.
|
* This will likely be replaced in the future by auto-tuning these values at runtime.
|
||||||
*/
|
*/
|
||||||
final case class ActorFlowMaterializerSettings(
|
final class ActorFlowMaterializerSettings(
|
||||||
initialInputBufferSize: Int,
|
val initialInputBufferSize: Int,
|
||||||
maxInputBufferSize: Int,
|
val maxInputBufferSize: Int,
|
||||||
dispatcher: String,
|
val dispatcher: String,
|
||||||
supervisionDecider: Supervision.Decider,
|
val supervisionDecider: Supervision.Decider,
|
||||||
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
|
val subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
|
||||||
debugLogging: Boolean,
|
val debugLogging: Boolean,
|
||||||
outputBurstLimit: Int,
|
val outputBurstLimit: Int,
|
||||||
optimizations: Optimizations) {
|
val optimizations: Optimizations) {
|
||||||
|
|
||||||
require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")
|
require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")
|
||||||
|
|
||||||
requirePowerOfTwo(maxInputBufferSize, "maxInputBufferSize")
|
requirePowerOfTwo(maxInputBufferSize, "maxInputBufferSize")
|
||||||
require(initialInputBufferSize <= maxInputBufferSize, s"initialInputBufferSize($initialInputBufferSize) must be <= maxInputBufferSize($maxInputBufferSize)")
|
require(initialInputBufferSize <= maxInputBufferSize, s"initialInputBufferSize($initialInputBufferSize) must be <= maxInputBufferSize($maxInputBufferSize)")
|
||||||
|
|
||||||
|
private def copy(
|
||||||
|
initialInputBufferSize: Int = this.initialInputBufferSize,
|
||||||
|
maxInputBufferSize: Int = this.maxInputBufferSize,
|
||||||
|
dispatcher: String = this.dispatcher,
|
||||||
|
supervisionDecider: Supervision.Decider = this.supervisionDecider,
|
||||||
|
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings = this.subscriptionTimeoutSettings,
|
||||||
|
debugLogging: Boolean = this.debugLogging,
|
||||||
|
outputBurstLimit: Int = this.outputBurstLimit,
|
||||||
|
optimizations: Optimizations = this.optimizations) =
|
||||||
|
new ActorFlowMaterializerSettings(
|
||||||
|
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
|
||||||
|
outputBurstLimit, optimizations)
|
||||||
|
|
||||||
def withInputBuffer(initialSize: Int, maxSize: Int): ActorFlowMaterializerSettings =
|
def withInputBuffer(initialSize: Int, maxSize: Int): ActorFlowMaterializerSettings =
|
||||||
copy(initialInputBufferSize = initialSize, maxInputBufferSize = maxSize)
|
copy(initialInputBufferSize = initialSize, maxInputBufferSize = maxSize)
|
||||||
|
|
||||||
|
|
@ -274,6 +301,9 @@ final case class ActorFlowMaterializerSettings(
|
||||||
object StreamSubscriptionTimeoutSettings {
|
object StreamSubscriptionTimeoutSettings {
|
||||||
import akka.stream.StreamSubscriptionTimeoutTerminationMode._
|
import akka.stream.StreamSubscriptionTimeoutTerminationMode._
|
||||||
|
|
||||||
|
def apply(mode: StreamSubscriptionTimeoutTerminationMode, timeout: FiniteDuration): StreamSubscriptionTimeoutSettings =
|
||||||
|
new StreamSubscriptionTimeoutSettings(mode, timeout)
|
||||||
|
|
||||||
/** Java API */
|
/** Java API */
|
||||||
def create(config: Config): StreamSubscriptionTimeoutSettings =
|
def create(config: Config): StreamSubscriptionTimeoutSettings =
|
||||||
apply(config)
|
apply(config)
|
||||||
|
|
@ -289,7 +319,8 @@ object StreamSubscriptionTimeoutSettings {
|
||||||
timeout = c.getDuration("timeout", TimeUnit.MILLISECONDS).millis)
|
timeout = c.getDuration("timeout", TimeUnit.MILLISECONDS).millis)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
final case class StreamSubscriptionTimeoutSettings(mode: StreamSubscriptionTimeoutTerminationMode, timeout: FiniteDuration)
|
|
||||||
|
final class StreamSubscriptionTimeoutSettings(val mode: StreamSubscriptionTimeoutTerminationMode, val timeout: FiniteDuration)
|
||||||
|
|
||||||
sealed abstract class StreamSubscriptionTimeoutTerminationMode
|
sealed abstract class StreamSubscriptionTimeoutTerminationMode
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue