diff --git a/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala index de64c4793d..0e329774de 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala @@ -3,7 +3,7 @@ package akka.stream import akka.actor.{ ActorSystem, Props } import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import akka.stream.scaladsl.{ Sink, Source } -import akka.stream.testkit.StreamSpec +import akka.stream.testkit.{ StreamSpec, TestPublisher } import akka.testkit.{ ImplicitSender, TestActor } import scala.concurrent.Await @@ -24,7 +24,7 @@ class ActorMaterializerSpec extends StreamSpec with ImplicitSender { "properly shut down actors associated with it" in { val m = ActorMaterializer.create(system) - val f = Source.maybe[Int].runFold(0)(_ + _)(m) + val f = Source.fromPublisher(TestPublisher.probe[Int]()(system)).runFold(0)(_ + _)(m) m.shutdown() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FailedSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FailedSourceSpec.scala new file mode 100644 index 0000000000..ddf3ac082a --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FailedSourceSpec.scala @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2014-2017 Lightbend Inc. + */ +package akka.stream.scaladsl + +import akka.stream.ActorMaterializer +import akka.stream.testkit.{ StreamSpec, TestSubscriber, Utils } +import akka.testkit.DefaultTimeout +import org.scalatest.time.{ Millis, Span } + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.Failure +import scala.util.control.NoStackTrace + +class FailedSourceSpec extends StreamSpec with DefaultTimeout { + + implicit val materializer = ActorMaterializer() + + "The Failed Source" must { + "emit error immediately" in { + val ex = new RuntimeException with NoStackTrace + val p = Source.failed(ex).runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[Int]() + p.subscribe(c) + c.expectSubscriptionAndError(ex) + + // reject additional subscriber + val c2 = TestSubscriber.manualProbe[Int]() + p.subscribe(c2) + c2.expectSubscriptionAndError() + } + } + +} \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/MaybeSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/MaybeSourceSpec.scala new file mode 100644 index 0000000000..6d9cd2b927 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/MaybeSourceSpec.scala @@ -0,0 +1,101 @@ +/** + * Copyright (C) 2014-2017 Lightbend Inc. + */ +package akka.stream.scaladsl + +import akka.stream.{ AbruptStageTerminationException, ActorMaterializer } +import akka.stream.testkit.{ StreamSpec, TestSubscriber, Utils } +import akka.testkit.DefaultTimeout +import org.scalatest.time.{ Millis, Span } + +import scala.concurrent.duration._ +import scala.concurrent.Await +import scala.util.Failure +import scala.util.control.NoStackTrace + +class MaybeSourceSpec extends StreamSpec with DefaultTimeout { + + implicit val materializer = ActorMaterializer() + + "The Maybe Source" must { + + "complete materialized future with None when stream cancels" in Utils.assertAllStagesStopped { + val neverSource = Source.maybe[Int] + val pubSink = Sink.asPublisher[Int](false) + + val (f, neverPub) = neverSource.toMat(pubSink)(Keep.both).run() + + val c = TestSubscriber.manualProbe[Int]() + neverPub.subscribe(c) + val subs = c.expectSubscription() + + subs.request(1000) + c.expectNoMsg(300.millis) + + subs.cancel() + f.future.futureValue shouldEqual None + } + + "allow external triggering of empty completion" in Utils.assertAllStagesStopped { + val neverSource = Source.maybe[Int].filter(_ ⇒ false) + val counterSink = Sink.fold[Int, Int](0) { (acc, _) ⇒ acc + 1 } + + val (neverPromise, counterFuture) = neverSource.toMat(counterSink)(Keep.both).run() + + // external cancellation + neverPromise.trySuccess(None) shouldEqual true + + counterFuture.futureValue shouldEqual 0 + } + + "allow external triggering of empty completion when there was no demand" in Utils.assertAllStagesStopped { + val probe = TestSubscriber.probe[Int]() + val promise = Source.maybe[Int].to(Sink.fromSubscriber(probe)).run() + + // external cancellation + probe.ensureSubscription() + promise.trySuccess(None) shouldEqual true + probe.expectComplete() + } + + "allow external triggering of non-empty completion" in Utils.assertAllStagesStopped { + val neverSource = Source.maybe[Int] + val counterSink = Sink.head[Int] + + val (neverPromise, counterFuture) = neverSource.toMat(counterSink)(Keep.both).run() + + // external cancellation + neverPromise.trySuccess(Some(6)) shouldEqual true + + counterFuture.futureValue shouldEqual 6 + } + + "allow external triggering of onError" in Utils.assertAllStagesStopped { + val neverSource = Source.maybe[Int] + val counterSink = Sink.fold[Int, Int](0) { (acc, _) ⇒ acc + 1 } + + val (neverPromise, counterFuture) = neverSource.toMat(counterSink)(Keep.both).run() + + // external cancellation + neverPromise.tryFailure(new Exception("Boom") with NoStackTrace) shouldEqual true + + counterFuture.failed.futureValue.getMessage should include("Boom") + } + + "complete materialized future when materializer is shutdown" in Utils.assertAllStagesStopped { + val mat = ActorMaterializer() + val neverSource = Source.maybe[Int] + val pubSink = Sink.asPublisher[Int](false) + + val (f, neverPub) = neverSource.toMat(pubSink)(Keep.both).run()(mat) + + val c = TestSubscriber.manualProbe[Int]() + neverPub.subscribe(c) + val subs = c.expectSubscription() + + mat.shutdown() + f.future.failed.futureValue shouldBe an[AbruptStageTerminationException] + } + + } +} \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 82985a91b5..c354a908eb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -64,79 +64,6 @@ class SourceSpec extends StreamSpec with DefaultTimeout { } } - "Failed Source" must { - "emit error immediately" in { - val ex = new RuntimeException with NoStackTrace - val p = Source.failed(ex).runWith(Sink.asPublisher(false)) - val c = TestSubscriber.manualProbe[Int]() - p.subscribe(c) - c.expectSubscriptionAndError(ex) - - // reject additional subscriber - val c2 = TestSubscriber.manualProbe[Int]() - p.subscribe(c2) - c2.expectSubscriptionAndError() - } - } - - "Maybe Source" must { - "complete materialized future with None when stream cancels" in Utils.assertAllStagesStopped { - val neverSource = Source.maybe[Int] - val pubSink = Sink.asPublisher[Int](false) - - val (f, neverPub) = neverSource.toMat(pubSink)(Keep.both).run() - - val c = TestSubscriber.manualProbe[Int]() - neverPub.subscribe(c) - val subs = c.expectSubscription() - - subs.request(1000) - c.expectNoMsg(300.millis) - - subs.cancel() - Await.result(f.future, 3.seconds) shouldEqual None - } - - "allow external triggering of empty completion" in Utils.assertAllStagesStopped { - val neverSource = Source.maybe[Int].filter(_ ⇒ false) - val counterSink = Sink.fold[Int, Int](0) { (acc, _) ⇒ acc + 1 } - - val (neverPromise, counterFuture) = neverSource.toMat(counterSink)(Keep.both).run() - - // external cancellation - neverPromise.trySuccess(None) shouldEqual true - - Await.result(counterFuture, 3.seconds) shouldEqual 0 - } - - "allow external triggering of non-empty completion" in Utils.assertAllStagesStopped { - val neverSource = Source.maybe[Int] - val counterSink = Sink.head[Int] - - val (neverPromise, counterFuture) = neverSource.toMat(counterSink)(Keep.both).run() - - // external cancellation - neverPromise.trySuccess(Some(6)) shouldEqual true - - Await.result(counterFuture, 3.seconds) shouldEqual 6 - } - - "allow external triggering of onError" in Utils.assertAllStagesStopped { - val neverSource = Source.maybe[Int] - val counterSink = Sink.fold[Int, Int](0) { (acc, _) ⇒ acc + 1 } - - val (neverPromise, counterFuture) = neverSource.toMat(counterSink)(Keep.both).run() - - // external cancellation - neverPromise.failure(new Exception("Boom") with NoStackTrace) - - val ready = Await.ready(counterFuture, 3.seconds) - val Failure(ex) = ready.value.get - ex.getMessage should include("Boom") - } - - } - "Composite Source" must { "merge from many inputs" in { val probes = immutable.Seq.fill(5)(TestPublisher.manualProbe[Int]()) diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index ef9410378c..c01b7b4425 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -44,51 +44,6 @@ import scala.concurrent.{ ExecutionContext, Promise } override def toString: String = name } -/** - * INTERNAL API - */ -@InternalApi private[akka] final case class MaybePublisher[T]( - promise: Promise[Option[T]], - name: String)(implicit ec: ExecutionContext) extends Publisher[T] { - import ReactiveStreamsCompliance._ - - private[this] class MaybeSubscription(subscriber: Subscriber[_ >: T]) extends Subscription { - private[this] var done: Boolean = false - override def cancel(): Unit = { - done = true - promise.trySuccess(None) - } - - override def request(elements: Long): Unit = { - if (elements < 1) rejectDueToNonPositiveDemand(subscriber) - if (!done) { - done = true - promise.future foreach { - // We consciously do not catch SpecViolation here, it will be reported to the ExecutionContext - case Some(v) ⇒ - tryOnNext(subscriber, v) - tryOnComplete(subscriber) - case None ⇒ - tryOnComplete(subscriber) - } - } - } - } - - override def subscribe(subscriber: Subscriber[_ >: T]): Unit = - try { - requireNonNullSubscriber(subscriber) - tryOnSubscribe(subscriber, new MaybeSubscription(subscriber)) - promise.future.failed.foreach { - error ⇒ tryOnError(subscriber, error) - } - } catch { - case sv: SpecViolation ⇒ ec.reportFailure(sv) - } - - override def toString: String = name -} - /** * INTERNAL API * This is only a legal subscription when it is immediately followed by diff --git a/akka-stream/src/main/scala/akka/stream/impl/FailedSource.scala b/akka-stream/src/main/scala/akka/stream/impl/FailedSource.scala new file mode 100644 index 0000000000..8872d6c22a --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/FailedSource.scala @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.stream.impl + +import akka.annotation.InternalApi +import akka.stream.{ Attributes, Outlet, SourceShape } +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } + +/** + * INTERNAL API + */ +@InternalApi private[akka] final class FailedSource[T](failure: Throwable) extends GraphStage[SourceShape[T]] { + val out = Outlet[T]("FailedSource.out") + override val shape = SourceShape(out) + + override protected def initialAttributes: Attributes = DefaultAttributes.failedSource + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { + + override def onPull(): Unit = () + + override def preStart(): Unit = { + failStage(failure) + } + setHandler(out, this) + } + + override def toString = s"FailedSource(${failure.getClass.getName})" +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/MaybeSource.scala b/akka-stream/src/main/scala/akka/stream/impl/MaybeSource.scala new file mode 100644 index 0000000000..fe4fdb4a23 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/MaybeSource.scala @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.stream.impl + +import akka.annotation.InternalApi +import akka.dispatch.ExecutionContexts +import akka.stream.{ AbruptStageTerminationException, Attributes, Outlet, SourceShape } +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, OutHandler } +import akka.util.OptionVal + +import scala.concurrent.Promise +import scala.util.Try + +/** + * INTERNAL API + */ +@InternalApi private[akka] object MaybeSource extends GraphStageWithMaterializedValue[SourceShape[AnyRef], Promise[Option[AnyRef]]] { + val out = Outlet[AnyRef]("MaybeSource.out") + override val shape = SourceShape(out) + + override protected def initialAttributes = DefaultAttributes.maybeSource + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Promise[Option[AnyRef]]) = { + import scala.util.{ Success ⇒ ScalaSuccess, Failure ⇒ ScalaFailure } + val promise = Promise[Option[AnyRef]]() + val logic = new GraphStageLogic(shape) with OutHandler { + + private var arrivedEarly: OptionVal[AnyRef] = OptionVal.None + + override def preStart(): Unit = { + promise.future.value match { + case Some(value) ⇒ + // already completed, shortcut + handleCompletion(value) + case None ⇒ + // callback on future completion + promise.future.onComplete( + getAsyncCallback(handleCompletion).invoke + )(ExecutionContexts.sameThreadExecutionContext) + } + } + + override def onPull(): Unit = arrivedEarly match { + case OptionVal.Some(value) ⇒ + push(out, value) + completeStage() + case OptionVal.None ⇒ + } + + private def handleCompletion(elem: Try[Option[AnyRef]]): Unit = { + elem match { + case ScalaSuccess(None) ⇒ + completeStage() + case ScalaSuccess(Some(value)) ⇒ + if (isAvailable(out)) { + push(out, value) + completeStage() + } else { + arrivedEarly = OptionVal.Some(value) + } + case ScalaFailure(ex) ⇒ + failStage(ex) + } + } + + override def onDownstreamFinish(): Unit = { + promise.tryComplete(ScalaSuccess(None)) + } + + override def postStop(): Unit = { + if (!promise.isCompleted) + promise.tryFailure(new AbruptStageTerminationException(this)) + } + + setHandler(out, this) + + } + (logic, promise) + } + + override def toString = "MaybeSource" +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index bcef0cf7f9..1e8e5aaf07 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -77,19 +77,6 @@ import akka.util.OptionVal override def withAttributes(attr: Attributes): SourceModule[Out, NotUsed] = new PublisherSource[Out](p, attr, amendShape(attr)) } -/** - * INTERNAL API - */ -@InternalApi private[akka] final class MaybeSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Option[Out]]](shape) { - - override def create(context: MaterializationContext) = { - val p = Promise[Option[Out]]() - new MaybePublisher[Out](p, attributes.nameOrDefault("MaybeSource"))(context.materializer.executionContext) → p - } - override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Promise[Option[Out]]] = new MaybeSource[Out](attributes, shape) - override def withAttributes(attr: Attributes): SourceModule[Out, Promise[Option[Out]]] = new MaybeSource(attr, amendShape(attr)) -} - /** * INTERNAL API * Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 5b59a3ae2c..e884e47962 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -355,16 +355,13 @@ object Source { * with None. */ def maybe[T]: Source[T, Promise[Option[T]]] = - fromGraph(new MaybeSource[T](DefaultAttributes.maybeSource, shape("MaybeSource"))) + Source.fromGraph(MaybeSource.asInstanceOf[Graph[SourceShape[T], Promise[Option[T]]]]) /** * Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`. */ def failed[T](cause: Throwable): Source[T, NotUsed] = - fromGraph(new PublisherSource( - ErrorPublisher(cause, "FailedSource")[T], - DefaultAttributes.failedSource, - shape("FailedSource"))) + Source.fromGraph(new FailedSource[T](cause)) /** * Creates a `Source` that is not materialized until there is downstream demand, when the source gets materialized diff --git a/project/MiMa.scala b/project/MiMa.scala index 6c15c4a27c..7110fdcab8 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -530,7 +530,7 @@ object MiMa extends AutoPlugin { // small changes in attributes ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.testkit.StreamTestKit#ProbeSource.withAttributes"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.testkit.StreamTestKit#ProbeSink.withAttributes"), - + // #22332 protobuf serializers for remote deployment ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.WireFormats#DeployDataOrBuilder.getConfigManifest"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.WireFormats#DeployDataOrBuilder.hasScopeManifest"), @@ -1161,8 +1161,6 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.ShardRegion.shardBuffers_=") ), "2.4.18" -> Seq( - ), - "2.4.19" -> Seq( ) // make sure that // * this list ends with the latest released version number @@ -1199,14 +1197,14 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.watchWith"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.akka$actor$dungeon$DeathWatch$$watching"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.akka$actor$dungeon$DeathWatch$$watching_="), - + // #22868 store shards ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.sendUpdate"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.waitingForUpdate"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.getState"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.waitingForState"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.this"), - + // #21213 Feature request: Let BackoffSupervisor reply to messages when its child is stopped ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffSupervisor.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOptionsImpl.copy"), @@ -1222,14 +1220,26 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.io.ChannelRegistration.cancel"), // #23144 recoverWithRetries cleanup - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.RecoverWith.InfiniteRetries"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.RecoverWith.InfiniteRetries"), // #23025 OversizedPayloadException DeltaPropagation ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.DeltaPropagationSelector.maxDeltaSize"), - + // #23023 added a new overload with implementation to trait, so old transport implementations compiled against // older versions will be missing the method. We accept that incompatibility for now. ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.AssociationHandle.disassociate") + ), + "2.5.3" -> Seq( + // #22789 Source.maybe rewritten as a graph stage + ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.MaybePublisher"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.MaybePublisher$MaybeSubscription"), + ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.MaybeSource"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource.newInstance"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource.withAttributes"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource.attributes"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource.create"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource.this"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.MaybePublisher$") ) )