diff --git a/akka-agent/src/main/scala/akka/agent/Agent.scala b/akka-agent/src/main/scala/akka/agent/Agent.scala index 3ed6da63f3..9489ff28f3 100644 --- a/akka-agent/src/main/scala/akka/agent/Agent.scala +++ b/akka-agent/src/main/scala/akka/agent/Agent.scala @@ -156,7 +156,7 @@ object Agent { * participate in that transaction. Agents are integrated with the STM - * any dispatches made in a transaction are held until that transaction * commits, and are discarded if it is retried or aborted. - * + * * @deprecated Agents are deprecated and scheduled for removal in the next major version, use Actors instead. */ @deprecated("Agents are deprecated and scheduled for removal in the next major version, use Actors instead.", since = "2.5.0") diff --git a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst index 774271a207..cf74da0544 100644 --- a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst +++ b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst @@ -4,6 +4,36 @@ Upcoming Migration Guide 2.4.x to 2.5.x ####################################### +Akka Streams +============ + +Removal of StatefulStage, PushPullStage +--------------------------------------- + +``StatefulStage`` and ``PushPullStage`` were first introduced in Akka Streams 1.0, and later deprecated +and replaced by ``GraphStage`` in 2.0-M2. The ``GraphStage`` API has all features (and even more) as the +previous APIs and is even nicer to use. + +Please refer to the GraphStage documentation :ref:` for Scala ` or +the documentation :ref:`for Java `, for details on building custom GraphStages. + +``StatefulStage`` would be migrated to a simple ``GraphStage`` that contains some mutable state in its ``GraphStageLogic``, +and ``PushPullStage`` directly translate to graph stages. + +Removal of ``Source.transform``, replaced by ``via`` +---------------------------------------------------- + +Along with the removal of ``Stage`` (as described above), the ``transform`` methods creating Flows/Sources/Sinks +from ``Stage`` have been removed. They are replaced by using ``GraphStage`` instances with ``via``, e.g.:: + + exampleFlow.transform(() => new MyStage()) + +would now be:: + + myFlow.via(new MyGraphStage) + +as the ``GraphStage`` itself is a factory of logic instances. + Agents ====== @@ -19,6 +49,9 @@ We also anticipate to replace the uses of Agents by the upcoming Akka Typed, so If you use Agents and would like to take over the maintanance thereof, please contact the team on gitter or github. + + + Akka Persistence ================ diff --git a/akka-stream-tests-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala index 03cff40e30..9ca46b873e 100644 --- a/akka-stream-tests-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala +++ b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala @@ -3,10 +3,11 @@ */ package akka.stream.tck -import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, Attributes } import akka.stream.scaladsl.Flow -import akka.stream.stage.{ Context, PushStage } -import org.reactivestreams.{ Processor } +import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } +import org.reactivestreams.Processor class TransformProcessorTest extends AkkaIdentityProcessorVerification[Int] { @@ -16,12 +17,16 @@ class TransformProcessorTest extends AkkaIdentityProcessorVerification[Int] { implicit val materializer = ActorMaterializer(settings)(system) - val mkStage = () ⇒ - new PushStage[Int, Int] { - override def onPush(in: Int, ctx: Context[Int]) = ctx.push(in) + val stage = + new SimpleLinearGraphStage[Int] { + override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = push(out, grab(in)) + override def onPull(): Unit = pull(in) + setHandlers(in, out, this) + } } - Flow[Int].transform(mkStage).toProcessor.run() + Flow[Int].via(stage).toProcessor.run() } override def createElement(element: Int): Int = element diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index e8c8f54641..cdc4e8103f 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -192,43 +192,50 @@ public class FlowTest extends StreamTest { final JavaTestKit probe = new JavaTestKit(system); final Iterable input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7); // duplicate each element, stop after 4 elements, and emit sum to the end - final Flow flow = Flow.of(Integer.class).transform(new Creator>() { + final Flow flow = Flow.of(Integer.class).via(new GraphStage>() { + + public final Inlet in = Inlet.create("in"); + public final Outlet out = Outlet.create("out"); + @Override - public PushPullStage create() throws Exception { - return new StatefulStage() { + public GraphStageLogic createLogic(Attributes inheritedAttributes) throws Exception { + return new GraphStageLogic(shape()) { int sum = 0; int count = 0; - - @Override - public StageState initial() { - return new StageState() { + + { + setHandler(in, new AbstractInHandler() { @Override - public SyncDirective onPush(Integer element, Context ctx) { - sum += element; - count += 1; - if (count == 4) { - return emitAndFinish(Arrays.asList(element, element, sum).iterator(), ctx); - } else { - return emit(Arrays.asList(element, element).iterator(), ctx); - } + public void onPush() throws Exception { + final Integer element = grab(in); + sum += element; + count += 1; + if (count == 4) { + emitMultiple(out, Arrays.asList(element, element, sum).iterator(), () -> completeStage()); + } else { + emitMultiple(out, Arrays.asList(element, element).iterator()); + } + } - - }; + }); + setHandler(out, new AbstractOutHandler() { + @Override + public void onPull() throws Exception { + pull(in); + } + }); } - - @Override - public TerminationDirective onUpstreamFinish(Context ctx) { - return terminationEmit(Collections.singletonList(sum).iterator(), ctx); - } - }; } - }); - Source.from(input).via(flow).runForeach(new Procedure() { - public void apply(Integer elem) { - probe.getRef().tell(elem, ActorRef.noSender()); + + @Override + public FlowShape shape() { + return FlowShape.of(in, out); } - }, materializer); + } + ); + Source.from(input).via(flow).runForeach((Procedure) elem -> + probe.getRef().tell(elem, ActorRef.noSender()), materializer); probe.expectMsgEquals(0); probe.expectMsgEquals(0); @@ -308,34 +315,47 @@ public class FlowTest extends StreamTest { assertEquals(Arrays.asList(Arrays.asList("A", "B", "C", "."), Arrays.asList("D", "."), Arrays.asList("E", "F")), result); } - public Creator> op() { - return new akka.japi.function.Creator>() { + public GraphStage> op() { + return new GraphStage>() { + public final Inlet in = Inlet.create("in"); + public final Outlet out = Outlet.create("out"); + @Override - public PushPullStage create() throws Exception { - return new PushPullStage() { - @Override - public SyncDirective onPush(T element, Context ctx) { - return ctx.push(element); - } - - @Override - public SyncDirective onPull(Context ctx) { - return ctx.pull(); + public GraphStageLogic createLogic(Attributes inheritedAttributes) throws Exception { + return new GraphStageLogic(shape()) { + { + setHandler(in, new AbstractInHandler() { + @Override + public void onPush() throws Exception { + push(out, grab(in)); + } + }); + setHandler(out, new AbstractOutHandler() { + @Override + public void onPull() throws Exception { + pull(in); + } + }); } }; } + + @Override + public FlowShape shape() { + return FlowShape.of(in, out); + } }; } @Test public void mustBeAbleToUseMerge() throws Exception { final Flow f1 = - Flow.of(String.class).transform(FlowTest.this. op()).named("f1"); + Flow.of(String.class).via(FlowTest.this.op()).named("f1"); final Flow f2 = - Flow.of(String.class).transform(FlowTest.this. op()).named("f2"); + Flow.of(String.class).via(FlowTest.this.op()).named("f2"); @SuppressWarnings("unused") final Flow f3 = - Flow.of(String.class).transform(FlowTest.this. op()).named("f3"); + Flow.of(String.class).via(FlowTest.this.op()).named("f3"); final Source in1 = Source.from(Arrays.asList("a", "b", "c")); final Source in2 = Source.from(Arrays.asList("d", "e", "f")); @@ -849,11 +869,7 @@ public class FlowTest extends StreamTest { Integer result = Source.maybe() .via(Flow.of(Integer.class) - .keepAlive(Duration.create(1, "second"), new Creator() { - public Integer create() { - return 0; - } - }) + .keepAlive(Duration.create(1, "second"), (Creator) () -> 0) ) .takeWithin(Duration.create(1500, "milliseconds")) .runWith(Sink.head(), materializer) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index b3fd3bc457..399369838b 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -91,59 +91,6 @@ public class SourceTest extends StreamTest { probe.expectMsgEquals("Done"); } - @Ignore("StatefulStage to be converted to GraphStage when Java Api is available (#18817)") @Test - public void mustBeAbleToUseTransform() { - final JavaTestKit probe = new JavaTestKit(system); - final Iterable input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7); - // duplicate each element, stop after 4 elements, and emit sum to the end - Source.from(input).transform(new Creator>() { - @Override - public PushPullStage create() throws Exception { - return new StatefulStage() { - int sum = 0; - int count = 0; - - @Override - public StageState initial() { - return new StageState() { - @Override - public SyncDirective onPush(Integer element, Context ctx) { - sum += element; - count += 1; - if (count == 4) { - return emitAndFinish(Arrays.asList(element, element, sum).iterator(), ctx); - } else { - return emit(Arrays.asList(element, element).iterator(), ctx); - } - } - - }; - } - - @Override - public TerminationDirective onUpstreamFinish(Context ctx) { - return terminationEmit(Collections.singletonList(sum).iterator(), ctx); - } - - }; - } - }).runForeach(new Procedure() { - public void apply(Integer elem) { - probe.getRef().tell(elem, ActorRef.noSender()); - } - }, materializer); - - probe.expectMsgEquals(0); - probe.expectMsgEquals(0); - probe.expectMsgEquals(1); - probe.expectMsgEquals(1); - probe.expectMsgEquals(2); - probe.expectMsgEquals(2); - probe.expectMsgEquals(3); - probe.expectMsgEquals(3); - probe.expectMsgEquals(6); - } - @SuppressWarnings("unchecked") @Test public void mustBeAbleToUseGroupBy() throws Exception { diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala index 071fa4cbf0..200594e89e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala @@ -6,7 +6,6 @@ package akka.stream.impl.fusing import akka.NotUsed import akka.stream.testkit.StreamSpec import akka.stream.{ OverflowStrategy, Attributes } -import akka.stream.stage.AbstractStage.PushPullGraphStage import akka.stream.scaladsl.{ Merge, Broadcast, Balance, Zip } import GraphInterpreter._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index afa55d3884..41424807f1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -8,7 +8,6 @@ import akka.stream.ActorAttributes.SupervisionStrategy import akka.stream.Supervision.Decider import akka.stream._ import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, Failed, GraphAssembly, UpstreamBoundaryStageLogic } -import akka.stream.stage.AbstractStage.PushPullGraphStage import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, _ } import akka.stream.testkit.StreamSpec import akka.stream.testkit.Utils.TE @@ -330,15 +329,6 @@ trait GraphInterpreterSpecKit extends StreamSpec { .init() } - implicit class ToGraphStage[I, O](stage: Stage[I, O]) { - def toGS: PushPullGraphStage[Any, Any, Any] = { - val s = stage - new PushPullGraphStage[Any, Any, Any]( - (_) ⇒ s.asInstanceOf[Stage[Any, Any]], - Attributes.none) - } - } - abstract class OneBoundedSetupWithDecider[T](decider: Decider, _ops: GraphStageWithMaterializedValue[Shape, Any]*) extends Builder { val ops = _ops.toArray diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala index 19d99cae1b..38036f9c92 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -553,26 +553,6 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { upstream.onNextAndComplete(1) lastEvents() should be(Set(OnNext(1), OnComplete)) - - } - - //#20386 - @deprecated("Usage of PushPullStage is deprecated, please use GraphStage instead", "2.4.5") - class InvalidAbsorbTermination extends PushPullStage[Int, Int] { - override def onPull(ctx: Context[Int]): SyncDirective = ctx.pull() - override def onPush(elem: Int, ctx: Context[Int]): SyncDirective = ctx.push(elem) - override def onDownstreamFinish(ctx: Context[Int]): TerminationDirective = ctx.absorbTermination() - } - - // This test must be kept since it tests the compatibility layer, which while is deprecated it is still here. - "not allow absorbTermination from onDownstreamFinish()" in new OneBoundedSetup[Int]((new InvalidAbsorbTermination).toGS) { - lastEvents() should be(Set.empty) - - EventFilter[UnsupportedOperationException]("It is not allowed to call absorbTermination() from onDownstreamFinish.", occurrences = 1).intercept { - downstream.cancel() - lastEvents() should be(Set(Cancel)) - } - } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala index 3fe49fa454..7d50ebab75 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala @@ -12,7 +12,6 @@ import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Random - import akka.actor.ActorSystem import akka.pattern.{ after ⇒ later } import akka.stream._ @@ -25,6 +24,8 @@ import akka.testkit.EventFilter import akka.util.ByteString import javax.net.ssl._ +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage + object TlsSpec { val rnd = new Random @@ -351,12 +352,17 @@ class TlsSpec extends StreamSpec("akka.loglevel=INFO\nakka.actor.debug.receive=o val f = Source(scenario.inputs) .via(commPattern.decorateFlow(scenario.leftClosing, scenario.rightClosing, onRHS)) - .transform(() ⇒ new PushStage[SslTlsInbound, SslTlsInbound] { - override def onPush(elem: SslTlsInbound, ctx: Context[SslTlsInbound]) = - ctx.push(elem) - override def onDownstreamFinish(ctx: Context[SslTlsInbound]) = { - system.log.debug("me cancelled") - ctx.finish() + .via(new SimpleLinearGraphStage[SslTlsInbound] { + override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { + setHandlers(in, out, this) + + override def onPush() = push(out, grab(in)) + override def onPull() = pull(in) + + override def onDownstreamFinish() = { + system.log.debug("me cancelled") + completeStage() + } } }) .via(debug) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 2080dc1f2a..4e1cadb465 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -9,7 +9,6 @@ import akka.stream.Supervision._ import akka.stream.impl._ import akka.stream.impl.fusing.ActorGraphInterpreter import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly -import akka.stream.stage.AbstractStage.PushPullGraphStage import akka.stream.testkit.Utils._ import akka.stream.testkit._ import akka.stream._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala deleted file mode 100644 index 6c75b4e088..0000000000 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala +++ /dev/null @@ -1,455 +0,0 @@ -/** - * Copyright (C) 2014-2016 Lightbend Inc. - */ -package akka.stream.scaladsl - -import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } -import akka.stream.stage._ -import scala.concurrent.duration._ -import scala.util.control.NoStackTrace -import akka.stream.ActorMaterializer -import akka.stream.ActorMaterializerSettings -import akka.stream.testkit._ -import akka.stream.testkit.Utils._ -import akka.testkit.{ EventFilter, TestProbe } -import com.typesafe.config.ConfigFactory -import scala.concurrent.duration._ -import scala.util.control.NoStackTrace -import akka.stream.testkit.scaladsl.TestSink -import akka.stream.testkit.scaladsl.TestSource - -class FlowStageSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { - - val settings = ActorMaterializerSettings(system) - .withInputBuffer(initialSize = 2, maxSize = 2) - - implicit val materializer = ActorMaterializer(settings) - - "A Flow with transform operations" must { - "produce one-to-one transformation as expected" in assertAllStagesStopped { - val p = Source(List(1, 2, 3)).runWith(Sink.asPublisher(false)) - val p2 = Source.fromPublisher(p). - transform(() ⇒ new PushStage[Int, Int] { - var tot = 0 - override def onPush(elem: Int, ctx: Context[Int]) = { - tot += elem - ctx.push(tot) - } - }). - runWith(Sink.asPublisher(false)) - val subscriber = TestSubscriber.manualProbe[Int]() - p2.subscribe(subscriber) - val subscription = subscriber.expectSubscription() - subscription.request(1) - subscriber.expectNext(1) - subscriber.expectNoMsg(200.millis) - subscription.request(2) - subscriber.expectNext(3) - subscriber.expectNext(6) - subscriber.expectComplete() - } - - "produce one-to-several transformation as expected" in assertAllStagesStopped { - val p = Source(List(1, 2, 3)).runWith(Sink.asPublisher(false)) - val p2 = Source.fromPublisher(p). - transform(() ⇒ new StatefulStage[Int, Int] { - var tot = 0 - - lazy val waitForNext = new State { - override def onPush(elem: Int, ctx: Context[Int]) = { - tot += elem - emit(Iterator.fill(elem)(tot), ctx) - } - } - - override def initial = waitForNext - - override def onUpstreamFinish(ctx: Context[Int]): TerminationDirective = { - if (current eq waitForNext) ctx.finish() - else ctx.absorbTermination() - } - - }). - runWith(Sink.asPublisher(false)) - val subscriber = TestSubscriber.manualProbe[Int]() - p2.subscribe(subscriber) - val subscription = subscriber.expectSubscription() - subscription.request(4) - subscriber.expectNext(1) - subscriber.expectNext(3) - subscriber.expectNext(3) - subscriber.expectNext(6) - subscriber.expectNoMsg(200.millis) - subscription.request(100) - subscriber.expectNext(6) - subscriber.expectNext(6) - subscriber.expectComplete() - } - - "produce one-to-several transformation with state change" in { - val p = - Source(List(3, 2, 1, 0, 1, 12)). - transform(() ⇒ new StatefulStage[Int, Int] { - // a transformer that - // - for the first element, returns n times 42 - // - echos the remaining elements (can be reset to the duplication state by getting `0`) - - override def initial = inflate - lazy val inflate: State = new State { - override def onPush(elem: Int, ctx: Context[Int]) = { - emit(Iterator.fill(elem)(42), ctx, echo) - } - } - lazy val echo: State = new State { - def onPush(elem: Int, ctx: Context[Int]): SyncDirective = - if (elem == 0) { - become(inflate) - ctx.pull() - } else ctx.push(elem) - } - }).runWith(Sink.asPublisher(false)) - - val subscriber = TestSubscriber.manualProbe[Int]() - p.subscribe(subscriber) - val subscription = subscriber.expectSubscription() - subscription.request(50) - - // inflating: 3 times 42 - subscriber.expectNext(42) - subscriber.expectNext(42) - subscriber.expectNext(42) - - // echoing - subscriber.expectNext(2) - subscriber.expectNext(1) - - // reset - // inflating: 1 times 42 - subscriber.expectNext(42) - - // echoing - subscriber.expectNext(12) - subscriber.expectComplete() - } - - "produce dropping transformation as expected" in { - val p = Source(List(1, 2, 3, 4)).runWith(Sink.asPublisher(false)) - val p2 = Source.fromPublisher(p). - transform(() ⇒ new PushStage[Int, Int] { - var tot = 0 - override def onPush(elem: Int, ctx: Context[Int]) = { - tot += elem - if (elem % 2 == 0) - ctx.pull() - else - ctx.push(tot) - } - }). - runWith(Sink.asPublisher(false)) - val subscriber = TestSubscriber.manualProbe[Int]() - p2.subscribe(subscriber) - val subscription = subscriber.expectSubscription() - subscription.request(1) - subscriber.expectNext(1) - subscriber.expectNoMsg(200.millis) - subscription.request(1) - subscriber.expectNext(6) - subscription.request(1) - subscriber.expectComplete() - } - - "produce multi-step transformation as expected" in { - val p = Source(List("a", "bc", "def")).runWith(Sink.asPublisher(false)) - val p2 = Source.fromPublisher(p). - transform(() ⇒ new PushStage[String, Int] { - var concat = "" - override def onPush(elem: String, ctx: Context[Int]) = { - concat += elem - ctx.push(concat.length) - } - }). - transform(() ⇒ new PushStage[Int, Int] { - var tot = 0 - override def onPush(length: Int, ctx: Context[Int]) = { - tot += length - ctx.push(tot) - } - }). - runWith(Sink.asPublisher(true)) - val c1 = TestSubscriber.manualProbe[Int]() - p2.subscribe(c1) - val sub1 = c1.expectSubscription() - val c2 = TestSubscriber.manualProbe[Int]() - p2.subscribe(c2) - val sub2 = c2.expectSubscription() - sub1.request(1) - sub2.request(2) - c1.expectNext(1) - c2.expectNext(1) - c2.expectNext(4) - c1.expectNoMsg(200.millis) - sub1.request(2) - sub2.request(2) - c1.expectNext(4) - c1.expectNext(10) - c2.expectNext(10) - c1.expectComplete() - c2.expectComplete() - } - - "support emit onUpstreamFinish" in assertAllStagesStopped { - val p = Source(List("a")).runWith(Sink.asPublisher(false)) - val p2 = Source.fromPublisher(p). - transform(() ⇒ new StatefulStage[String, String] { - var s = "" - override def initial = new State { - override def onPush(element: String, ctx: Context[String]) = { - s += element - ctx.pull() - } - } - override def onUpstreamFinish(ctx: Context[String]) = - terminationEmit(Iterator.single(s + "B"), ctx) - }). - runWith(Sink.asPublisher(false)) - val c = TestSubscriber.manualProbe[String]() - p2.subscribe(c) - val s = c.expectSubscription() - s.request(1) - c.expectNext("aB") - c.expectComplete() - } - - "allow early finish" in assertAllStagesStopped { - val (p1, p2) = TestSource.probe[Int]. - transform(() ⇒ new PushStage[Int, Int] { - var s = "" - override def onPush(element: Int, ctx: Context[Int]) = { - s += element - if (s == "1") - ctx.pushAndFinish(element) - else - ctx.push(element) - } - }) - .toMat(TestSink.probe[Int])(Keep.both).run - p2.request(10) - p1.sendNext(1) - .sendNext(2) - p2.expectNext(1) - .expectComplete() - p1.expectCancellation() - } - - "report error when exception is thrown" in assertAllStagesStopped { - val p = Source(List(1, 2, 3)).runWith(Sink.asPublisher(false)) - val p2 = Source.fromPublisher(p). - transform(() ⇒ new StatefulStage[Int, Int] { - override def initial = new State { - override def onPush(elem: Int, ctx: Context[Int]) = { - if (elem == 2) { - throw new IllegalArgumentException("two not allowed") - } else { - emit(Iterator(elem, elem), ctx) - } - } - } - }). - runWith(TestSink.probe[Int]) - EventFilter[IllegalArgumentException]("two not allowed") intercept { - p2.request(100) - .expectNext(1) - .expectNext(1) - .expectError().getMessage should be("two not allowed") - p2.expectNoMsg(200.millis) - } - } - - "support emit of final elements when onUpstreamFailure" in assertAllStagesStopped { - val p = Source(List(1, 2, 3)).runWith(Sink.asPublisher(false)) - val p2 = Source.fromPublisher(p). - map(elem ⇒ if (elem == 2) throw new IllegalArgumentException("two not allowed") else elem). - transform(() ⇒ new StatefulStage[Int, Int] { - override def initial = new State { - override def onPush(elem: Int, ctx: Context[Int]) = ctx.push(elem) - } - - override def onUpstreamFailure(cause: Throwable, ctx: Context[Int]) = { - terminationEmit(Iterator(100, 101), ctx) - } - }). - filter(elem ⇒ elem != 1). // it's undefined if element 1 got through before the error or not - runWith(TestSink.probe[Int]) - EventFilter[IllegalArgumentException]("two not allowed") intercept { - p2.request(100) - .expectNext(100) - .expectNext(101) - .expectComplete() - .expectNoMsg(200.millis) - } - } - - "support cancel as expected" in assertAllStagesStopped { - val p = Source(1 to 100).runWith(Sink.asPublisher(false)) - val received = Source.fromPublisher(p). - transform(() ⇒ new StatefulStage[Int, Int] { - override def initial = new State { - override def onPush(elem: Int, ctx: Context[Int]) = - emit(Iterator(elem, elem), ctx) - } - }) - .runWith(TestSink.probe[Int]) - .request(1000) - .expectNext(1) - .cancel() - .receiveWithin(1.second) - received.size should be < 200 - received.foldLeft((true, 1)) { - case ((flag, last), next) ⇒ (flag && (last == next || last == next - 1), next) - }._1 should be(true) - } - - "support producing elements from empty inputs" in assertAllStagesStopped { - val p = Source(List.empty[Int]).runWith(Sink.asPublisher(false)) - Source.fromPublisher(p). - transform(() ⇒ new StatefulStage[Int, Int] { - override def initial = new State { - override def onPush(elem: Int, ctx: Context[Int]) = ctx.pull() - } - override def onUpstreamFinish(ctx: Context[Int]) = - terminationEmit(Iterator(1, 2, 3), ctx) - }) - .runWith(TestSink.probe[Int]) - .request(4) - .expectNext(1) - .expectNext(2) - .expectNext(3) - .expectComplete() - - } - - "support converting onComplete into onError" in { - Source(List(5, 1, 2, 3)).transform(() ⇒ new PushStage[Int, Int] { - var expectedNumberOfElements: Option[Int] = None - var count = 0 - override def onPush(elem: Int, ctx: Context[Int]) = - if (expectedNumberOfElements.isEmpty) { - expectedNumberOfElements = Some(elem) - ctx.pull() - } else { - count += 1 - ctx.push(elem) - } - - override def onUpstreamFinish(ctx: Context[Int]) = - expectedNumberOfElements match { - case Some(expected) if count != expected ⇒ - throw new RuntimeException(s"Expected $expected, got $count") with NoStackTrace - case _ ⇒ ctx.finish() - } - }).runWith(TestSink.probe[Int]) - .request(10) - .expectNext(1) - .expectNext(2) - .expectNext(3) - .expectError().getMessage should be("Expected 5, got 3") - } - - "be safe to reuse" in { - val flow = Source(1 to 3).transform(() ⇒ - new PushStage[Int, Int] { - var count = 0 - - override def onPush(elem: Int, ctx: Context[Int]) = { - count += 1 - ctx.push(count) - } - }) - - flow.runWith(TestSink.probe[Int]) - .request(3) - .expectNext(1, 2, 3) - .expectComplete() - - flow.runWith(TestSink.probe[Int]) - .request(3) - .expectNext(1, 2, 3) - .expectComplete() - } - - "handle early cancelation" in assertAllStagesStopped { - val onDownstreamFinishProbe = TestProbe() - val down = TestSubscriber.manualProbe[Int]() - val s = Source.asSubscriber[Int]. - transform(() ⇒ new PushStage[Int, Int] { - override def onPush(elem: Int, ctx: Context[Int]) = - ctx.push(elem) - override def onDownstreamFinish(ctx: Context[Int]): TerminationDirective = { - onDownstreamFinishProbe.ref ! "onDownstreamFinish" - ctx.finish() - } - }). - to(Sink.fromSubscriber(down)).run() - - val downstream = down.expectSubscription() - downstream.cancel() - onDownstreamFinishProbe.expectMsg("onDownstreamFinish") - - val up = TestPublisher.manualProbe[Int]() - up.subscribe(s) - val upsub = up.expectSubscription() - upsub.expectCancellation() - } - - "not trigger onUpstreamFinished after pushAndFinish" in assertAllStagesStopped { - val in = TestPublisher.manualProbe[Int]() - val flow = - Source.fromPublisher(in) - .transform(() ⇒ new StatefulStage[Int, Int] { - - def initial: StageState[Int, Int] = new State { - override def onPush(element: Int, ctx: Context[Int]) = - ctx.pushAndFinish(element) - } - override def onUpstreamFinish(ctx: Context[Int]): TerminationDirective = - terminationEmit(Iterator(42), ctx) - }) - .runWith(Sink.asPublisher(false)) - - val inSub = in.expectSubscription() - - val out = TestSubscriber.manualProbe[Int]() - flow.subscribe(out) - val outSub = out.expectSubscription() - - inSub.sendNext(23) - inSub.sendComplete() - - outSub.request(1) // it depends on this line, i.e. generating backpressure between buffer and stage execution - - out.expectNext(23) - out.expectComplete() - } - - "chain elements to currently emitting on upstream finish" in assertAllStagesStopped { - Source.single("hi") - .transform(() ⇒ new StatefulStage[String, String] { - override def initial = new State { - override def onPush(elem: String, ctx: Context[String]) = - emit(Iterator(elem + "1", elem + "2"), ctx) - } - override def onUpstreamFinish(ctx: Context[String]) = { - terminationEmit(Iterator("byebye"), ctx) - } - }) - .runWith(TestSink.probe[String]) - .request(1) - .expectNext("hi1") - .request(2) - .expectNext("hi2") - .expectNext("byebye") - .expectComplete() - } - } - -} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphDSLCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphDSLCompileSpec.scala index 36592d5b86..12a6873440 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphDSLCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphDSLCompileSpec.scala @@ -5,7 +5,7 @@ package akka.stream.scaladsl import akka.NotUsed import akka.stream.impl.fusing.GraphStages -import akka.stream.{ ActorMaterializer, ClosedShape, FlowShape, OverflowStrategy } +import akka.stream._ import akka.stream.testkit._ import akka.stream.stage._ @@ -19,21 +19,26 @@ class GraphDSLCompileSpec extends StreamSpec { implicit val materializer = ActorMaterializer() - def op[In, Out]: () ⇒ PushStage[In, Out] = { () ⇒ - new PushStage[In, Out] { - override def onPush(elem: In, ctx: Context[Out]): SyncDirective = - ctx.push(elem.asInstanceOf[Out]) + def op[In, Out] = new GraphStage[FlowShape[In, Out]] { + val in = Inlet[In]("op.in") + val out = Outlet[Out]("op.out") + override val shape = FlowShape[In, Out](in, out) + override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush() = push(out, grab(in).asInstanceOf[Out]) + override def onPull(): Unit = pull(in) + setHandlers(in, out, this) } + } val apples = () ⇒ Iterator.continually(new Apple) - val f1 = Flow[String].transform(op[String, String]).named("f1") - val f2 = Flow[String].transform(op[String, String]).named("f2") - val f3 = Flow[String].transform(op[String, String]).named("f3") - val f4 = Flow[String].transform(op[String, String]).named("f4") - val f5 = Flow[String].transform(op[String, String]).named("f5") - val f6 = Flow[String].transform(op[String, String]).named("f6") + val f1 = Flow[String].via(op[String, String]).named("f1") + val f2 = Flow[String].via(op[String, String]).named("f2") + val f3 = Flow[String].via(op[String, String]).named("f3") + val f4 = Flow[String].via(op[String, String]).named("f4") + val f5 = Flow[String].via(op[String, String]).named("f5") + val f6 = Flow[String].via(op[String, String]).named("f6") val in1 = Source(List("a", "b", "c")) val in2 = Source(List("d", "e", "f")) @@ -169,7 +174,7 @@ class GraphDSLCompileSpec extends StreamSpec { val out2 = Sink.asPublisher[String](false) val out9 = Sink.asPublisher[String](false) val out10 = Sink.asPublisher[String](false) - def f(s: String) = Flow[String].transform(op[String, String]).named(s) + def f(s: String) = Flow[String].via(op[String, String]).named(s) import GraphDSL.Implicits._ in7 ~> f("a") ~> b7 ~> f("b") ~> m11 ~> f("c") ~> b11 ~> f("d") ~> out2 diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index bbd575e8b7..8f8d81427d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -7,8 +7,6 @@ import akka.stream.ActorAttributes.SupervisionStrategy import akka.stream.Attributes._ import akka.stream.Supervision.Decider import akka.stream._ -import akka.stream.stage.AbstractStage.PushPullGraphStage -import akka.stream.stage.Stage /** * INTERNAL API @@ -136,24 +134,4 @@ object Stages { import DefaultAttributes._ - /* - * Stage that is backed by a GraphStage but can be symbolically introspected - */ - case class SymbolicGraphStage[-In, +Out, Ext](symbolicStage: SymbolicStage[In, Out]) - extends PushPullGraphStage[In, Out, Ext]( - symbolicStage.create, - symbolicStage.attributes) { - } - - sealed trait SymbolicStage[-In, +Out] { - def attributes: Attributes - def create(effectiveAttributes: Attributes): Stage[In, Out] - - // FIXME: No supervision hooked in yet. - - protected def supervision(attributes: Attributes): Decider = - attributes.get[SupervisionStrategy](SupervisionStrategy(Supervision.stoppingDecider)).decider - - } - } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala index 37704497b9..fd1d4b9f33 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala @@ -3,10 +3,9 @@ */ package akka.stream.impl.fusing -import akka.event.{ NoLogging } +import akka.event.NoLogging import akka.stream._ import akka.stream.impl.fusing.GraphInterpreter.{ GraphAssembly, DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic } -import akka.stream.stage.AbstractStage.PushPullGraphStage import akka.stream.stage._ import java.{ util ⇒ ju } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index d07a47cdc9..b0dd0b209f 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -8,7 +8,6 @@ import akka.event.LoggingAdapter import akka.japi.{ Pair, function } import akka.stream.impl.{ ConstantFun, StreamLayout } import akka.stream._ -import akka.stream.stage.Stage import org.reactivestreams.Processor import scala.annotation.unchecked.uncheckedVariance @@ -1098,15 +1097,6 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def buffer(size: Int, overflowStrategy: OverflowStrategy): javadsl.Flow[In, Out, Mat] = new Flow(delegate.buffer(size, overflowStrategy)) - /** - * Generic transformation of a stream with a custom processing [[akka.stream.stage.Stage]]. - * This operator makes it possible to extend the `Flow` API when there is no specialized - * operator that performs the transformation. - */ - @deprecated("Use via(GraphStage) instead.", "2.4.3") - def transform[U](mkStage: function.Creator[Stage[Out, U]]): javadsl.Flow[In, U, Mat] = - new Flow(delegate.transform(() ⇒ mkStage.create())) - /** * Takes up to `n` elements from the stream (less than `n` if the upstream completes before emitting `n` elements) * and returns a pair containing a strict sequence of the taken element diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index aae573eb12..f9510c1706 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -11,7 +11,6 @@ import akka.event.LoggingAdapter import akka.japi.{ Pair, Util, function } import akka.stream._ import akka.stream.impl.{ ConstantFun, StreamLayout, SourceQueueAdapter } -import akka.stream.stage.Stage import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance import scala.collection.JavaConverters._ @@ -1700,15 +1699,6 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def buffer(size: Int, overflowStrategy: OverflowStrategy): javadsl.Source[Out, Mat] = new Source(delegate.buffer(size, overflowStrategy)) - /** - * Generic transformation of a stream with a custom processing [[akka.stream.stage.Stage]]. - * This operator makes it possible to extend the `Flow` API when there is no specialized - * operator that performs the transformation. - */ - @deprecated("Use via(GraphStage) instead.", "2.4.3") - def transform[U](mkStage: function.Creator[Stage[Out, U]]): javadsl.Source[U, Mat] = - new Source(delegate.transform(() ⇒ mkStage.create())) - /** * Takes up to `n` elements from the stream (less than `n` if the upstream completes before emitting `n` elements) * and returns a pair containing a strict sequence of the taken element diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 78efb1c14d..40044932de 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -8,7 +8,6 @@ import akka.event.LoggingAdapter import akka.japi.function import akka.stream._ import akka.stream.impl.ConstantFun -import akka.stream.stage.Stage import scala.collection.JavaConverters._ import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.duration.FiniteDuration @@ -921,15 +920,6 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def buffer(size: Int, overflowStrategy: OverflowStrategy): SubFlow[In, Out, Mat] = new SubFlow(delegate.buffer(size, overflowStrategy)) - /** - * Generic transformation of a stream with a custom processing [[akka.stream.stage.Stage]]. - * This operator makes it possible to extend the `Flow` API when there is no specialized - * operator that performs the transformation. - */ - @deprecated("Use via(GraphStage) instead.", "2.4.3") - def transform[U](mkStage: function.Creator[Stage[Out, U]]): SubFlow[In, U, Mat] = - new SubFlow(delegate.transform(() ⇒ mkStage.create())) - /** * Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements) * and returns a pair containing a strict sequence of the taken element diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index cd67bffe5d..96f6d4c943 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -8,7 +8,6 @@ import akka.event.LoggingAdapter import akka.japi.function import akka.stream._ import akka.stream.impl.ConstantFun -import akka.stream.stage.Stage import scala.collection.JavaConverters._ import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.duration.FiniteDuration @@ -919,15 +918,6 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def buffer(size: Int, overflowStrategy: OverflowStrategy): SubSource[Out, Mat] = new SubSource(delegate.buffer(size, overflowStrategy)) - /** - * Generic transformation of a stream with a custom processing [[akka.stream.stage.Stage]]. - * This operator makes it possible to extend the `Flow` API when there is no specialized - * operator that performs the transformation. - */ - @deprecated("Use via(GraphStage) instead.", "2.4.3") - def transform[U](mkStage: function.Creator[Stage[Out, U]]): SubSource[U, Mat] = - new SubSource(delegate.transform(() ⇒ mkStage.create())) - /** * Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements) * and returns a pair containing a strict sequence of the taken element diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 272fdf1b5c..1f89de2f25 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -9,7 +9,6 @@ import akka.Done import akka.stream.impl.StreamLayout.Module import akka.stream.impl._ import akka.stream.impl.fusing._ -import akka.stream.stage.AbstractStage.{ PushPullGraphStage, PushPullGraphStageWithMaterializedValue } import akka.stream.stage._ import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } import scala.annotation.unchecked.uncheckedVariance @@ -1210,15 +1209,6 @@ trait FlowOps[+Out, +Mat] { */ def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out] = via(fusing.Buffer(size, overflowStrategy)) - /** - * Generic transformation of a stream with a custom processing [[akka.stream.stage.Stage]]. - * This operator makes it possible to extend the `Flow` API when there is no specialized - * operator that performs the transformation. - */ - @deprecated("Use via(GraphStage) instead.", "2.4.3") - def transform[T](mkStage: () ⇒ Stage[Out, T]): Repr[T] = - via(new PushPullGraphStage((attr) ⇒ mkStage(), Attributes.none)) - /** * Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements) * and returns a pair containing a strict sequence of the taken element @@ -1965,9 +1955,6 @@ trait FlowOps[+Out, +Mat] { */ def async: Repr[Out] - /** INTERNAL API */ - private[scaladsl] def andThen[T](op: SymbolicStage[Out, T]): Repr[T] = - via(SymbolicGraphStage(op)) } /** @@ -2200,10 +2187,4 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { def monitor[Mat2]()(combine: (Mat, FlowMonitor[Out]) ⇒ Mat2): ReprMat[Out, Mat2] = viaMat(GraphStages.monitor)(combine) - /** - * INTERNAL API. - */ - private[akka] def transformMaterializing[T, M](mkStageAndMaterialized: () ⇒ (Stage[Out, T], M)): ReprMat[T, M] = - viaMat(new PushPullGraphStageWithMaterializedValue[Out, T, NotUsed, M]((attr) ⇒ mkStageAndMaterialized(), Attributes.none))(Keep.right) - } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala index 15726b2457..7a677efad7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala @@ -7,7 +7,8 @@ import java.nio.ByteOrder import akka.NotUsed import akka.stream.impl.Stages.DefaultAttributes -import akka.stream.{ Attributes, Inlet, Outlet, FlowShape } +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import akka.stream.{ Attributes, FlowShape, Inlet, Outlet } import akka.stream.stage._ import akka.util.{ ByteIterator, ByteString } @@ -90,17 +91,7 @@ object Framing { * Protocol encoder that is used by [[Framing#simpleFramingProtocol]] */ def simpleFramingProtocolEncoder(maximumMessageLength: Int): Flow[ByteString, ByteString, NotUsed] = - Flow[ByteString].transform(() ⇒ new PushStage[ByteString, ByteString] { - override def onPush(message: ByteString, ctx: Context[ByteString]): SyncDirective = { - val msgSize = message.size - if (msgSize > maximumMessageLength) - ctx.fail(new FramingException(s"Maximum allowed message size is $maximumMessageLength but tried to send $msgSize bytes")) - else { - val header = ByteString((msgSize >> 24) & 0xFF, (msgSize >> 16) & 0xFF, (msgSize >> 8) & 0xFF, msgSize & 0xFF) - ctx.push(header ++ message) - } - } - }) + Flow[ByteString].via(new SimpleFramingProtocolEncoder(maximumMessageLength)) class FramingException(msg: String) extends RuntimeException(msg) @@ -128,6 +119,26 @@ object Framing { decoded & Mask } + private class SimpleFramingProtocolEncoder(maximumMessageLength: Long) extends SimpleLinearGraphStage[ByteString] { + override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { + setHandlers(in, out, this) + + override def onPush(): Unit = { + val message = grab(in) + val msgSize = message.size + + if (msgSize > maximumMessageLength) + failStage(new FramingException(s"Maximum allowed message size is $maximumMessageLength but tried to send $msgSize bytes")) + else { + val header = ByteString((msgSize >> 24) & 0xFF, (msgSize >> 16) & 0xFF, (msgSize >> 8) & 0xFF, msgSize & 0xFF) + push(out, header ++ message) + } + } + + override def onPull(): Unit = pull(in) + } + } + private class DelimiterFramingStage(val separatorBytes: ByteString, val maximumLineBytes: Int, val allowTruncation: Boolean) extends GraphStage[FlowShape[ByteString, ByteString]] { diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala deleted file mode 100644 index 865eb31462..0000000000 --- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala +++ /dev/null @@ -1,678 +0,0 @@ -/** - * Copyright (C) 2014-2016 Lightbend Inc. - */ -package akka.stream.stage - -import akka.NotUsed -import akka.stream._ - -import scala.util.control.NonFatal - -/** - * General interface for stream transformation. - * - * Custom `Stage` implementations are intended to be used with - * [[akka.stream.scaladsl.FlowOps#transform]] or - * [[akka.stream.javadsl.Flow#transform]] to extend the `Flow` API when there - * is no specialized operator that performs the transformation. - * - * Custom implementations are subclasses of [[PushPullStage]] or - * [[DetachedStage]]. Sometimes it is convenient to extend - * [[StatefulStage]] for support of become like behavior. - * - * It is possible to keep state in the concrete `Stage` instance with - * ordinary instance variables. The `Transformer` is executed by an actor and - * therefore you do not have to add any additional thread safety or memory - * visibility constructs to access the state from the callback methods. - * - * @see [[akka.stream.scaladsl.Flow#transform]] - * @see [[akka.stream.javadsl.Flow#transform]] - */ -@deprecated("Please use GraphStage instead.", "2.4.2") -sealed trait Stage[-In, +Out] - -/** - * INTERNAL API - */ -object AbstractStage { - - private class PushPullGraphLogic[In, Out]( - private val shape: FlowShape[In, Out], - val attributes: Attributes, - val stage: AbstractStage[In, Out, Directive, Directive, Context[Out], LifecycleContext]) - extends GraphStageLogic(shape) with DetachedContext[Out] { - - final override def materializer: Materializer = interpreter.materializer - - private def ctx: DetachedContext[Out] = this - - private var currentStage: AbstractStage[In, Out, Directive, Directive, Context[Out], LifecycleContext] = stage - - { - // No need to refer to the handler in a private val - val handler = new InHandler with OutHandler { - override def onPush(): Unit = - try { currentStage.onPush(grab(shape.in), ctx) } catch { case NonFatal(ex) ⇒ onSupervision(ex) } - - override def onPull(): Unit = currentStage.onPull(ctx) - - override def onUpstreamFinish(): Unit = currentStage.onUpstreamFinish(ctx) - - override def onUpstreamFailure(ex: Throwable): Unit = currentStage.onUpstreamFailure(ex, ctx) - - override def onDownstreamFinish(): Unit = currentStage.onDownstreamFinish(ctx) - } - - setHandler(shape.in, handler) - setHandler(shape.out, handler) - } - - private def onSupervision(ex: Throwable): Unit = { - currentStage.decide(ex) match { - case Supervision.Stop ⇒ - failStage(ex) - case Supervision.Resume ⇒ - resetAfterSupervise() - case Supervision.Restart ⇒ - resetAfterSupervise() - currentStage.postStop() - currentStage = currentStage.restart().asInstanceOf[AbstractStage[In, Out, Directive, Directive, Context[Out], LifecycleContext]] - currentStage.preStart(ctx) - } - } - - private def resetAfterSupervise(): Unit = { - val mustPull = currentStage.isDetached || isAvailable(shape.out) - if (!hasBeenPulled(shape.in) && mustPull) pull(shape.in) - } - - override protected[stream] def beforePreStart(): Unit = { - super.beforePreStart() - if (currentStage.isDetached) pull(shape.in) - } - - final override def push(elem: Out): DownstreamDirective = { - push(shape.out, elem) - null - } - - final override def pull(): UpstreamDirective = { - pull(shape.in) - null - } - - final override def finish(): FreeDirective = { - completeStage() - null - } - - final override def pushAndFinish(elem: Out): DownstreamDirective = { - push(shape.out, elem) - completeStage() - null - } - - final override def fail(cause: Throwable): FreeDirective = { - failStage(cause) - null - } - - final override def isFinishing: Boolean = isClosed(shape.in) - - final override def absorbTermination(): TerminationDirective = { - if (isClosed(shape.out)) { - val ex = new UnsupportedOperationException("It is not allowed to call absorbTermination() from onDownstreamFinish.") - // This MUST be logged here, since the downstream has cancelled, i.e. there is no one to send onError to, the - // stage is just about to finish so no one will catch it anyway just the interpreter - - interpreter.log.error(ex.getMessage) - throw ex // We still throw for correctness (although a finish() would also work here) - } - if (isAvailable(shape.out)) currentStage.onPull(ctx) - null - } - - override def pushAndPull(elem: Out): FreeDirective = { - push(shape.out, elem) - pull(shape.in) - null - } - - final override def holdUpstreamAndPush(elem: Out): UpstreamDirective = { - push(shape.out, elem) - null - } - - final override def holdDownstreamAndPull(): DownstreamDirective = { - pull(shape.in) - null - } - - final override def isHoldingDownstream: Boolean = isAvailable(shape.out) - - final override def isHoldingUpstream: Boolean = !(isClosed(shape.in) || hasBeenPulled(shape.in)) - - final override def holdDownstream(): DownstreamDirective = null - - final override def holdUpstream(): UpstreamDirective = null - - override def preStart(): Unit = currentStage.preStart(ctx) - override def postStop(): Unit = currentStage.postStop() - - override def toString: String = s"PushPullGraphLogic($currentStage)" - } - - class PushPullGraphStageWithMaterializedValue[-In, +Out, Ext, +Mat]( - val factory: (Attributes) ⇒ (Stage[In, Out], Mat), - stageAttributes: Attributes) - extends GraphStageWithMaterializedValue[FlowShape[In, Out], Mat] { - - val name = stageAttributes.nameOrDefault() - override def initialAttributes = stageAttributes - val shape = FlowShape(Inlet[In](name + ".in"), Outlet[Out](name + ".out")) - - override def toString = name - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Mat) = { - val stageAndMat = factory(inheritedAttributes) - val stage: AbstractStage[In, Out, Directive, Directive, Context[Out], LifecycleContext] = - stageAndMat._1.asInstanceOf[AbstractStage[In, Out, Directive, Directive, Context[Out], LifecycleContext]] - (new PushPullGraphLogic(shape, inheritedAttributes, stage), stageAndMat._2) - } - } - - class PushPullGraphStage[-In, +Out, Ext](_factory: (Attributes) ⇒ Stage[In, Out], _stageAttributes: Attributes) - extends PushPullGraphStageWithMaterializedValue[In, Out, Ext, NotUsed]((att: Attributes) ⇒ (_factory(att), NotUsed), _stageAttributes) -} - -@deprecated("Please use GraphStage instead.", "2.4.2") -abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, Ctx <: Context[Out], LifeCtx <: LifecycleContext] extends Stage[In, Out] { - - /** - * INTERNAL API - */ - private[stream] def isDetached: Boolean = false - - /** - * User overridable callback. - *

- * It is called before any other method defined on the `Stage`. - * Empty default implementation. - */ - @throws(classOf[Exception]) - def preStart(ctx: LifeCtx): Unit = () - - /** - * `onPush` is called when an element from upstream is available and there is demand from downstream, i.e. - * in `onPush` you are allowed to call [[akka.stream.stage.Context#push]] to emit one element downstream, - * or you can absorb the element by calling [[akka.stream.stage.Context#pull]]. Note that you can only - * emit zero or one element downstream from `onPull`. - * - * To emit more than one element you have to push the remaining elements from [[#onPull]], one-by-one. - * `onPush` is not called again until `onPull` has requested more elements with - * [[akka.stream.stage.Context#pull]]. - */ - def onPush(elem: In, ctx: Ctx): PushD - - /** - * `onPull` is called when there is demand from downstream, i.e. you are allowed to push one element - * downstream with [[akka.stream.stage.Context#push]], or request elements from upstreams with - * [[akka.stream.stage.Context#pull]] - */ - def onPull(ctx: Ctx): PullD - - /** - * `onUpstreamFinish` is called when upstream has signaled that the stream is - * successfully completed. Here you cannot call [[akka.stream.stage.Context#push]], - * because there might not be any demand from downstream. To emit additional elements before - * terminating you can use [[akka.stream.stage.Context#absorbTermination]] and push final elements - * from [[#onPull]]. The stage will then be in finishing state, which can be checked - * with [[akka.stream.stage.Context#isFinishing]]. - * - * By default the finish signal is immediately propagated with [[akka.stream.stage.Context#finish]]. - * - * *IMPORTANT NOTICE:* this signal is not back-pressured, it might arrive from upstream even though - * the last action by this stage was a “push”. - */ - def onUpstreamFinish(ctx: Ctx): TerminationDirective = ctx.finish() - - /** - * `onDownstreamFinish` is called when downstream has canceled. - * - * By default the cancel signal is immediately propagated with [[akka.stream.stage.Context#finish]]. - */ - def onDownstreamFinish(ctx: Ctx): TerminationDirective = ctx.finish() - - /** - * `onUpstreamFailure` is called when upstream has signaled that the stream is completed - * with failure. It is not called if [[#onPull]] or [[#onPush]] of the stage itself - * throws an exception. - * - * Note that elements that were emitted by upstream before the failure happened might - * not have been received by this stage when `onUpstreamFailure` is called, i.e. - * failures are not backpressured and might be propagated as soon as possible. - * - * Here you cannot call [[akka.stream.stage.Context#push]], because there might not - * be any demand from downstream. To emit additional elements before terminating you - * can use [[akka.stream.stage.Context#absorbTermination]] and push final elements - * from [[#onPull]]. The stage will then be in finishing state, which can be checked - * with [[akka.stream.stage.Context#isFinishing]]. - */ - def onUpstreamFailure(cause: Throwable, ctx: Ctx): TerminationDirective = ctx.fail(cause) - - /** - * User overridable callback. - *

- * Is called after the Stages final action is performed. // TODO need better wording here - * Empty default implementation. - */ - @throws(classOf[Exception]) - def postStop(): Unit = () - - /** - * If an exception is thrown from [[#onPush]] this method is invoked to decide how - * to handle the exception. By default this method returns [[Supervision.Stop]]. - * - * If an exception is thrown from [[#onPull]] the stream will always be completed with - * failure, because it is not always possible to recover from that state. - * In concrete stages it is of course possible to use ordinary try-catch-recover inside - * `onPull` when it is know how to recover from such exceptions. - * - */ - def decide(t: Throwable): Supervision.Directive = Supervision.Stop - - /** - * Used to create a fresh instance of the stage after an error resulting in a [[Supervision.Restart]] - * directive. By default it will return the same instance untouched, so you must override it - * if there are any state that should be cleared before restarting, e.g. by returning a new instance. - */ - def restart(): Stage[In, Out] = this -} - -/** - * `PushPullStage` implementations participate in 1-bounded regions. For every external non-completion signal these - * stages produce *exactly one* push or pull signal. - * - * [[#onPush]] is called when an element from upstream is available and there is demand from downstream, i.e. - * in `onPush` you are allowed to call [[Context#push]] to emit one element downstream, or you can absorb the - * element by calling [[Context#pull]]. Note that you can only emit zero or one element downstream from `onPull`. - * To emit more than one element you have to push the remaining elements from [[#onPull]], one-by-one. - * `onPush` is not called again until `onPull` has requested more elements with [[Context#pull]]. - * - * [[StatefulStage]] has support for making it easy to emit more than one element from `onPush`. - * - * [[#onPull]] is called when there is demand from downstream, i.e. you are allowed to push one element - * downstream with [[Context#push]], or request elements from upstreams with [[Context#pull]]. If you - * always perform transitive pull by calling `ctx.pull` from `onPull` you can use [[PushStage]] instead of - * `PushPullStage`. - * - * Stages are allowed to do early completion of downstream and cancel of upstream. This is done with [[Context#finish]], - * which is a combination of cancel/complete. - * - * Since onComplete is not a backpressured signal it is sometimes preferable to push a final element and then - * immediately finish. This combination is exposed as [[Context#pushAndFinish]] which enables stages to - * propagate completion events without waiting for an extra round of pull. - * - * Another peculiarity is how to convert termination events (complete/failure) into elements. The problem - * here is that the termination events are not backpressured while elements are. This means that simply calling - * [[Context#push]] as a response to [[#onUpstreamFinish]] or [[#onUpstreamFailure]] will very likely break boundedness - * and result in a buffer overflow somewhere. Therefore the only allowed command in this case is - * [[Context#absorbTermination]] which stops the propagation of the termination signal, and puts the stage in a - * [[akka.stream.stage.Context#isFinishing]] state. Depending on whether the stage has a pending pull signal it - * has not yet "consumed" by a push its [[#onPull]] handler might be called immediately or later. From - * [[#onPull]] final elements can be pushed before completing downstream with [[Context#finish]] or - * [[Context#pushAndFinish]]. - * - * [[StatefulStage]] has support for making it easy to emit final elements. - * - * All these rules are enforced by types and runtime checks where needed. Always return the `Directive` - * from the call to the [[Context]] method, and do only call [[Context]] commands once per callback. - * - * @see [[DetachedStage]] - * @see [[StatefulStage]] - * @see [[PushStage]] - */ -@deprecated("Please use GraphStage instead.", "2.4.2") -abstract class PushPullStage[In, Out] extends AbstractStage[In, Out, SyncDirective, SyncDirective, Context[Out], LifecycleContext] - -/** - * `PushStage` is a [[PushPullStage]] that always perform transitive pull by calling `ctx.pull` from `onPull`. - */ -@deprecated("Please use GraphStage instead.", "2.4.2") -abstract class PushStage[In, Out] extends PushPullStage[In, Out] { - /** - * Always pulls from upstream. - */ - final override def onPull(ctx: Context[Out]): SyncDirective = ctx.pull() -} - -/** - * `DetachedStage` can be used to implement operations similar to [[akka.stream.scaladsl.FlowOps#buffer buffer]], - * [[akka.stream.scaladsl.FlowOps#expand expand]] and [[akka.stream.scaladsl.FlowOps#conflate conflate]]. - * - * `DetachedStage` implementations are boundaries between 1-bounded regions. This means that they need to enforce the - * "exactly one" property both on their upstream and downstream regions. As a consequence a `DetachedStage` can never - * answer an [[#onPull]] with a [[Context#pull]] or answer an [[#onPush]] with a [[Context#push]] since such an action - * would "steal" the event from one region (resulting in zero signals) and would inject it to the other region - * (resulting in two signals). - * - * However, DetachedStages have the ability to call [[akka.stream.stage.DetachedContext#hold]] as a response to - * [[#onPush]] and [[#onPull]] which temporarily takes the signal off and - * stops execution, at the same time putting the stage in an [[akka.stream.stage.DetachedContext#isHolding]] state. - * If the stage is in a holding state it contains one absorbed signal, therefore in this state the only possible - * command to call is [[akka.stream.stage.DetachedContext#pushAndPull]] which results in two events making the - * balance right again: 1 hold + 1 external event = 2 external event - * - * This mechanism allows synchronization between the upstream and downstream regions which otherwise can progress - * independently. - * - * @see [[PushPullStage]] - */ -@deprecated("Please use GraphStage instead.", "2.4.2") -abstract class DetachedStage[In, Out] - extends AbstractStage[In, Out, UpstreamDirective, DownstreamDirective, DetachedContext[Out], LifecycleContext] { - private[stream] override def isDetached = true - - /** - * If an exception is thrown from [[#onPush]] this method is invoked to decide how - * to handle the exception. By default this method returns [[Supervision.Stop]]. - * - * If an exception is thrown from [[#onPull]] or if the stage is holding state the stream - * will always be completed with failure, because it is not always possible to recover from - * that state. - * In concrete stages it is of course possible to use ordinary try-catch-recover inside - * `onPull` when it is know how to recover from such exceptions. - */ - override def decide(t: Throwable): Supervision.Directive = super.decide(t) -} - -/** - * The behavior of [[StatefulStage]] is defined by these two methods, which - * has the same semantics as corresponding methods in [[PushPullStage]]. - */ -abstract class StageState[In, Out] { - def onPush(elem: In, ctx: Context[Out]): SyncDirective - def onPull(ctx: Context[Out]): SyncDirective = ctx.pull() -} - -/** - * INTERNAL API - */ -private[akka] object StatefulStage { - sealed trait AndThen - case object Finish extends AndThen - final case class Become(state: StageState[Any, Any]) extends AndThen - case object Stay extends AndThen -} - -/** - * `StatefulStage` is a [[PushPullStage]] that provides convenience to make some things easier. - * - * The behavior is defined in [[StageState]] instances. The initial behavior is specified - * by subclass implementing the [[#initial]] method. The behavior can be changed by using [[#become]]. - * - * Use [[#emit]] or [[#emitAndFinish]] to push more than one element from [[StageState#onPush]] or - * [[StageState#onPull]]. - * - * Use [[#terminationEmit]] to push final elements from [[#onUpstreamFinish]] or [[#onUpstreamFailure]]. - */ -@deprecated("StatefulStage is deprecated, please use GraphStage instead.", "2.0-M2") -abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] { - import StatefulStage._ - - /** - * Scala API - */ - abstract class State extends StageState[In, Out] - - private[this] var emitting = false - private[this] var _current: StageState[In, Out] = _ - become(initial) - - /** - * Concrete subclass must return the initial behavior from this method. - * - * **Warning:** This method must not be implemented as `val`. - */ - def initial: StageState[In, Out] - - /** - * Current state. - */ - final def current: StageState[In, Out] = _current - - /** - * Change the behavior to another [[StageState]]. - */ - final def become(state: StageState[In, Out]): Unit = { - require(state ne null, "New state must not be null") - _current = state - } - - /** - * Invokes current state. - */ - final override def onPush(elem: In, ctx: Context[Out]): SyncDirective = _current.onPush(elem, ctx) - /** - * Invokes current state. - */ - final override def onPull(ctx: Context[Out]): SyncDirective = _current.onPull(ctx) - - override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective = - if (emitting) ctx.absorbTermination() - else ctx.finish() - - /** - * Scala API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one - * element downstream. - */ - final def emit(iter: Iterator[Out], ctx: Context[Out]): SyncDirective = emit(iter, ctx, _current) - - /** - * Java API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one - * element downstream. - */ - final def emit(iter: java.util.Iterator[Out], ctx: Context[Out]): SyncDirective = { - import scala.collection.JavaConverters._ - emit(iter.asScala, ctx) - } - - /** - * Scala API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one - * element downstream and after that change behavior. - */ - final def emit(iter: Iterator[Out], ctx: Context[Out], nextState: StageState[In, Out]): SyncDirective = { - if (emitting) throw new IllegalStateException("already in emitting state") - if (iter.isEmpty) { - become(nextState) - ctx.pull() - } else { - val elem = iter.next() - if (iter.hasNext) { - emitting = true - become(emittingState(iter, andThen = Become(nextState.asInstanceOf[StageState[Any, Any]]))) - } else - become(nextState) - ctx.push(elem) - } - } - - /** - * Java API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one - * element downstream and after that change behavior. - */ - final def emit(iter: java.util.Iterator[Out], ctx: Context[Out], nextState: StageState[In, Out]): SyncDirective = { - import scala.collection.JavaConverters._ - emit(iter.asScala, ctx, nextState) - } - - /** - * Scala API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one - * element downstream and after that finish (complete downstreams, cancel upstreams). - */ - final def emitAndFinish(iter: Iterator[Out], ctx: Context[Out]): SyncDirective = { - if (emitting) throw new IllegalStateException("already in emitting state") - if (iter.isEmpty) - ctx.finish() - else { - val elem = iter.next() - if (iter.hasNext) { - emitting = true - become(emittingState(iter, andThen = Finish)) - ctx.push(elem) - } else - ctx.pushAndFinish(elem) - } - } - - /** - * Java API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one - * element downstream and after that finish (complete downstreams, cancel upstreams). - */ - final def emitAndFinish(iter: java.util.Iterator[Out], ctx: Context[Out]): SyncDirective = { - import scala.collection.JavaConverters._ - emitAndFinish(iter.asScala, ctx) - } - - /** - * Scala API: Can be used from [[#onUpstreamFinish]] to push final elements downstream - * before completing the stream successfully. Note that if this is used from - * [[#onUpstreamFailure]] the failure will be absorbed and the stream will be completed - * successfully. - */ - final def terminationEmit(iter: Iterator[Out], ctx: Context[Out]): TerminationDirective = { - if (iter.isEmpty) { - if (emitting) ctx.absorbTermination() - else ctx.finish() - } else { - val nextState = current match { - case es: EmittingState if emitting ⇒ es.copy(iter = es.iter ++ iter) - case _ ⇒ emittingState(iter, andThen = Finish) - } - become(nextState) - ctx.absorbTermination() - } - } - - /** - * Java API: Can be used from [[#onUpstreamFinish]] or [[#onUpstreamFailure]] to push final - * elements downstream. - */ - final def terminationEmit(iter: java.util.Iterator[Out], ctx: Context[Out]): TerminationDirective = { - import scala.collection.JavaConverters._ - terminationEmit(iter.asScala, ctx) - } - - private def emittingState(iter: Iterator[Out], andThen: AndThen) = EmittingState(iter, andThen) - - private case class EmittingState(iter: Iterator[Out], andThen: AndThen) extends State { - override def onPush(elem: In, ctx: Context[Out]) = throw new IllegalStateException("onPush not allowed in emittingState") - override def onPull(ctx: Context[Out]) = { - if (iter.hasNext) { - val elem = iter.next() - if (iter.hasNext) - ctx.push(elem) - else if (!ctx.isFinishing) { - emitting = false - andThen match { - case Stay ⇒ // ok - case Become(newState) ⇒ become(newState.asInstanceOf[StageState[In, Out]]) - case Finish ⇒ ctx.pushAndFinish(elem) - } - ctx.push(elem) - } else - ctx.pushAndFinish(elem) - } else - throw new IllegalStateException("onPull with empty iterator is not expected in emittingState") - } - } - -} - -/** - * Return type from [[Context]] methods. - */ -sealed trait Directive -sealed trait AsyncDirective extends Directive -sealed trait SyncDirective extends Directive -sealed trait UpstreamDirective extends SyncDirective -sealed trait DownstreamDirective extends SyncDirective -sealed trait TerminationDirective extends SyncDirective -// never instantiated -sealed abstract class FreeDirective private () extends UpstreamDirective with DownstreamDirective with TerminationDirective with AsyncDirective - -trait LifecycleContext { - /** - * Returns the Materializer that was used to materialize this [[Stage]]. - * It can be used to materialize sub-flows. - */ - def materializer: Materializer - - /** Returns operation attributes associated with the this Stage */ - def attributes: Attributes -} - -/** - * Passed to the callback methods of [[PushPullStage]] and [[StatefulStage]]. - */ -sealed trait Context[Out] extends LifecycleContext { - /** - * Push one element to downstreams. - */ - def push(elem: Out): DownstreamDirective - /** - * Request for more elements from upstreams. - */ - def pull(): UpstreamDirective - /** - * Cancel upstreams and complete downstreams successfully. - */ - def finish(): FreeDirective - /** - * Push one element to downstream immediately followed by - * cancel of upstreams and complete of downstreams. - */ - def pushAndFinish(elem: Out): DownstreamDirective - /** - * Cancel upstreams and complete downstreams with failure. - */ - def fail(cause: Throwable): FreeDirective - /** - * Puts the stage in a finishing state so that - * final elements can be pushed from `onPull`. - */ - def absorbTermination(): TerminationDirective - - /** - * This returns `true` after [[#absorbTermination]] has been used. - */ - def isFinishing: Boolean - -} - -/** - * Passed to the callback methods of [[DetachedStage]]. - * - * [[#hold]] stops execution and at the same time putting the stage in a holding state. - * If the stage is in a holding state it contains one absorbed signal, therefore in - * this state the only possible command to call is [[#pushAndPull]] which results in two - * events making the balance right again: 1 hold + 1 external event = 2 external event - */ -trait DetachedContext[Out] extends Context[Out] { - def holdUpstream(): UpstreamDirective - def holdUpstreamAndPush(elem: Out): UpstreamDirective - - def holdDownstream(): DownstreamDirective - def holdDownstreamAndPull(): DownstreamDirective - - /** - * This returns `true` when [[#hold]] has been used - * and it is reset to `false` after [[#pushAndPull]]. - */ - def isHoldingBoth: Boolean = isHoldingUpstream && isHoldingDownstream - def isHoldingUpstream: Boolean - def isHoldingDownstream: Boolean - - def pushAndPull(elem: Out): FreeDirective - -} diff --git a/project/MiMa.scala b/project/MiMa.scala index aa873b08c3..056bf84de8 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -1025,6 +1025,50 @@ object MiMa extends AutoPlugin { // #20553 Tree flattening should be separate from Fusing ProblemFilters.exclude[MissingClassProblem]("akka.stream.Fusing$StructuralInfo"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.Fusing$StructuralInfo$") + ), + "2.4.14" -> Seq( + // #21423 removal of deprecated stages (in 2.5.x) + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.Source.transform"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.SubSource.transform"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.Flow.transform"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.SubFlow.transform"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.transformMaterializing"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Flow.transform"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Flow.transformMaterializing"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Flow.andThen"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.transform"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.transformMaterializing"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.andThen"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowOps.transform"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowOps.andThen"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.Directive"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.AsyncDirective"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.TerminationDirective"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.AbstractStage$"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StatefulStage$Become$"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.AbstractStage$PushPullGraphStage"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StatefulStage$EmittingState$"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.AbstractStage"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.AbstractStage$PushPullGraphLogic"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.Context"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.Stage"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StatefulStage$"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.DetachedStage"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StatefulStage$Become"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StageState"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.AbstractStage$PushPullGraphStageWithMaterializedValue"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.DownstreamDirective"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.PushPullStage"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.LifecycleContext"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StatefulStage$EmittingState"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.PushStage"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.DetachedContext"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StatefulStage"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StatefulStage$State"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.UpstreamDirective"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.FreeDirective"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StatefulStage$AndThen"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.SyncDirective") ) ) } diff --git a/project/ValidatePullRequest.scala b/project/ValidatePullRequest.scala index dcf129d61f..165761d636 100644 --- a/project/ValidatePullRequest.scala +++ b/project/ValidatePullRequest.scala @@ -3,7 +3,7 @@ */ package akka -import com.typesafe.tools.mima.plugin.MimaKeys.reportBinaryIssues +import com.typesafe.tools.mima.plugin.MimaKeys.mimaReportBinaryIssues import com.typesafe.tools.mima.plugin.MimaPlugin import net.virtualvoid.sbt.graph.backend.SbtUpdateReport import net.virtualvoid.sbt.graph.DependencyGraphKeys._ @@ -279,7 +279,7 @@ object MimaWithPrValidation extends AutoPlugin { override def trigger = allRequirements override def requires = ValidatePullRequest && MimaPlugin override lazy val projectSettings = Seq( - additionalTasks in ValidatePR += reportBinaryIssues + additionalTasks in ValidatePR += mimaReportBinaryIssues ) }