chore: Add tests for not invoking onComplete twice for statefulMap operator.

This commit is contained in:
He-Pin 2023-12-25 11:33:47 +08:00 committed by kerr
parent 413e79f5d3
commit c78e2d7610

View file

@ -13,6 +13,15 @@
package org.apache.pekko.stream.scaladsl
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.nowarn
import scala.concurrent.Await
import scala.concurrent.Promise
import scala.concurrent.duration.DurationInt
import scala.util.Success
import scala.util.control.NoStackTrace
import org.apache.pekko
import pekko.Done
import pekko.stream.AbruptStageTerminationException
@ -21,16 +30,10 @@ import pekko.stream.ActorMaterializer
import pekko.stream.Supervision
import pekko.stream.testkit.StreamSpec
import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.Utils.TE
import pekko.stream.testkit.scaladsl.TestSink
import pekko.stream.testkit.scaladsl.TestSource
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.nowarn
import scala.concurrent.Await
import scala.concurrent.Promise
import scala.concurrent.duration.DurationInt
import scala.util.Success
import scala.util.control.NoStackTrace
import pekko.testkit.EventFilter
class FlowStatefulMapSpec extends StreamSpec {
@ -371,5 +374,64 @@ class FlowStatefulMapSpec extends StreamSpec {
.expectComplete()
gate.ensure()
}
"will not call `onComplete` twice if `f` fail" in {
val closedCounter = new AtomicInteger(0)
val probe = Source
.repeat(1)
.statefulMap(() => "opening resource")(
(_, _) => throw TE("failing read"),
_ => {
closedCounter.incrementAndGet()
None
})
.runWith(TestSink.probe[String])
probe.request(1)
probe.expectError(TE("failing read"))
closedCounter.get() should ===(1)
}
"will not call `onComplete` twice if both `f` and `onComplete` fail" in {
val closedCounter = new AtomicInteger(0)
val probe = Source
.repeat(1)
.statefulMap(() => "opening resource")((_, _) => throw TE("failing read"),
_ => {
if (closedCounter.incrementAndGet() == 1) {
throw TE("boom")
}
None
})
.runWith(TestSink.probe[Int])
EventFilter[TE](occurrences = 1).intercept {
probe.request(1)
probe.expectError(TE("boom"))
}
closedCounter.get() should ===(1)
}
"will not call `onComplete` twice if `onComplete` fail on upstream complete" in {
val closedCounter = new AtomicInteger(0)
val (pub, sub) = TestSource[Int]()
.statefulMap(() => "opening resource")((state, value) => (state, value),
_ => {
closedCounter.incrementAndGet()
throw TE("boom")
})
.toMat(TestSink.probe[Int])(Keep.both)
.run()
EventFilter[TE](occurrences = 1).intercept {
sub.request(1)
pub.sendNext(1)
sub.expectNext(1)
sub.request(1)
pub.sendComplete()
sub.expectError(TE("boom"))
}
closedCounter.get() shouldBe 1
}
}
}