diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 38c133abaa..7359bcab32 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -6,12 +6,12 @@ package akka.stream.scaladsl import akka.Done import akka.stream.testkit.Utils.TE +import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped import akka.testkit.DefaultTimeout -import scala.annotation.nowarn -import org.scalatest.time.Millis -import org.scalatest.time.Span +import org.scalatest.time.{ Millis, Span } -import scala.concurrent.Future +import scala.annotation.nowarn +import scala.concurrent.{ Await, Future } //#imports import akka.stream._ @@ -22,6 +22,7 @@ import akka.stream.testkit.scaladsl.TestSink import akka.testkit.EventFilter import scala.collection.immutable +import scala.concurrent.duration._ @nowarn // tests assigning to typed val class SourceSpec extends StreamSpec with DefaultTimeout { @@ -436,4 +437,17 @@ class SourceSpec extends StreamSpec with DefaultTimeout { a[RuntimeException] shouldBe thrownBy(matValPoweredSource.preMaterialize()) } } + + "Source.futureSource" must { + + "not cancel substream twice" in assertAllStagesStopped { + val result = Source + .futureSource(akka.pattern.after(2.seconds)(Future.successful(Source(1 to 2)))) + .merge(Source(3 to 4)) + .take(1) + .runWith(Sink.ignore) + + Await.result(result, 4.seconds) shouldBe Done + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index e901ba423d..2a5a49b2dc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -340,9 +340,6 @@ import akka.stream.stage._ super.onDownstreamFinish(cause) } - override def postStop(): Unit = - if (!sinkIn.isClosed) sinkIn.cancel() - def onFutureSourceCompleted(result: Try[Graph[SourceShape[T], M]]): Unit = { result .map { graph =>