Removes unnecessary postStop #30071 (#30187)

* Removes unnecessary postStop #30071
* Adds test for future source with substream #30071
This commit is contained in:
Nicolas Vollmar 2021-04-20 10:36:30 +02:00 committed by GitHub
parent 46b63e369e
commit c04f6ca5a1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 7 deletions

View file

@ -6,12 +6,12 @@ package akka.stream.scaladsl
import akka.Done
import akka.stream.testkit.Utils.TE
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import akka.testkit.DefaultTimeout
import scala.annotation.nowarn
import org.scalatest.time.Millis
import org.scalatest.time.Span
import org.scalatest.time.{ Millis, Span }
import scala.concurrent.Future
import scala.annotation.nowarn
import scala.concurrent.{ Await, Future }
//#imports
import akka.stream._
@ -22,6 +22,7 @@ import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.EventFilter
import scala.collection.immutable
import scala.concurrent.duration._
@nowarn // tests assigning to typed val
class SourceSpec extends StreamSpec with DefaultTimeout {
@ -436,4 +437,17 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
a[RuntimeException] shouldBe thrownBy(matValPoweredSource.preMaterialize())
}
}
"Source.futureSource" must {
"not cancel substream twice" in assertAllStagesStopped {
val result = Source
.futureSource(akka.pattern.after(2.seconds)(Future.successful(Source(1 to 2))))
.merge(Source(3 to 4))
.take(1)
.runWith(Sink.ignore)
Await.result(result, 4.seconds) shouldBe Done
}
}
}

View file

@ -340,9 +340,6 @@ import akka.stream.stage._
super.onDownstreamFinish(cause)
}
override def postStop(): Unit =
if (!sinkIn.isClosed) sinkIn.cancel()
def onFutureSourceCompleted(result: Try[Graph[SourceShape[T], M]]): Unit = {
result
.map { graph =>