MaybeSource rewritten as GraphStage #22789

This commit is contained in:
Johan Andrén 2017-07-05 12:55:28 +01:00 committed by GitHub
parent 8a095ed23d
commit 2b2923f1b6
10 changed files with 272 additions and 145 deletions

View file

@ -3,7 +3,7 @@ package akka.stream
import akka.actor.{ ActorSystem, Props }
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.testkit.StreamSpec
import akka.stream.testkit.{ StreamSpec, TestPublisher }
import akka.testkit.{ ImplicitSender, TestActor }
import scala.concurrent.Await
@ -24,7 +24,7 @@ class ActorMaterializerSpec extends StreamSpec with ImplicitSender {
"properly shut down actors associated with it" in {
val m = ActorMaterializer.create(system)
val f = Source.maybe[Int].runFold(0)(_ + _)(m)
val f = Source.fromPublisher(TestPublisher.probe[Int]()(system)).runFold(0)(_ + _)(m)
m.shutdown()

View file

@ -0,0 +1,35 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.stream.ActorMaterializer
import akka.stream.testkit.{ StreamSpec, TestSubscriber, Utils }
import akka.testkit.DefaultTimeout
import org.scalatest.time.{ Millis, Span }
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.control.NoStackTrace
class FailedSourceSpec extends StreamSpec with DefaultTimeout {
implicit val materializer = ActorMaterializer()
"The Failed Source" must {
"emit error immediately" in {
val ex = new RuntimeException with NoStackTrace
val p = Source.failed(ex).runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[Int]()
p.subscribe(c)
c.expectSubscriptionAndError(ex)
// reject additional subscriber
val c2 = TestSubscriber.manualProbe[Int]()
p.subscribe(c2)
c2.expectSubscriptionAndError()
}
}
}

View file

@ -0,0 +1,101 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.stream.{ AbruptStageTerminationException, ActorMaterializer }
import akka.stream.testkit.{ StreamSpec, TestSubscriber, Utils }
import akka.testkit.DefaultTimeout
import org.scalatest.time.{ Millis, Span }
import scala.concurrent.duration._
import scala.concurrent.Await
import scala.util.Failure
import scala.util.control.NoStackTrace
class MaybeSourceSpec extends StreamSpec with DefaultTimeout {
implicit val materializer = ActorMaterializer()
"The Maybe Source" must {
"complete materialized future with None when stream cancels" in Utils.assertAllStagesStopped {
val neverSource = Source.maybe[Int]
val pubSink = Sink.asPublisher[Int](false)
val (f, neverPub) = neverSource.toMat(pubSink)(Keep.both).run()
val c = TestSubscriber.manualProbe[Int]()
neverPub.subscribe(c)
val subs = c.expectSubscription()
subs.request(1000)
c.expectNoMsg(300.millis)
subs.cancel()
f.future.futureValue shouldEqual None
}
"allow external triggering of empty completion" in Utils.assertAllStagesStopped {
val neverSource = Source.maybe[Int].filter(_ false)
val counterSink = Sink.fold[Int, Int](0) { (acc, _) acc + 1 }
val (neverPromise, counterFuture) = neverSource.toMat(counterSink)(Keep.both).run()
// external cancellation
neverPromise.trySuccess(None) shouldEqual true
counterFuture.futureValue shouldEqual 0
}
"allow external triggering of empty completion when there was no demand" in Utils.assertAllStagesStopped {
val probe = TestSubscriber.probe[Int]()
val promise = Source.maybe[Int].to(Sink.fromSubscriber(probe)).run()
// external cancellation
probe.ensureSubscription()
promise.trySuccess(None) shouldEqual true
probe.expectComplete()
}
"allow external triggering of non-empty completion" in Utils.assertAllStagesStopped {
val neverSource = Source.maybe[Int]
val counterSink = Sink.head[Int]
val (neverPromise, counterFuture) = neverSource.toMat(counterSink)(Keep.both).run()
// external cancellation
neverPromise.trySuccess(Some(6)) shouldEqual true
counterFuture.futureValue shouldEqual 6
}
"allow external triggering of onError" in Utils.assertAllStagesStopped {
val neverSource = Source.maybe[Int]
val counterSink = Sink.fold[Int, Int](0) { (acc, _) acc + 1 }
val (neverPromise, counterFuture) = neverSource.toMat(counterSink)(Keep.both).run()
// external cancellation
neverPromise.tryFailure(new Exception("Boom") with NoStackTrace) shouldEqual true
counterFuture.failed.futureValue.getMessage should include("Boom")
}
"complete materialized future when materializer is shutdown" in Utils.assertAllStagesStopped {
val mat = ActorMaterializer()
val neverSource = Source.maybe[Int]
val pubSink = Sink.asPublisher[Int](false)
val (f, neverPub) = neverSource.toMat(pubSink)(Keep.both).run()(mat)
val c = TestSubscriber.manualProbe[Int]()
neverPub.subscribe(c)
val subs = c.expectSubscription()
mat.shutdown()
f.future.failed.futureValue shouldBe an[AbruptStageTerminationException]
}
}
}

View file

@ -64,79 +64,6 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
}
}
"Failed Source" must {
"emit error immediately" in {
val ex = new RuntimeException with NoStackTrace
val p = Source.failed(ex).runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[Int]()
p.subscribe(c)
c.expectSubscriptionAndError(ex)
// reject additional subscriber
val c2 = TestSubscriber.manualProbe[Int]()
p.subscribe(c2)
c2.expectSubscriptionAndError()
}
}
"Maybe Source" must {
"complete materialized future with None when stream cancels" in Utils.assertAllStagesStopped {
val neverSource = Source.maybe[Int]
val pubSink = Sink.asPublisher[Int](false)
val (f, neverPub) = neverSource.toMat(pubSink)(Keep.both).run()
val c = TestSubscriber.manualProbe[Int]()
neverPub.subscribe(c)
val subs = c.expectSubscription()
subs.request(1000)
c.expectNoMsg(300.millis)
subs.cancel()
Await.result(f.future, 3.seconds) shouldEqual None
}
"allow external triggering of empty completion" in Utils.assertAllStagesStopped {
val neverSource = Source.maybe[Int].filter(_ false)
val counterSink = Sink.fold[Int, Int](0) { (acc, _) acc + 1 }
val (neverPromise, counterFuture) = neverSource.toMat(counterSink)(Keep.both).run()
// external cancellation
neverPromise.trySuccess(None) shouldEqual true
Await.result(counterFuture, 3.seconds) shouldEqual 0
}
"allow external triggering of non-empty completion" in Utils.assertAllStagesStopped {
val neverSource = Source.maybe[Int]
val counterSink = Sink.head[Int]
val (neverPromise, counterFuture) = neverSource.toMat(counterSink)(Keep.both).run()
// external cancellation
neverPromise.trySuccess(Some(6)) shouldEqual true
Await.result(counterFuture, 3.seconds) shouldEqual 6
}
"allow external triggering of onError" in Utils.assertAllStagesStopped {
val neverSource = Source.maybe[Int]
val counterSink = Sink.fold[Int, Int](0) { (acc, _) acc + 1 }
val (neverPromise, counterFuture) = neverSource.toMat(counterSink)(Keep.both).run()
// external cancellation
neverPromise.failure(new Exception("Boom") with NoStackTrace)
val ready = Await.ready(counterFuture, 3.seconds)
val Failure(ex) = ready.value.get
ex.getMessage should include("Boom")
}
}
"Composite Source" must {
"merge from many inputs" in {
val probes = immutable.Seq.fill(5)(TestPublisher.manualProbe[Int]())

View file

@ -44,51 +44,6 @@ import scala.concurrent.{ ExecutionContext, Promise }
override def toString: String = name
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class MaybePublisher[T](
promise: Promise[Option[T]],
name: String)(implicit ec: ExecutionContext) extends Publisher[T] {
import ReactiveStreamsCompliance._
private[this] class MaybeSubscription(subscriber: Subscriber[_ >: T]) extends Subscription {
private[this] var done: Boolean = false
override def cancel(): Unit = {
done = true
promise.trySuccess(None)
}
override def request(elements: Long): Unit = {
if (elements < 1) rejectDueToNonPositiveDemand(subscriber)
if (!done) {
done = true
promise.future foreach {
// We consciously do not catch SpecViolation here, it will be reported to the ExecutionContext
case Some(v)
tryOnNext(subscriber, v)
tryOnComplete(subscriber)
case None
tryOnComplete(subscriber)
}
}
}
}
override def subscribe(subscriber: Subscriber[_ >: T]): Unit =
try {
requireNonNullSubscriber(subscriber)
tryOnSubscribe(subscriber, new MaybeSubscription(subscriber))
promise.future.failed.foreach {
error tryOnError(subscriber, error)
}
} catch {
case sv: SpecViolation ec.reportFailure(sv)
}
override def toString: String = name
}
/**
* INTERNAL API
* This is only a legal subscription when it is immediately followed by

View file

@ -0,0 +1,31 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.impl
import akka.annotation.InternalApi
import akka.stream.{ Attributes, Outlet, SourceShape }
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
/**
* INTERNAL API
*/
@InternalApi private[akka] final class FailedSource[T](failure: Throwable) extends GraphStage[SourceShape[T]] {
val out = Outlet[T]("FailedSource.out")
override val shape = SourceShape(out)
override protected def initialAttributes: Attributes = DefaultAttributes.failedSource
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
override def onPull(): Unit = ()
override def preStart(): Unit = {
failStage(failure)
}
setHandler(out, this)
}
override def toString = s"FailedSource(${failure.getClass.getName})"
}

View file

@ -0,0 +1,84 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.impl
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.stream.{ AbruptStageTerminationException, Attributes, Outlet, SourceShape }
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, OutHandler }
import akka.util.OptionVal
import scala.concurrent.Promise
import scala.util.Try
/**
* INTERNAL API
*/
@InternalApi private[akka] object MaybeSource extends GraphStageWithMaterializedValue[SourceShape[AnyRef], Promise[Option[AnyRef]]] {
val out = Outlet[AnyRef]("MaybeSource.out")
override val shape = SourceShape(out)
override protected def initialAttributes = DefaultAttributes.maybeSource
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Promise[Option[AnyRef]]) = {
import scala.util.{ Success ScalaSuccess, Failure ScalaFailure }
val promise = Promise[Option[AnyRef]]()
val logic = new GraphStageLogic(shape) with OutHandler {
private var arrivedEarly: OptionVal[AnyRef] = OptionVal.None
override def preStart(): Unit = {
promise.future.value match {
case Some(value)
// already completed, shortcut
handleCompletion(value)
case None
// callback on future completion
promise.future.onComplete(
getAsyncCallback(handleCompletion).invoke
)(ExecutionContexts.sameThreadExecutionContext)
}
}
override def onPull(): Unit = arrivedEarly match {
case OptionVal.Some(value)
push(out, value)
completeStage()
case OptionVal.None
}
private def handleCompletion(elem: Try[Option[AnyRef]]): Unit = {
elem match {
case ScalaSuccess(None)
completeStage()
case ScalaSuccess(Some(value))
if (isAvailable(out)) {
push(out, value)
completeStage()
} else {
arrivedEarly = OptionVal.Some(value)
}
case ScalaFailure(ex)
failStage(ex)
}
}
override def onDownstreamFinish(): Unit = {
promise.tryComplete(ScalaSuccess(None))
}
override def postStop(): Unit = {
if (!promise.isCompleted)
promise.tryFailure(new AbruptStageTerminationException(this))
}
setHandler(out, this)
}
(logic, promise)
}
override def toString = "MaybeSource"
}

