!str Drop the IODispatcher attribute of InputStreamSinkStage. (#31548)

This commit is contained in:
kerr 2022-09-05 15:07:29 +08:00 committed by GitHub
parent 601a2626e9
commit 224873696d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 5 additions and 21 deletions

View file

@ -16,9 +16,6 @@ import scala.util.control.NoStackTrace
import akka.stream._
import akka.stream.Attributes.inputBuffer
import akka.stream.impl.PhasedFusingActorMaterializer
import akka.stream.impl.StreamSupervisor
import akka.stream.impl.StreamSupervisor.Children
import akka.stream.impl.io.InputStreamSinkStage
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Source
@ -212,15 +209,6 @@ class InputStreamSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
e.getCause should ===(ex)
}
"use dedicated default-blocking-io-dispatcher by default" in {
// use a separate materializer to ensure we know what child is our stream
implicit val materializer = Materializer(system)
TestSource.probe[ByteString].runWith(StreamConverters.asInputStream())
materializer.asInstanceOf[PhasedFusingActorMaterializer].supervisor.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "inputStreamSink").get
assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
}
"work when more bytes pulled from InputStream than available" in {
val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream())

View file

@ -146,7 +146,7 @@ import akka.stream.Attributes._
val futureFlow = name("futureFlow")
val lazySource = name("lazySource")
val outputStreamSink = name("outputStreamSink") and IODispatcher
val inputStreamSink = name("inputStreamSink") and IODispatcher
val inputStreamSink = name("inputStreamSink")
val fileSink = name("fileSink") and IODispatcher
val fromJavaStream = name("fromJavaStream")

View file

@ -73,8 +73,7 @@ object StreamConverters {
*
* This Sink is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
* You can configure the internal buffer size by using [[akka.stream.ActorAttributes]].
*
* The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and
* closing the [[InputStream]] will cancel this [[Sink]].
@ -87,8 +86,7 @@ object StreamConverters {
*
* This Sink is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
* You can configure the internal buffer size by using [[akka.stream.ActorAttributes]].
*
* The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and
* closing the [[InputStream]] will cancel this [[Sink]].
@ -106,8 +104,7 @@ object StreamConverters {
*
* This Sink is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
* You can configure the internal buffer size by using [[akka.stream.ActorAttributes]].
*
* The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and
* closing the [[InputStream]] will cancel this [[Sink]].

View file

@ -86,8 +86,7 @@ object StreamConverters {
*
* This Sink is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
* You can configure the internal buffer size by using [[akka.stream.ActorAttributes]].
*
* The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and
* closing the [[InputStream]] will cancel this [[Sink]].