From 224873696d5d9b52ca5d30a9a27df3ade6009fbe Mon Sep 17 00:00:00 2001 From: kerr Date: Mon, 5 Sep 2022 15:07:29 +0800 Subject: [PATCH] !str Drop the IODispatcher attribute of InputStreamSinkStage. (#31548) --- .../scala/akka/stream/io/InputStreamSinkSpec.scala | 12 ------------ .../src/main/scala/akka/stream/impl/Stages.scala | 2 +- .../scala/akka/stream/javadsl/StreamConverters.scala | 9 +++------ .../akka/stream/scaladsl/StreamConverters.scala | 3 +-- 4 files changed, 5 insertions(+), 21 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala index f6dd011f03..d8057ff853 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala @@ -16,9 +16,6 @@ import scala.util.control.NoStackTrace import akka.stream._ import akka.stream.Attributes.inputBuffer -import akka.stream.impl.PhasedFusingActorMaterializer -import akka.stream.impl.StreamSupervisor -import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.io.InputStreamSinkStage import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Source @@ -212,15 +209,6 @@ class InputStreamSinkSpec extends StreamSpec(UnboundedMailboxConfig) { e.getCause should ===(ex) } - "use dedicated default-blocking-io-dispatcher by default" in { - // use a separate materializer to ensure we know what child is our stream - implicit val materializer = Materializer(system) - TestSource.probe[ByteString].runWith(StreamConverters.asInputStream()) - materializer.asInstanceOf[PhasedFusingActorMaterializer].supervisor.tell(StreamSupervisor.GetChildren, testActor) - val ref = expectMsgType[Children].children.find(_.path.toString contains "inputStreamSink").get - assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher) - } - "work when more bytes pulled from InputStream than available" in { val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream()) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index d5a396a4da..f1f0cb75dd 100755 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -146,7 +146,7 @@ import akka.stream.Attributes._ val futureFlow = name("futureFlow") val lazySource = name("lazySource") val outputStreamSink = name("outputStreamSink") and IODispatcher - val inputStreamSink = name("inputStreamSink") and IODispatcher + val inputStreamSink = name("inputStreamSink") val fileSink = name("fileSink") and IODispatcher val fromJavaStream = name("fromJavaStream") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala index 5e2da630f4..6ef5abacfd 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala @@ -73,8 +73,7 @@ object StreamConverters { * * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or - * set it for a given Source by using [[akka.stream.ActorAttributes]]. + * You can configure the internal buffer size by using [[akka.stream.ActorAttributes]]. * * The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and * closing the [[InputStream]] will cancel this [[Sink]]. @@ -87,8 +86,7 @@ object StreamConverters { * * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or - * set it for a given Source by using [[akka.stream.ActorAttributes]]. + * You can configure the internal buffer size by using [[akka.stream.ActorAttributes]]. * * The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and * closing the [[InputStream]] will cancel this [[Sink]]. @@ -106,8 +104,7 @@ object StreamConverters { * * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or - * set it for a given Source by using [[akka.stream.ActorAttributes]]. + * You can configure the internal buffer size by using [[akka.stream.ActorAttributes]]. * * The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and * closing the [[InputStream]] will cancel this [[Sink]]. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala index 80fdf4e1eb..74f8eaa5de 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala @@ -86,8 +86,7 @@ object StreamConverters { * * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or - * set it for a given Source by using [[akka.stream.ActorAttributes]]. + * You can configure the internal buffer size by using [[akka.stream.ActorAttributes]]. * * The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and * closing the [[InputStream]] will cancel this [[Sink]].