View file

@ -77,19 +77,6 @@ import akka.util.OptionVal
override def withAttributes(attr: Attributes): SourceModule[Out, NotUsed] = new PublisherSource[Out](p, attr, amendShape(attr))
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class MaybeSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Option[Out]]](shape) {
override def create(context: MaterializationContext) = {
val p = Promise[Option[Out]]()
new MaybePublisher[Out](p, attributes.nameOrDefault("MaybeSource"))(context.materializer.executionContext) p
}
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Promise[Option[Out]]] = new MaybeSource[Out](attributes, shape)
override def withAttributes(attr: Attributes): SourceModule[Out, Promise[Option[Out]]] = new MaybeSource(attr, amendShape(attr))
}
/**
* INTERNAL API
* Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`,

View file

@ -355,16 +355,13 @@ object Source {
* with None.
*/
def maybe[T]: Source[T, Promise[Option[T]]] =
fromGraph(new MaybeSource[T](DefaultAttributes.maybeSource, shape("MaybeSource")))
Source.fromGraph(MaybeSource.asInstanceOf[Graph[SourceShape[T], Promise[Option[T]]]])
/**
* Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`.
*/
def failed[T](cause: Throwable): Source[T, NotUsed] =
fromGraph(new PublisherSource(
ErrorPublisher(cause, "FailedSource")[T],
DefaultAttributes.failedSource,
shape("FailedSource")))
Source.fromGraph(new FailedSource[T](cause))
/**
* Creates a `Source` that is not materialized until there is downstream demand, when the source gets materialized

View file

@ -530,7 +530,7 @@ object MiMa extends AutoPlugin {
// small changes in attributes
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.testkit.StreamTestKit#ProbeSource.withAttributes"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.testkit.StreamTestKit#ProbeSink.withAttributes"),
// #22332 protobuf serializers for remote deployment
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.WireFormats#DeployDataOrBuilder.getConfigManifest"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.WireFormats#DeployDataOrBuilder.hasScopeManifest"),
@ -1161,8 +1161,6 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.ShardRegion.shardBuffers_=")
),
"2.4.18" -> Seq(
),
"2.4.19" -> Seq(
)
// make sure that
// * this list ends with the latest released version number
@ -1199,14 +1197,14 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.watchWith"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.akka$actor$dungeon$DeathWatch$$watching"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.akka$actor$dungeon$DeathWatch$$watching_="),
// #22868 store shards
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.sendUpdate"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.waitingForUpdate"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.getState"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.waitingForState"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.this"),
// #21213 Feature request: Let BackoffSupervisor reply to messages when its child is stopped
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffSupervisor.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOptionsImpl.copy"),
@ -1222,14 +1220,26 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.io.ChannelRegistration.cancel"),
// #23144 recoverWithRetries cleanup
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.RecoverWith.InfiniteRetries"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.RecoverWith.InfiniteRetries"),
// #23025 OversizedPayloadException DeltaPropagation
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.DeltaPropagationSelector.maxDeltaSize"),
// #23023 added a new overload with implementation to trait, so old transport implementations compiled against
// older versions will be missing the method. We accept that incompatibility for now.
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.AssociationHandle.disassociate")
),
"2.5.3" -> Seq(
// #22789 Source.maybe rewritten as a graph stage
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.MaybePublisher"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.MaybePublisher$MaybeSubscription"),
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.MaybeSource"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource.newInstance"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource.withAttributes"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource.attributes"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource.create"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource.this"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.MaybePublisher$")
)
)