diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala index d740c2ed57..226e48efa6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala @@ -11,10 +11,12 @@ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl._ import akka.stream._ -import akka.stream.testkit.TestSubscriber.OnComplete - import scala.concurrent.duration._ +import akka.actor.ActorRef +import akka.stream.testkit.TestSubscriber.OnComplete +import org.reactivestreams.Publisher + class ActorRefSourceSpec extends StreamSpec { private implicit val materializer = ActorMaterializer() @@ -22,7 +24,8 @@ class ActorRefSourceSpec extends StreamSpec { "emit received messages to the stream" in { val s = TestSubscriber.manualProbe[Int]() - val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() + val materializer2 = ActorMaterializer() + val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()(materializer2) val sub = s.expectSubscription() sub.request(2) ref ! 1 @@ -113,43 +116,54 @@ class ActorRefSourceSpec extends StreamSpec { for (n <- 1 to 20) ref ! n ref ! Status.Success(CompletionStrategy.Draining) - s.request(20) - for (n <- 1 to 20) s.expectNext(n) + s.request(10) + for (n <- 1 to 10) s.expectNext(n) + s.expectNoMessage(20.millis) + s.request(10) + for (n <- 11 to 20) s.expectNext(n) s.expectComplete() } "not signal buffered elements but complete immediately the stream after receiving a Status.Success with CompletionStrategy.Immediately" in assertAllStagesStopped { - val (ref, s) = Source - .actorRef(100, OverflowStrategy.fail) - .toMat(TestSink.probe[Int].addAttributes(Attributes.inputBuffer(initial = 1, max = 1)))(Keep.both) - .run() + val (ref, s) = Source.actorRef(100, OverflowStrategy.fail).toMat(TestSink.probe[Int])(Keep.both).run() for (n <- 1 to 20) ref ! n ref ! Status.Success(CompletionStrategy.Immediately) - s.request(20) - var e: Either[OnComplete.type, Int] = null - do { - e = s.expectNextOrComplete() - if (e.right.exists(_ > 10)) fail("Must not drain all remaining elements: " + e) - } while (e.isRight) + s.request(10) + + def verifyNext(n: Int): Unit = { + if (n > 10) + s.expectComplete() + else + s.expectNextOrComplete() match { + case Right(`n`) => verifyNext(n + 1) + case Right(x) => fail("expected $n, got $x") + case Left(_) => // ok, completed + } + } + verifyNext(1) } "not signal buffered elements but complete immediately the stream after receiving a PoisonPill (backwards compatibility)" in assertAllStagesStopped { - val (ref, s) = Source - .actorRef(100, OverflowStrategy.fail) - .toMat(TestSink.probe[Int].addAttributes(Attributes.inputBuffer(initial = 1, max = 1)))(Keep.both) - .run() + val (ref, s) = Source.actorRef(100, OverflowStrategy.fail).toMat(TestSink.probe[Int])(Keep.both).run() for (n <- 1 to 20) ref ! n ref ! PoisonPill - s.request(20) - var e: Either[OnComplete.type, Int] = null - do { - e = s.expectNextOrComplete() - if (e.right.exists(_ > 10)) fail("Must not drain all remaining elements: " + e) - } while (e.isRight) + s.request(10) + + def verifyNext(n: Int): Unit = { + if (n > 10) + s.expectComplete() + else + s.expectNextOrComplete() match { + case Right(`n`) => verifyNext(n + 1) + case Right(x) => fail("expected $n, got $x") + case Left(_) => // ok, completed + } + } + verifyNext(1) } "not buffer elements after receiving Status.Success" in assertAllStagesStopped { @@ -196,5 +210,15 @@ class ActorRefSourceSpec extends StreamSpec { ref.path.name.contains(name) should ===(true) ref ! PoisonPill } + + "be possible to run immediately, reproducer of #26714" in { + (1 to 100).foreach { _ => + val mat = ActorMaterializer() + val source: Source[String, ActorRef] = Source.actorRef[String](10000, OverflowStrategy.fail) + val (_: ActorRef, _: Publisher[String]) = + source.toMat(Sink.asPublisher(false))(Keep.both).run()(mat) + mat.shutdown() + } + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 82c88b5cb0..cacdc9405d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -169,6 +169,14 @@ private[akka] class SubFusingActorMaterializerImpl( extends DeadLetterSuppression with NoSerializationVerificationNeeded + final case class AddFunctionRef(f: (ActorRef, Any) => Unit, name: String) + extends DeadLetterSuppression + with NoSerializationVerificationNeeded + + final case class RemoveFunctionRef(ref: FunctionRef) + extends DeadLetterSuppression + with NoSerializationVerificationNeeded + /** Testing purpose */ case object GetChildren @@ -194,7 +202,13 @@ private[akka] class SubFusingActorMaterializerImpl( case Materialize(props, name) => val impl = context.actorOf(props, name) sender() ! impl - case GetChildren => sender() ! Children(context.children.toSet) + case AddFunctionRef(f, name) => + val ref = context.asInstanceOf[ActorCell].addFunctionRef(f, name) + sender() ! ref + case RemoveFunctionRef(ref) => + context.asInstanceOf[ActorCell].removeFunctionRef(ref) + case GetChildren => + sender() ! Children(context.children.toSet) case StopChildren => context.children.foreach(context.stop) sender() ! StoppedChildren diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 31e5806be6..726a44282f 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -9,6 +9,7 @@ import java.util.concurrent.atomic.AtomicReference import akka.actor._ import akka.annotation.{ ApiMayChange, InternalApi } import akka.japi.function.{ Effect, Procedure } +import akka.pattern.ask import akka.stream._ import akka.stream.actor.ActorSubscriberMessage import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSink, SubSource } @@ -17,11 +18,12 @@ import akka.stream.scaladsl.GenericGraphWithChangedAttributes import akka.util.OptionVal import akka.util.unused import akka.{ Done, NotUsed } - import scala.annotation.tailrec import scala.collection.{ immutable, mutable } import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ Future, Promise } +import scala.concurrent.{ Await, Future, Promise } + +import akka.stream.impl.StreamSupervisor /** * Scala API: A GraphStage represents a reusable graph stream processing operator. @@ -215,27 +217,37 @@ object GraphStageLogic { } private val callback = getAsyncCallback(internalReceive) - private def cell = materializer.supervisor match { - case ref: LocalActorRef => ref.underlying - case ref: RepointableActorRef if ref.isStarted => ref.underlying.asInstanceOf[ActorCell] - case unknown => - throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]") - } - private val functionRef: FunctionRef = - cell.addFunctionRef( - { - case (r, PoisonPill) if poisonPillFallback ⇒ - callback.invoke((r, PoisonPill)) - case (_, m @ (PoisonPill | Kill)) => - materializer.logger.warning( - "{} message sent to StageActor({}) will be ignored, since it is not a real Actor." + - "Use a custom message type to communicate with it instead.", - m, - functionRef.path) - case pair => callback.invoke(pair) - }, - name) + private val functionRef: FunctionRef = { + val f: (ActorRef, Any) => Unit = { + case (r, PoisonPill) if poisonPillFallback ⇒ + callback.invoke((r, PoisonPill)) + case (_, m @ (PoisonPill | Kill)) => + materializer.logger.warning( + "{} message sent to StageActor({}) will be ignored, since it is not a real Actor." + + "Use a custom message type to communicate with it instead.", + m, + functionRef.path) + case pair => callback.invoke(pair) + } + + materializer.supervisor match { + case ref: LocalActorRef => + ref.underlying.addFunctionRef(f, name) + case ref: RepointableActorRef => + if (ref.isStarted) + ref.underlying.asInstanceOf[ActorCell].addFunctionRef(f, name) + else { + // this may happen if materialized immediately before Materializer has been fully initialized, + // should be rare + implicit val timeout = ref.system.settings.CreationTimeout + val reply = (materializer.supervisor ? StreamSupervisor.AddFunctionRef(f, name)).mapTo[FunctionRef] + Await.result(reply, timeout.duration) + } + case unknown => + throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]") + } + } /** * The ActorRef by which this StageActor can be contacted from the outside. @@ -267,7 +279,9 @@ object GraphStageLogic { behavior = receive } - def stop(): Unit = cell.removeFunctionRef(functionRef) + def stop(): Unit = { + materializer.supervisor ! StreamSupervisor.RemoveFunctionRef(functionRef) + } def watch(actorRef: ActorRef): Unit = functionRef.watch(actorRef)