-str #21423 remove deprecated Stage references (#21919)

* -str #21423 remove deprecated Stage references

* include mima filters for removed classes
This commit is contained in:
Konrad Malawski 2016-12-02 14:23:13 +01:00 committed by GitHub
parent cc46d15381
commit 04ab4ebb9c
23 changed files with 210 additions and 1390 deletions

View file

@ -156,7 +156,7 @@ object Agent {
* participate in that transaction. Agents are integrated with the STM -
* any dispatches made in a transaction are held until that transaction
* commits, and are discarded if it is retried or aborted.
*
*
* @deprecated Agents are deprecated and scheduled for removal in the next major version, use Actors instead.
*/
@deprecated("Agents are deprecated and scheduled for removal in the next major version, use Actors instead.", since = "2.5.0")

View file

@ -4,6 +4,36 @@
Upcoming Migration Guide 2.4.x to 2.5.x
#######################################
Akka Streams
============
Removal of StatefulStage, PushPullStage
---------------------------------------
``StatefulStage`` and ``PushPullStage`` were first introduced in Akka Streams 1.0, and later deprecated
and replaced by ``GraphStage`` in 2.0-M2. The ``GraphStage`` API has all features (and even more) as the
previous APIs and is even nicer to use.
Please refer to the GraphStage documentation :ref:` for Scala <graphstage-scala>` or
the documentation :ref:`for Java <graphstage-scala>`, for details on building custom GraphStages.
``StatefulStage`` would be migrated to a simple ``GraphStage`` that contains some mutable state in its ``GraphStageLogic``,
and ``PushPullStage`` directly translate to graph stages.
Removal of ``Source.transform``, replaced by ``via``
----------------------------------------------------
Along with the removal of ``Stage`` (as described above), the ``transform`` methods creating Flows/Sources/Sinks
from ``Stage`` have been removed. They are replaced by using ``GraphStage`` instances with ``via``, e.g.::
exampleFlow.transform(() => new MyStage())
would now be::
myFlow.via(new MyGraphStage)
as the ``GraphStage`` itself is a factory of logic instances.
Agents
======
@ -19,6 +49,9 @@ We also anticipate to replace the uses of Agents by the upcoming Akka Typed, so
If you use Agents and would like to take over the maintanance thereof, please contact the team on gitter or github.
Akka Persistence
================

View file

@ -3,10 +3,11 @@
*/
package akka.stream.tck
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, Attributes }
import akka.stream.scaladsl.Flow
import akka.stream.stage.{ Context, PushStage }
import org.reactivestreams.{ Processor }
import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler }
import org.reactivestreams.Processor
class TransformProcessorTest extends AkkaIdentityProcessorVerification[Int] {
@ -16,12 +17,16 @@ class TransformProcessorTest extends AkkaIdentityProcessorVerification[Int] {
implicit val materializer = ActorMaterializer(settings)(system)
val mkStage = ()
new PushStage[Int, Int] {
override def onPush(in: Int, ctx: Context[Int]) = ctx.push(in)
val stage =
new SimpleLinearGraphStage[Int] {
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = push(out, grab(in))
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
Flow[Int].transform(mkStage).toProcessor.run()
Flow[Int].via(stage).toProcessor.run()
}
override def createElement(element: Int): Int = element

View file

@ -192,43 +192,50 @@ public class FlowTest extends StreamTest {
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
// duplicate each element, stop after 4 elements, and emit sum to the end
final Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).transform(new Creator<Stage<Integer, Integer>>() {
final Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).via(new GraphStage<FlowShape<Integer, Integer>>() {
public final Inlet<Integer> in = Inlet.create("in");
public final Outlet<Integer> out = Outlet.create("out");
@Override
public PushPullStage<Integer, Integer> create() throws Exception {
return new StatefulStage<Integer, Integer>() {
public GraphStageLogic createLogic(Attributes inheritedAttributes) throws Exception {
return new GraphStageLogic(shape()) {
int sum = 0;
int count = 0;
@Override
public StageState<Integer, Integer> initial() {
return new StageState<Integer, Integer>() {
{
setHandler(in, new AbstractInHandler() {
@Override
public SyncDirective onPush(Integer element, Context<Integer> ctx) {
sum += element;
count += 1;
if (count == 4) {
return emitAndFinish(Arrays.asList(element, element, sum).iterator(), ctx);
} else {
return emit(Arrays.asList(element, element).iterator(), ctx);
}
public void onPush() throws Exception {
final Integer element = grab(in);
sum += element;
count += 1;
if (count == 4) {
emitMultiple(out, Arrays.asList(element, element, sum).iterator(), () -> completeStage());
} else {
emitMultiple(out, Arrays.asList(element, element).iterator());
}
}
};
});
setHandler(out, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
pull(in);
}
});
}
@Override
public TerminationDirective onUpstreamFinish(Context<Integer> ctx) {
return terminationEmit(Collections.singletonList(sum).iterator(), ctx);
}
};
}
});
Source.from(input).via(flow).runForeach(new Procedure<Integer>() {
public void apply(Integer elem) {
probe.getRef().tell(elem, ActorRef.noSender());
@Override
public FlowShape<Integer, Integer> shape() {
return FlowShape.of(in, out);
}
}, materializer);
}
);
Source.from(input).via(flow).runForeach((Procedure<Integer>) elem ->
probe.getRef().tell(elem, ActorRef.noSender()), materializer);
probe.expectMsgEquals(0);
probe.expectMsgEquals(0);
@ -308,34 +315,47 @@ public class FlowTest extends StreamTest {
assertEquals(Arrays.asList(Arrays.asList("A", "B", "C", "."), Arrays.asList("D", "."), Arrays.asList("E", "F")), result);
}
public <T> Creator<Stage<T, T>> op() {
return new akka.japi.function.Creator<Stage<T, T>>() {
public <T> GraphStage<FlowShape<T, T>> op() {
return new GraphStage<FlowShape<T, T>>() {
public final Inlet<T> in = Inlet.create("in");
public final Outlet<T> out = Outlet.create("out");
@Override
public PushPullStage<T, T> create() throws Exception {
return new PushPullStage<T, T>() {
@Override
public SyncDirective onPush(T element, Context<T> ctx) {
return ctx.push(element);
}
@Override
public SyncDirective onPull(Context<T> ctx) {
return ctx.pull();
public GraphStageLogic createLogic(Attributes inheritedAttributes) throws Exception {
return new GraphStageLogic(shape()) {
{
setHandler(in, new AbstractInHandler() {
@Override
public void onPush() throws Exception {
push(out, grab(in));
}
});
setHandler(out, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
pull(in);
}
});
}
};
}
@Override
public FlowShape<T, T> shape() {
return FlowShape.of(in, out);
}
};
}
@Test
public void mustBeAbleToUseMerge() throws Exception {
final Flow<String, String, NotUsed> f1 =
Flow.of(String.class).transform(FlowTest.this.<String> op()).named("f1");
Flow.of(String.class).via(FlowTest.this.op()).named("f1");
final Flow<String, String, NotUsed> f2 =
Flow.of(String.class).transform(FlowTest.this.<String> op()).named("f2");
Flow.of(String.class).via(FlowTest.this.op()).named("f2");
@SuppressWarnings("unused")
final Flow<String, String, NotUsed> f3 =
Flow.of(String.class).transform(FlowTest.this.<String> op()).named("f3");
Flow.of(String.class).via(FlowTest.this.op()).named("f3");
final Source<String, NotUsed> in1 = Source.from(Arrays.asList("a", "b", "c"));
final Source<String, NotUsed> in2 = Source.from(Arrays.asList("d", "e", "f"));
@ -849,11 +869,7 @@ public class FlowTest extends StreamTest {
Integer result =
Source.<Integer>maybe()
.via(Flow.of(Integer.class)
.keepAlive(Duration.create(1, "second"), new Creator<Integer>() {
public Integer create() {
return 0;
}
})
.keepAlive(Duration.create(1, "second"), (Creator<Integer>) () -> 0)
)
.takeWithin(Duration.create(1500, "milliseconds"))
.runWith(Sink.<Integer>head(), materializer)

View file

@ -91,59 +91,6 @@ public class SourceTest extends StreamTest {
probe.expectMsgEquals("Done");
}
@Ignore("StatefulStage to be converted to GraphStage when Java Api is available (#18817)") @Test
public void mustBeAbleToUseTransform() {
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
// duplicate each element, stop after 4 elements, and emit sum to the end
Source.from(input).transform(new Creator<Stage<Integer, Integer>>() {
@Override
public PushPullStage<Integer, Integer> create() throws Exception {
return new StatefulStage<Integer, Integer>() {
int sum = 0;
int count = 0;
@Override
public StageState<Integer, Integer> initial() {
return new StageState<Integer, Integer>() {
@Override
public SyncDirective onPush(Integer element, Context<Integer> ctx) {
sum += element;
count += 1;
if (count == 4) {
return emitAndFinish(Arrays.asList(element, element, sum).iterator(), ctx);
} else {
return emit(Arrays.asList(element, element).iterator(), ctx);
}
}
};
}
@Override
public TerminationDirective onUpstreamFinish(Context<Integer> ctx) {
return terminationEmit(Collections.singletonList(sum).iterator(), ctx);
}
};
}
}).runForeach(new Procedure<Integer>() {
public void apply(Integer elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}, materializer);
probe.expectMsgEquals(0);
probe.expectMsgEquals(0);
probe.expectMsgEquals(1);
probe.expectMsgEquals(1);
probe.expectMsgEquals(2);
probe.expectMsgEquals(2);
probe.expectMsgEquals(3);
probe.expectMsgEquals(3);
probe.expectMsgEquals(6);
}
@SuppressWarnings("unchecked")
@Test
public void mustBeAbleToUseGroupBy() throws Exception {

View file

@ -6,7 +6,6 @@ package akka.stream.impl.fusing
import akka.NotUsed
import akka.stream.testkit.StreamSpec
import akka.stream.{ OverflowStrategy, Attributes }
import akka.stream.stage.AbstractStage.PushPullGraphStage
import akka.stream.scaladsl.{ Merge, Broadcast, Balance, Zip }
import GraphInterpreter._

View file

@ -8,7 +8,6 @@ import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Supervision.Decider
import akka.stream._
import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, Failed, GraphAssembly, UpstreamBoundaryStageLogic }
import akka.stream.stage.AbstractStage.PushPullGraphStage
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, _ }
import akka.stream.testkit.StreamSpec
import akka.stream.testkit.Utils.TE
@ -330,15 +329,6 @@ trait GraphInterpreterSpecKit extends StreamSpec {
.init()
}
implicit class ToGraphStage[I, O](stage: Stage[I, O]) {
def toGS: PushPullGraphStage[Any, Any, Any] = {
val s = stage
new PushPullGraphStage[Any, Any, Any](
(_) s.asInstanceOf[Stage[Any, Any]],
Attributes.none)
}
}
abstract class OneBoundedSetupWithDecider[T](decider: Decider, _ops: GraphStageWithMaterializedValue[Shape, Any]*) extends Builder {
val ops = _ops.toArray

View file

@ -553,26 +553,6 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
upstream.onNextAndComplete(1)
lastEvents() should be(Set(OnNext(1), OnComplete))
}
//#20386
@deprecated("Usage of PushPullStage is deprecated, please use GraphStage instead", "2.4.5")
class InvalidAbsorbTermination extends PushPullStage[Int, Int] {
override def onPull(ctx: Context[Int]): SyncDirective = ctx.pull()
override def onPush(elem: Int, ctx: Context[Int]): SyncDirective = ctx.push(elem)
override def onDownstreamFinish(ctx: Context[Int]): TerminationDirective = ctx.absorbTermination()
}
// This test must be kept since it tests the compatibility layer, which while is deprecated it is still here.
"not allow absorbTermination from onDownstreamFinish()" in new OneBoundedSetup[Int]((new InvalidAbsorbTermination).toGS) {
lastEvents() should be(Set.empty)
EventFilter[UnsupportedOperationException]("It is not allowed to call absorbTermination() from onDownstreamFinish.", occurrences = 1).intercept {
downstream.cancel()
lastEvents() should be(Set(Cancel))
}
}
}

View file

@ -12,7 +12,6 @@ import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Random
import akka.actor.ActorSystem
import akka.pattern.{ after later }
import akka.stream._
@ -25,6 +24,8 @@ import akka.testkit.EventFilter
import akka.util.ByteString
import javax.net.ssl._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
object TlsSpec {
val rnd = new Random
@ -351,12 +352,17 @@ class TlsSpec extends StreamSpec("akka.loglevel=INFO\nakka.actor.debug.receive=o
val f =
Source(scenario.inputs)
.via(commPattern.decorateFlow(scenario.leftClosing, scenario.rightClosing, onRHS))
.transform(() new PushStage[SslTlsInbound, SslTlsInbound] {
override def onPush(elem: SslTlsInbound, ctx: Context[SslTlsInbound]) =
ctx.push(elem)
override def onDownstreamFinish(ctx: Context[SslTlsInbound]) = {
system.log.debug("me cancelled")
ctx.finish()
.via(new SimpleLinearGraphStage[SslTlsInbound] {
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
setHandlers(in, out, this)
override def onPush() = push(out, grab(in))
override def onPull() = pull(in)
override def onDownstreamFinish() = {
system.log.debug("me cancelled")
completeStage()
}
}
})
.via(debug)

View file

@ -9,7 +9,6 @@ import akka.stream.Supervision._
import akka.stream.impl._
import akka.stream.impl.fusing.ActorGraphInterpreter
import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
import akka.stream.stage.AbstractStage.PushPullGraphStage
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import akka.stream._

View file

@ -1,455 +0,0 @@
/**
* Copyright (C) 2014-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.stage._
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.testkit.{ EventFilter, TestProbe }
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.scaladsl.TestSource
class FlowStageSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 2)
implicit val materializer = ActorMaterializer(settings)
"A Flow with transform operations" must {
"produce one-to-one transformation as expected" in assertAllStagesStopped {
val p = Source(List(1, 2, 3)).runWith(Sink.asPublisher(false))
val p2 = Source.fromPublisher(p).
transform(() new PushStage[Int, Int] {
var tot = 0
override def onPush(elem: Int, ctx: Context[Int]) = {
tot += elem
ctx.push(tot)
}
}).
runWith(Sink.asPublisher(false))
val subscriber = TestSubscriber.manualProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
subscription.request(1)
subscriber.expectNext(1)
subscriber.expectNoMsg(200.millis)
subscription.request(2)
subscriber.expectNext(3)
subscriber.expectNext(6)
subscriber.expectComplete()
}
"produce one-to-several transformation as expected" in assertAllStagesStopped {
val p = Source(List(1, 2, 3)).runWith(Sink.asPublisher(false))
val p2 = Source.fromPublisher(p).
transform(() new StatefulStage[Int, Int] {
var tot = 0
lazy val waitForNext = new State {
override def onPush(elem: Int, ctx: Context[Int]) = {
tot += elem
emit(Iterator.fill(elem)(tot), ctx)
}
}
override def initial = waitForNext
override def onUpstreamFinish(ctx: Context[Int]): TerminationDirective = {
if (current eq waitForNext) ctx.finish()
else ctx.absorbTermination()
}
}).
runWith(Sink.asPublisher(false))
val subscriber = TestSubscriber.manualProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
subscription.request(4)
subscriber.expectNext(1)
subscriber.expectNext(3)
subscriber.expectNext(3)
subscriber.expectNext(6)
subscriber.expectNoMsg(200.millis)
subscription.request(100)
subscriber.expectNext(6)
subscriber.expectNext(6)
subscriber.expectComplete()
}
"produce one-to-several transformation with state change" in {
val p =
Source(List(3, 2, 1, 0, 1, 12)).
transform(() new StatefulStage[Int, Int] {
// a transformer that
// - for the first element, returns n times 42
// - echos the remaining elements (can be reset to the duplication state by getting `0`)
override def initial = inflate
lazy val inflate: State = new State {
override def onPush(elem: Int, ctx: Context[Int]) = {
emit(Iterator.fill(elem)(42), ctx, echo)
}
}
lazy val echo: State = new State {
def onPush(elem: Int, ctx: Context[Int]): SyncDirective =
if (elem == 0) {
become(inflate)
ctx.pull()
} else ctx.push(elem)
}
}).runWith(Sink.asPublisher(false))
val subscriber = TestSubscriber.manualProbe[Int]()
p.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
subscription.request(50)
// inflating: 3 times 42
subscriber.expectNext(42)
subscriber.expectNext(42)
subscriber.expectNext(42)
// echoing
subscriber.expectNext(2)
subscriber.expectNext(1)
// reset
// inflating: 1 times 42
subscriber.expectNext(42)
// echoing
subscriber.expectNext(12)
subscriber.expectComplete()
}
"produce dropping transformation as expected" in {
val p = Source(List(1, 2, 3, 4)).runWith(Sink.asPublisher(false))
val p2 = Source.fromPublisher(p).
transform(() new PushStage[Int, Int] {
var tot = 0
override def onPush(elem: Int, ctx: Context[Int]) = {
tot += elem
if (elem % 2 == 0)
ctx.pull()
else
ctx.push(tot)
}
}).
runWith(Sink.asPublisher(false))
val subscriber = TestSubscriber.manualProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
subscription.request(1)
subscriber.expectNext(1)
subscriber.expectNoMsg(200.millis)
subscription.request(1)
subscriber.expectNext(6)
subscription.request(1)
subscriber.expectComplete()
}
"produce multi-step transformation as expected" in {
val p = Source(List("a", "bc", "def")).runWith(Sink.asPublisher(false))
val p2 = Source.fromPublisher(p).
transform(() new PushStage[String, Int] {
var concat = ""
override def onPush(elem: String, ctx: Context[Int]) = {
concat += elem
ctx.push(concat.length)
}
}).
transform(() new PushStage[Int, Int] {
var tot = 0
override def onPush(length: Int, ctx: Context[Int]) = {
tot += length
ctx.push(tot)
}
}).
runWith(Sink.asPublisher(true))
val c1 = TestSubscriber.manualProbe[Int]()
p2.subscribe(c1)
val sub1 = c1.expectSubscription()
val c2 = TestSubscriber.manualProbe[Int]()
p2.subscribe(c2)
val sub2 = c2.expectSubscription()
sub1.request(1)
sub2.request(2)
c1.expectNext(1)
c2.expectNext(1)
c2.expectNext(4)
c1.expectNoMsg(200.millis)
sub1.request(2)
sub2.request(2)
c1.expectNext(4)
c1.expectNext(10)
c2.expectNext(10)
c1.expectComplete()
c2.expectComplete()
}
"support emit onUpstreamFinish" in assertAllStagesStopped {
val p = Source(List("a")).runWith(Sink.asPublisher(false))
val p2 = Source.fromPublisher(p).
transform(() new StatefulStage[String, String] {
var s = ""
override def initial = new State {
override def onPush(element: String, ctx: Context[String]) = {
s += element
ctx.pull()
}
}
override def onUpstreamFinish(ctx: Context[String]) =
terminationEmit(Iterator.single(s + "B"), ctx)
}).
runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p2.subscribe(c)
val s = c.expectSubscription()
s.request(1)
c.expectNext("aB")
c.expectComplete()
}
"allow early finish" in assertAllStagesStopped {
val (p1, p2) = TestSource.probe[Int].
transform(() new PushStage[Int, Int] {
var s = ""
override def onPush(element: Int, ctx: Context[Int]) = {
s += element
if (s == "1")
ctx.pushAndFinish(element)
else
ctx.push(element)
}
})
.toMat(TestSink.probe[Int])(Keep.both).run
p2.request(10)
p1.sendNext(1)
.sendNext(2)
p2.expectNext(1)
.expectComplete()
p1.expectCancellation()
}
"report error when exception is thrown" in assertAllStagesStopped {
val p = Source(List(1, 2, 3)).runWith(Sink.asPublisher(false))
val p2 = Source.fromPublisher(p).
transform(() new StatefulStage[Int, Int] {
override def initial = new State {
override def onPush(elem: Int, ctx: Context[Int]) = {
if (elem == 2) {
throw new IllegalArgumentException("two not allowed")
} else {
emit(Iterator(elem, elem), ctx)
}
}
}
}).
runWith(TestSink.probe[Int])
EventFilter[IllegalArgumentException]("two not allowed") intercept {
p2.request(100)
.expectNext(1)
.expectNext(1)
.expectError().getMessage should be("two not allowed")
p2.expectNoMsg(200.millis)
}
}
"support emit of final elements when onUpstreamFailure" in assertAllStagesStopped {
val p = Source(List(1, 2, 3)).runWith(Sink.asPublisher(false))
val p2 = Source.fromPublisher(p).
map(elem if (elem == 2) throw new IllegalArgumentException("two not allowed") else elem).
transform(() new StatefulStage[Int, Int] {
override def initial = new State {
override def onPush(elem: Int, ctx: Context[Int]) = ctx.push(elem)
}
override def onUpstreamFailure(cause: Throwable, ctx: Context[Int]) = {
terminationEmit(Iterator(100, 101), ctx)
}
}).
filter(elem elem != 1). // it's undefined if element 1 got through before the error or not
runWith(TestSink.probe[Int])
EventFilter[IllegalArgumentException]("two not allowed") intercept {
p2.request(100)
.expectNext(100)
.expectNext(101)
.expectComplete()
.expectNoMsg(200.millis)
}
}
"support cancel as expected" in assertAllStagesStopped {
val p = Source(1 to 100).runWith(Sink.asPublisher(false))
val received = Source.fromPublisher(p).
transform(() new StatefulStage[Int, Int] {
override def initial = new State {
override def onPush(elem: Int, ctx: Context[Int]) =
emit(Iterator(elem, elem), ctx)
}
})
.runWith(TestSink.probe[Int])
.request(1000)
.expectNext(1)
.cancel()
.receiveWithin(1.second)
received.size should be < 200
received.foldLeft((true, 1)) {
case ((flag, last), next) (flag && (last == next || last == next - 1), next)
}._1 should be(true)
}
"support producing elements from empty inputs" in assertAllStagesStopped {
val p = Source(List.empty[Int]).runWith(Sink.asPublisher(false))
Source.fromPublisher(p).
transform(() new StatefulStage[Int, Int] {
override def initial = new State {
override def onPush(elem: Int, ctx: Context[Int]) = ctx.pull()
}
override def onUpstreamFinish(ctx: Context[Int]) =
terminationEmit(Iterator(1, 2, 3), ctx)
})
.runWith(TestSink.probe[Int])
.request(4)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectComplete()
}
"support converting onComplete into onError" in {
Source(List(5, 1, 2, 3)).transform(() new PushStage[Int, Int] {
var expectedNumberOfElements: Option[Int] = None
var count = 0
override def onPush(elem: Int, ctx: Context[Int]) =
if (expectedNumberOfElements.isEmpty) {
expectedNumberOfElements = Some(elem)
ctx.pull()
} else {
count += 1
ctx.push(elem)
}
override def onUpstreamFinish(ctx: Context[Int]) =
expectedNumberOfElements match {
case Some(expected) if count != expected
throw new RuntimeException(s"Expected $expected, got $count") with NoStackTrace
case _ ctx.finish()
}
}).runWith(TestSink.probe[Int])
.request(10)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectError().getMessage should be("Expected 5, got 3")
}
"be safe to reuse" in {
val flow = Source(1 to 3).transform(()
new PushStage[Int, Int] {
var count = 0
override def onPush(elem: Int, ctx: Context[Int]) = {
count += 1
ctx.push(count)
}
})
flow.runWith(TestSink.probe[Int])
.request(3)
.expectNext(1, 2, 3)
.expectComplete()
flow.runWith(TestSink.probe[Int])
.request(3)
.expectNext(1, 2, 3)
.expectComplete()
}
"handle early cancelation" in assertAllStagesStopped {
val onDownstreamFinishProbe = TestProbe()
val down = TestSubscriber.manualProbe[Int]()
val s = Source.asSubscriber[Int].
transform(() new PushStage[Int, Int] {
override def onPush(elem: Int, ctx: Context[Int]) =
ctx.push(elem)
override def onDownstreamFinish(ctx: Context[Int]): TerminationDirective = {
onDownstreamFinishProbe.ref ! "onDownstreamFinish"
ctx.finish()
}
}).
to(Sink.fromSubscriber(down)).run()
val downstream = down.expectSubscription()
downstream.cancel()
onDownstreamFinishProbe.expectMsg("onDownstreamFinish")
val up = TestPublisher.manualProbe[Int]()
up.subscribe(s)
val upsub = up.expectSubscription()
upsub.expectCancellation()
}
"not trigger onUpstreamFinished after pushAndFinish" in assertAllStagesStopped {
val in = TestPublisher.manualProbe[Int]()
val flow =
Source.fromPublisher(in)
.transform(() new StatefulStage[Int, Int] {
def initial: StageState[Int, Int] = new State {
override def onPush(element: Int, ctx: Context[Int]) =
ctx.pushAndFinish(element)
}
override def onUpstreamFinish(ctx: Context[Int]): TerminationDirective =
terminationEmit(Iterator(42), ctx)
})
.runWith(Sink.asPublisher(false))
val inSub = in.expectSubscription()
val out = TestSubscriber.manualProbe[Int]()
flow.subscribe(out)
val outSub = out.expectSubscription()
inSub.sendNext(23)
inSub.sendComplete()
outSub.request(1) // it depends on this line, i.e. generating backpressure between buffer and stage execution
out.expectNext(23)
out.expectComplete()
}
"chain elements to currently emitting on upstream finish" in assertAllStagesStopped {
Source.single("hi")
.transform(() new StatefulStage[String, String] {
override def initial = new State {
override def onPush(elem: String, ctx: Context[String]) =
emit(Iterator(elem + "1", elem + "2"), ctx)
}
override def onUpstreamFinish(ctx: Context[String]) = {
terminationEmit(Iterator("byebye"), ctx)
}
})
.runWith(TestSink.probe[String])
.request(1)
.expectNext("hi1")
.request(2)
.expectNext("hi2")
.expectNext("byebye")
.expectComplete()
}
}
}

View file

@ -5,7 +5,7 @@ package akka.stream.scaladsl
import akka.NotUsed
import akka.stream.impl.fusing.GraphStages
import akka.stream.{ ActorMaterializer, ClosedShape, FlowShape, OverflowStrategy }
import akka.stream._
import akka.stream.testkit._
import akka.stream.stage._
@ -19,21 +19,26 @@ class GraphDSLCompileSpec extends StreamSpec {
implicit val materializer = ActorMaterializer()
def op[In, Out]: () PushStage[In, Out] = { ()
new PushStage[In, Out] {
override def onPush(elem: In, ctx: Context[Out]): SyncDirective =
ctx.push(elem.asInstanceOf[Out])
def op[In, Out] = new GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("op.in")
val out = Outlet[Out]("op.out")
override val shape = FlowShape[In, Out](in, out)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush() = push(out, grab(in).asInstanceOf[Out])
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
val apples = () Iterator.continually(new Apple)
val f1 = Flow[String].transform(op[String, String]).named("f1")
val f2 = Flow[String].transform(op[String, String]).named("f2")
val f3 = Flow[String].transform(op[String, String]).named("f3")
val f4 = Flow[String].transform(op[String, String]).named("f4")
val f5 = Flow[String].transform(op[String, String]).named("f5")
val f6 = Flow[String].transform(op[String, String]).named("f6")
val f1 = Flow[String].via(op[String, String]).named("f1")
val f2 = Flow[String].via(op[String, String]).named("f2")
val f3 = Flow[String].via(op[String, String]).named("f3")
val f4 = Flow[String].via(op[String, String]).named("f4")
val f5 = Flow[String].via(op[String, String]).named("f5")
val f6 = Flow[String].via(op[String, String]).named("f6")
val in1 = Source(List("a", "b", "c"))
val in2 = Source(List("d", "e", "f"))
@ -169,7 +174,7 @@ class GraphDSLCompileSpec extends StreamSpec {
val out2 = Sink.asPublisher[String](false)
val out9 = Sink.asPublisher[String](false)
val out10 = Sink.asPublisher[String](false)
def f(s: String) = Flow[String].transform(op[String, String]).named(s)
def f(s: String) = Flow[String].via(op[String, String]).named(s)
import GraphDSL.Implicits._
in7 ~> f("a") ~> b7 ~> f("b") ~> m11 ~> f("c") ~> b11 ~> f("d") ~> out2

View file

@ -7,8 +7,6 @@ import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Attributes._
import akka.stream.Supervision.Decider
import akka.stream._
import akka.stream.stage.AbstractStage.PushPullGraphStage
import akka.stream.stage.Stage
/**
* INTERNAL API
@ -136,24 +134,4 @@ object Stages {
import DefaultAttributes._
/*
* Stage that is backed by a GraphStage but can be symbolically introspected
*/
case class SymbolicGraphStage[-In, +Out, Ext](symbolicStage: SymbolicStage[In, Out])
extends PushPullGraphStage[In, Out, Ext](
symbolicStage.create,
symbolicStage.attributes) {
}
sealed trait SymbolicStage[-In, +Out] {
def attributes: Attributes
def create(effectiveAttributes: Attributes): Stage[In, Out]
// FIXME: No supervision hooked in yet.
protected def supervision(attributes: Attributes): Decider =
attributes.get[SupervisionStrategy](SupervisionStrategy(Supervision.stoppingDecider)).decider
}
}

View file

@ -3,10 +3,9 @@
*/
package akka.stream.impl.fusing
import akka.event.{ NoLogging }
import akka.event.NoLogging
import akka.stream._
import akka.stream.impl.fusing.GraphInterpreter.{ GraphAssembly, DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic }
import akka.stream.stage.AbstractStage.PushPullGraphStage
import akka.stream.stage._
import java.{ util ju }

View file

@ -8,7 +8,6 @@ import akka.event.LoggingAdapter
import akka.japi.{ Pair, function }
import akka.stream.impl.{ ConstantFun, StreamLayout }
import akka.stream._
import akka.stream.stage.Stage
import org.reactivestreams.Processor
import scala.annotation.unchecked.uncheckedVariance
@ -1098,15 +1097,6 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def buffer(size: Int, overflowStrategy: OverflowStrategy): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.buffer(size, overflowStrategy))
/**
* Generic transformation of a stream with a custom processing [[akka.stream.stage.Stage]].
* This operator makes it possible to extend the `Flow` API when there is no specialized
* operator that performs the transformation.
*/
@deprecated("Use via(GraphStage) instead.", "2.4.3")
def transform[U](mkStage: function.Creator[Stage[Out, U]]): javadsl.Flow[In, U, Mat] =
new Flow(delegate.transform(() mkStage.create()))
/**
* Takes up to `n` elements from the stream (less than `n` if the upstream completes before emitting `n` elements)
* and returns a pair containing a strict sequence of the taken element

View file

@ -11,7 +11,6 @@ import akka.event.LoggingAdapter
import akka.japi.{ Pair, Util, function }
import akka.stream._
import akka.stream.impl.{ ConstantFun, StreamLayout, SourceQueueAdapter }
import akka.stream.stage.Stage
import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.JavaConverters._
@ -1700,15 +1699,6 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def buffer(size: Int, overflowStrategy: OverflowStrategy): javadsl.Source[Out, Mat] =
new Source(delegate.buffer(size, overflowStrategy))
/**
* Generic transformation of a stream with a custom processing [[akka.stream.stage.Stage]].
* This operator makes it possible to extend the `Flow` API when there is no specialized
* operator that performs the transformation.
*/
@deprecated("Use via(GraphStage) instead.", "2.4.3")
def transform[U](mkStage: function.Creator[Stage[Out, U]]): javadsl.Source[U, Mat] =
new Source(delegate.transform(() mkStage.create()))
/**
* Takes up to `n` elements from the stream (less than `n` if the upstream completes before emitting `n` elements)
* and returns a pair containing a strict sequence of the taken element

View file

@ -8,7 +8,6 @@ import akka.event.LoggingAdapter
import akka.japi.function
import akka.stream._
import akka.stream.impl.ConstantFun
import akka.stream.stage.Stage
import scala.collection.JavaConverters._
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.duration.FiniteDuration
@ -921,15 +920,6 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
def buffer(size: Int, overflowStrategy: OverflowStrategy): SubFlow[In, Out, Mat] =
new SubFlow(delegate.buffer(size, overflowStrategy))
/**
* Generic transformation of a stream with a custom processing [[akka.stream.stage.Stage]].
* This operator makes it possible to extend the `Flow` API when there is no specialized
* operator that performs the transformation.
*/
@deprecated("Use via(GraphStage) instead.", "2.4.3")
def transform[U](mkStage: function.Creator[Stage[Out, U]]): SubFlow[In, U, Mat] =
new SubFlow(delegate.transform(() mkStage.create()))
/**
* Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements)
* and returns a pair containing a strict sequence of the taken element

View file

@ -8,7 +8,6 @@ import akka.event.LoggingAdapter
import akka.japi.function
import akka.stream._
import akka.stream.impl.ConstantFun
import akka.stream.stage.Stage
import scala.collection.JavaConverters._
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.duration.FiniteDuration
@ -919,15 +918,6 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
def buffer(size: Int, overflowStrategy: OverflowStrategy): SubSource[Out, Mat] =
new SubSource(delegate.buffer(size, overflowStrategy))
/**
* Generic transformation of a stream with a custom processing [[akka.stream.stage.Stage]].
* This operator makes it possible to extend the `Flow` API when there is no specialized
* operator that performs the transformation.
*/
@deprecated("Use via(GraphStage) instead.", "2.4.3")
def transform[U](mkStage: function.Creator[Stage[Out, U]]): SubSource[U, Mat] =
new SubSource(delegate.transform(() mkStage.create()))
/**
* Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements)
* and returns a pair containing a strict sequence of the taken element

View file

@ -9,7 +9,6 @@ import akka.Done
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl._
import akka.stream.impl.fusing._
import akka.stream.stage.AbstractStage.{ PushPullGraphStage, PushPullGraphStageWithMaterializedValue }
import akka.stream.stage._
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import scala.annotation.unchecked.uncheckedVariance
@ -1210,15 +1209,6 @@ trait FlowOps[+Out, +Mat] {
*/
def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out] = via(fusing.Buffer(size, overflowStrategy))
/**
* Generic transformation of a stream with a custom processing [[akka.stream.stage.Stage]].
* This operator makes it possible to extend the `Flow` API when there is no specialized
* operator that performs the transformation.
*/
@deprecated("Use via(GraphStage) instead.", "2.4.3")
def transform[T](mkStage: () Stage[Out, T]): Repr[T] =
via(new PushPullGraphStage((attr) mkStage(), Attributes.none))
/**
* Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements)
* and returns a pair containing a strict sequence of the taken element
@ -1965,9 +1955,6 @@ trait FlowOps[+Out, +Mat] {
*/
def async: Repr[Out]
/** INTERNAL API */
private[scaladsl] def andThen[T](op: SymbolicStage[Out, T]): Repr[T] =
via(SymbolicGraphStage(op))
}
/**
@ -2200,10 +2187,4 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
def monitor[Mat2]()(combine: (Mat, FlowMonitor[Out]) Mat2): ReprMat[Out, Mat2] =
viaMat(GraphStages.monitor)(combine)
/**
* INTERNAL API.
*/
private[akka] def transformMaterializing[T, M](mkStageAndMaterialized: () (Stage[Out, T], M)): ReprMat[T, M] =
viaMat(new PushPullGraphStageWithMaterializedValue[Out, T, NotUsed, M]((attr) mkStageAndMaterialized(), Attributes.none))(Keep.right)
}

View file

@ -7,7 +7,8 @@ import java.nio.ByteOrder
import akka.NotUsed
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.{ Attributes, Inlet, Outlet, FlowShape }
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage._
import akka.util.{ ByteIterator, ByteString }
@ -90,17 +91,7 @@ object Framing {
* Protocol encoder that is used by [[Framing#simpleFramingProtocol]]
*/
def simpleFramingProtocolEncoder(maximumMessageLength: Int): Flow[ByteString, ByteString, NotUsed] =
Flow[ByteString].transform(() new PushStage[ByteString, ByteString] {
override def onPush(message: ByteString, ctx: Context[ByteString]): SyncDirective = {
val msgSize = message.size
if (msgSize > maximumMessageLength)
ctx.fail(new FramingException(s"Maximum allowed message size is $maximumMessageLength but tried to send $msgSize bytes"))
else {
val header = ByteString((msgSize >> 24) & 0xFF, (msgSize >> 16) & 0xFF, (msgSize >> 8) & 0xFF, msgSize & 0xFF)
ctx.push(header ++ message)
}
}
})
Flow[ByteString].via(new SimpleFramingProtocolEncoder(maximumMessageLength))
class FramingException(msg: String) extends RuntimeException(msg)
@ -128,6 +119,26 @@ object Framing {
decoded & Mask
}
private class SimpleFramingProtocolEncoder(maximumMessageLength: Long) extends SimpleLinearGraphStage[ByteString] {
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
setHandlers(in, out, this)
override def onPush(): Unit = {
val message = grab(in)
val msgSize = message.size
if (msgSize > maximumMessageLength)
failStage(new FramingException(s"Maximum allowed message size is $maximumMessageLength but tried to send $msgSize bytes"))
else {
val header = ByteString((msgSize >> 24) & 0xFF, (msgSize >> 16) & 0xFF, (msgSize >> 8) & 0xFF, msgSize & 0xFF)
push(out, header ++ message)
}
}
override def onPull(): Unit = pull(in)
}
}
private class DelimiterFramingStage(val separatorBytes: ByteString, val maximumLineBytes: Int, val allowTruncation: Boolean)
extends GraphStage[FlowShape[ByteString, ByteString]] {

View file

@ -1,678 +0,0 @@
/**
* Copyright (C) 2014-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.stage
import akka.NotUsed
import akka.stream._
import scala.util.control.NonFatal
/**
* General interface for stream transformation.
*
* Custom `Stage` implementations are intended to be used with
* [[akka.stream.scaladsl.FlowOps#transform]] or
* [[akka.stream.javadsl.Flow#transform]] to extend the `Flow` API when there
* is no specialized operator that performs the transformation.
*
* Custom implementations are subclasses of [[PushPullStage]] or
* [[DetachedStage]]. Sometimes it is convenient to extend
* [[StatefulStage]] for support of become like behavior.
*
* It is possible to keep state in the concrete `Stage` instance with
* ordinary instance variables. The `Transformer` is executed by an actor and
* therefore you do not have to add any additional thread safety or memory
* visibility constructs to access the state from the callback methods.
*
* @see [[akka.stream.scaladsl.Flow#transform]]
* @see [[akka.stream.javadsl.Flow#transform]]
*/
@deprecated("Please use GraphStage instead.", "2.4.2")
sealed trait Stage[-In, +Out]
/**
* INTERNAL API
*/
object AbstractStage {
private class PushPullGraphLogic[In, Out](
private val shape: FlowShape[In, Out],
val attributes: Attributes,
val stage: AbstractStage[In, Out, Directive, Directive, Context[Out], LifecycleContext])
extends GraphStageLogic(shape) with DetachedContext[Out] {
final override def materializer: Materializer = interpreter.materializer
private def ctx: DetachedContext[Out] = this
private var currentStage: AbstractStage[In, Out, Directive, Directive, Context[Out], LifecycleContext] = stage
{
// No need to refer to the handler in a private val
val handler = new InHandler with OutHandler {
override def onPush(): Unit =
try { currentStage.onPush(grab(shape.in), ctx) } catch { case NonFatal(ex) onSupervision(ex) }
override def onPull(): Unit = currentStage.onPull(ctx)
override def onUpstreamFinish(): Unit = currentStage.onUpstreamFinish(ctx)
override def onUpstreamFailure(ex: Throwable): Unit = currentStage.onUpstreamFailure(ex, ctx)
override def onDownstreamFinish(): Unit = currentStage.onDownstreamFinish(ctx)
}
setHandler(shape.in, handler)
setHandler(shape.out, handler)
}
private def onSupervision(ex: Throwable): Unit = {
currentStage.decide(ex) match {
case Supervision.Stop
failStage(ex)
case Supervision.Resume
resetAfterSupervise()
case Supervision.Restart
resetAfterSupervise()
currentStage.postStop()
currentStage = currentStage.restart().asInstanceOf[AbstractStage[In, Out, Directive, Directive, Context[Out], LifecycleContext]]
currentStage.preStart(ctx)
}
}
private def resetAfterSupervise(): Unit = {
val mustPull = currentStage.isDetached || isAvailable(shape.out)
if (!hasBeenPulled(shape.in) && mustPull) pull(shape.in)
}
override protected[stream] def beforePreStart(): Unit = {
super.beforePreStart()
if (currentStage.isDetached) pull(shape.in)
}
final override def push(elem: Out): DownstreamDirective = {
push(shape.out, elem)
null
}
final override def pull(): UpstreamDirective = {
pull(shape.in)
null
}
final override def finish(): FreeDirective = {
completeStage()
null
}
final override def pushAndFinish(elem: Out): DownstreamDirective = {
push(shape.out, elem)
completeStage()
null
}
final override def fail(cause: Throwable): FreeDirective = {
failStage(cause)
null
}
final override def isFinishing: Boolean = isClosed(shape.in)
final override def absorbTermination(): TerminationDirective = {
if (isClosed(shape.out)) {
val ex = new UnsupportedOperationException("It is not allowed to call absorbTermination() from onDownstreamFinish.")
// This MUST be logged here, since the downstream has cancelled, i.e. there is no one to send onError to, the
// stage is just about to finish so no one will catch it anyway just the interpreter
interpreter.log.error(ex.getMessage)
throw ex // We still throw for correctness (although a finish() would also work here)
}
if (isAvailable(shape.out)) currentStage.onPull(ctx)
null
}
override def pushAndPull(elem: Out): FreeDirective = {
push(shape.out, elem)
pull(shape.in)
null
}
final override def holdUpstreamAndPush(elem: Out): UpstreamDirective = {
push(shape.out, elem)
null
}
final override def holdDownstreamAndPull(): DownstreamDirective = {
pull(shape.in)
null
}
final override def isHoldingDownstream: Boolean = isAvailable(shape.out)
final override def isHoldingUpstream: Boolean = !(isClosed(shape.in) || hasBeenPulled(shape.in))
final override def holdDownstream(): DownstreamDirective = null
final override def holdUpstream(): UpstreamDirective = null
override def preStart(): Unit = currentStage.preStart(ctx)
override def postStop(): Unit = currentStage.postStop()
override def toString: String = s"PushPullGraphLogic($currentStage)"
}
class PushPullGraphStageWithMaterializedValue[-In, +Out, Ext, +Mat](
val factory: (Attributes) (Stage[In, Out], Mat),
stageAttributes: Attributes)
extends GraphStageWithMaterializedValue[FlowShape[In, Out], Mat] {
val name = stageAttributes.nameOrDefault()
override def initialAttributes = stageAttributes
val shape = FlowShape(Inlet[In](name + ".in"), Outlet[Out](name + ".out"))
override def toString = name
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Mat) = {
val stageAndMat = factory(inheritedAttributes)
val stage: AbstractStage[In, Out, Directive, Directive, Context[Out], LifecycleContext] =
stageAndMat._1.asInstanceOf[AbstractStage[In, Out, Directive, Directive, Context[Out], LifecycleContext]]
(new PushPullGraphLogic(shape, inheritedAttributes, stage), stageAndMat._2)
}
}
class PushPullGraphStage[-In, +Out, Ext](_factory: (Attributes) Stage[In, Out], _stageAttributes: Attributes)
extends PushPullGraphStageWithMaterializedValue[In, Out, Ext, NotUsed]((att: Attributes) (_factory(att), NotUsed), _stageAttributes)
}
@deprecated("Please use GraphStage instead.", "2.4.2")
abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, Ctx <: Context[Out], LifeCtx <: LifecycleContext] extends Stage[In, Out] {
/**
* INTERNAL API
*/
private[stream] def isDetached: Boolean = false
/**
* User overridable callback.
* <p/>
* It is called before any other method defined on the `Stage`.
* Empty default implementation.
*/
@throws(classOf[Exception])
def preStart(ctx: LifeCtx): Unit = ()
/**
* `onPush` is called when an element from upstream is available and there is demand from downstream, i.e.
* in `onPush` you are allowed to call [[akka.stream.stage.Context#push]] to emit one element downstream,
* or you can absorb the element by calling [[akka.stream.stage.Context#pull]]. Note that you can only
* emit zero or one element downstream from `onPull`.
*
* To emit more than one element you have to push the remaining elements from [[#onPull]], one-by-one.
* `onPush` is not called again until `onPull` has requested more elements with
* [[akka.stream.stage.Context#pull]].
*/
def onPush(elem: In, ctx: Ctx): PushD
/**
* `onPull` is called when there is demand from downstream, i.e. you are allowed to push one element
* downstream with [[akka.stream.stage.Context#push]], or request elements from upstreams with
* [[akka.stream.stage.Context#pull]]
*/
def onPull(ctx: Ctx): PullD
/**
* `onUpstreamFinish` is called when upstream has signaled that the stream is
* successfully completed. Here you cannot call [[akka.stream.stage.Context#push]],
* because there might not be any demand from downstream. To emit additional elements before
* terminating you can use [[akka.stream.stage.Context#absorbTermination]] and push final elements
* from [[#onPull]]. The stage will then be in finishing state, which can be checked
* with [[akka.stream.stage.Context#isFinishing]].
*
* By default the finish signal is immediately propagated with [[akka.stream.stage.Context#finish]].
*
* *IMPORTANT NOTICE:* this signal is not back-pressured, it might arrive from upstream even though
* the last action by this stage was a push.
*/
def onUpstreamFinish(ctx: Ctx): TerminationDirective = ctx.finish()
/**
* `onDownstreamFinish` is called when downstream has canceled.
*
* By default the cancel signal is immediately propagated with [[akka.stream.stage.Context#finish]].
*/
def onDownstreamFinish(ctx: Ctx): TerminationDirective = ctx.finish()
/**
* `onUpstreamFailure` is called when upstream has signaled that the stream is completed
* with failure. It is not called if [[#onPull]] or [[#onPush]] of the stage itself
* throws an exception.
*
* Note that elements that were emitted by upstream before the failure happened might
* not have been received by this stage when `onUpstreamFailure` is called, i.e.
* failures are not backpressured and might be propagated as soon as possible.
*
* Here you cannot call [[akka.stream.stage.Context#push]], because there might not
* be any demand from downstream. To emit additional elements before terminating you
* can use [[akka.stream.stage.Context#absorbTermination]] and push final elements
* from [[#onPull]]. The stage will then be in finishing state, which can be checked
* with [[akka.stream.stage.Context#isFinishing]].
*/
def onUpstreamFailure(cause: Throwable, ctx: Ctx): TerminationDirective = ctx.fail(cause)
/**
* User overridable callback.
* <p/>
* Is called after the Stages final action is performed. // TODO need better wording here
* Empty default implementation.
*/
@throws(classOf[Exception])
def postStop(): Unit = ()
/**
* If an exception is thrown from [[#onPush]] this method is invoked to decide how
* to handle the exception. By default this method returns [[Supervision.Stop]].
*
* If an exception is thrown from [[#onPull]] the stream will always be completed with
* failure, because it is not always possible to recover from that state.
* In concrete stages it is of course possible to use ordinary try-catch-recover inside
* `onPull` when it is know how to recover from such exceptions.
*
*/
def decide(t: Throwable): Supervision.Directive = Supervision.Stop
/**
* Used to create a fresh instance of the stage after an error resulting in a [[Supervision.Restart]]
* directive. By default it will return the same instance untouched, so you must override it
* if there are any state that should be cleared before restarting, e.g. by returning a new instance.
*/
def restart(): Stage[In, Out] = this
}
/**
* `PushPullStage` implementations participate in 1-bounded regions. For every external non-completion signal these
* stages produce *exactly one* push or pull signal.
*
* [[#onPush]] is called when an element from upstream is available and there is demand from downstream, i.e.
* in `onPush` you are allowed to call [[Context#push]] to emit one element downstream, or you can absorb the
* element by calling [[Context#pull]]. Note that you can only emit zero or one element downstream from `onPull`.
* To emit more than one element you have to push the remaining elements from [[#onPull]], one-by-one.
* `onPush` is not called again until `onPull` has requested more elements with [[Context#pull]].
*
* [[StatefulStage]] has support for making it easy to emit more than one element from `onPush`.
*
* [[#onPull]] is called when there is demand from downstream, i.e. you are allowed to push one element
* downstream with [[Context#push]], or request elements from upstreams with [[Context#pull]]. If you
* always perform transitive pull by calling `ctx.pull` from `onPull` you can use [[PushStage]] instead of
* `PushPullStage`.
*
* Stages are allowed to do early completion of downstream and cancel of upstream. This is done with [[Context#finish]],
* which is a combination of cancel/complete.
*
* Since onComplete is not a backpressured signal it is sometimes preferable to push a final element and then
* immediately finish. This combination is exposed as [[Context#pushAndFinish]] which enables stages to
* propagate completion events without waiting for an extra round of pull.
*
* Another peculiarity is how to convert termination events (complete/failure) into elements. The problem
* here is that the termination events are not backpressured while elements are. This means that simply calling
* [[Context#push]] as a response to [[#onUpstreamFinish]] or [[#onUpstreamFailure]] will very likely break boundedness
* and result in a buffer overflow somewhere. Therefore the only allowed command in this case is
* [[Context#absorbTermination]] which stops the propagation of the termination signal, and puts the stage in a
* [[akka.stream.stage.Context#isFinishing]] state. Depending on whether the stage has a pending pull signal it
* has not yet "consumed" by a push its [[#onPull]] handler might be called immediately or later. From
* [[#onPull]] final elements can be pushed before completing downstream with [[Context#finish]] or
* [[Context#pushAndFinish]].
*
* [[StatefulStage]] has support for making it easy to emit final elements.
*
* All these rules are enforced by types and runtime checks where needed. Always return the `Directive`
* from the call to the [[Context]] method, and do only call [[Context]] commands once per callback.
*
* @see [[DetachedStage]]
* @see [[StatefulStage]]
* @see [[PushStage]]
*/
@deprecated("Please use GraphStage instead.", "2.4.2")
abstract class PushPullStage[In, Out] extends AbstractStage[In, Out, SyncDirective, SyncDirective, Context[Out], LifecycleContext]
/**
* `PushStage` is a [[PushPullStage]] that always perform transitive pull by calling `ctx.pull` from `onPull`.
*/
@deprecated("Please use GraphStage instead.", "2.4.2")
abstract class PushStage[In, Out] extends PushPullStage[In, Out] {
/**
* Always pulls from upstream.
*/
final override def onPull(ctx: Context[Out]): SyncDirective = ctx.pull()
}
/**
* `DetachedStage` can be used to implement operations similar to [[akka.stream.scaladsl.FlowOps#buffer buffer]],
* [[akka.stream.scaladsl.FlowOps#expand expand]] and [[akka.stream.scaladsl.FlowOps#conflate conflate]].
*
* `DetachedStage` implementations are boundaries between 1-bounded regions. This means that they need to enforce the
* "exactly one" property both on their upstream and downstream regions. As a consequence a `DetachedStage` can never
* answer an [[#onPull]] with a [[Context#pull]] or answer an [[#onPush]] with a [[Context#push]] since such an action
* would "steal" the event from one region (resulting in zero signals) and would inject it to the other region
* (resulting in two signals).
*
* However, DetachedStages have the ability to call [[akka.stream.stage.DetachedContext#hold]] as a response to
* [[#onPush]] and [[#onPull]] which temporarily takes the signal off and
* stops execution, at the same time putting the stage in an [[akka.stream.stage.DetachedContext#isHolding]] state.
* If the stage is in a holding state it contains one absorbed signal, therefore in this state the only possible
* command to call is [[akka.stream.stage.DetachedContext#pushAndPull]] which results in two events making the
* balance right again: 1 hold + 1 external event = 2 external event
*
* This mechanism allows synchronization between the upstream and downstream regions which otherwise can progress
* independently.
*
* @see [[PushPullStage]]
*/
@deprecated("Please use GraphStage instead.", "2.4.2")
abstract class DetachedStage[In, Out]
extends AbstractStage[In, Out, UpstreamDirective, DownstreamDirective, DetachedContext[Out], LifecycleContext] {
private[stream] override def isDetached = true
/**
* If an exception is thrown from [[#onPush]] this method is invoked to decide how
* to handle the exception. By default this method returns [[Supervision.Stop]].
*
* If an exception is thrown from [[#onPull]] or if the stage is holding state the stream
* will always be completed with failure, because it is not always possible to recover from
* that state.
* In concrete stages it is of course possible to use ordinary try-catch-recover inside
* `onPull` when it is know how to recover from such exceptions.
*/
override def decide(t: Throwable): Supervision.Directive = super.decide(t)
}
/**
* The behavior of [[StatefulStage]] is defined by these two methods, which
* has the same semantics as corresponding methods in [[PushPullStage]].
*/
abstract class StageState[In, Out] {
def onPush(elem: In, ctx: Context[Out]): SyncDirective
def onPull(ctx: Context[Out]): SyncDirective = ctx.pull()
}
/**
* INTERNAL API
*/
private[akka] object StatefulStage {
sealed trait AndThen
case object Finish extends AndThen
final case class Become(state: StageState[Any, Any]) extends AndThen
case object Stay extends AndThen
}
/**
* `StatefulStage` is a [[PushPullStage]] that provides convenience to make some things easier.
*
* The behavior is defined in [[StageState]] instances. The initial behavior is specified
* by subclass implementing the [[#initial]] method. The behavior can be changed by using [[#become]].
*
* Use [[#emit]] or [[#emitAndFinish]] to push more than one element from [[StageState#onPush]] or
* [[StageState#onPull]].
*
* Use [[#terminationEmit]] to push final elements from [[#onUpstreamFinish]] or [[#onUpstreamFailure]].
*/
@deprecated("StatefulStage is deprecated, please use GraphStage instead.", "2.0-M2")
abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] {
import StatefulStage._
/**
* Scala API
*/
abstract class State extends StageState[In, Out]
private[this] var emitting = false
private[this] var _current: StageState[In, Out] = _
become(initial)
/**
* Concrete subclass must return the initial behavior from this method.
*
* **Warning:** This method must not be implemented as `val`.
*/
def initial: StageState[In, Out]
/**
* Current state.
*/
final def current: StageState[In, Out] = _current
/**
* Change the behavior to another [[StageState]].
*/
final def become(state: StageState[In, Out]): Unit = {
require(state ne null, "New state must not be null")
_current = state
}
/**
* Invokes current state.
*/
final override def onPush(elem: In, ctx: Context[Out]): SyncDirective = _current.onPush(elem, ctx)
/**
* Invokes current state.
*/
final override def onPull(ctx: Context[Out]): SyncDirective = _current.onPull(ctx)
override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective =
if (emitting) ctx.absorbTermination()
else ctx.finish()
/**
* Scala API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
* element downstream.
*/
final def emit(iter: Iterator[Out], ctx: Context[Out]): SyncDirective = emit(iter, ctx, _current)
/**
* Java API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
* element downstream.
*/
final def emit(iter: java.util.Iterator[Out], ctx: Context[Out]): SyncDirective = {
import scala.collection.JavaConverters._
emit(iter.asScala, ctx)
}
/**
* Scala API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
* element downstream and after that change behavior.
*/
final def emit(iter: Iterator[Out], ctx: Context[Out], nextState: StageState[In, Out]): SyncDirective = {
if (emitting) throw new IllegalStateException("already in emitting state")
if (iter.isEmpty) {
become(nextState)
ctx.pull()
} else {
val elem = iter.next()
if (iter.hasNext) {
emitting = true
become(emittingState(iter, andThen = Become(nextState.asInstanceOf[StageState[Any, Any]])))
} else
become(nextState)
ctx.push(elem)
}
}
/**
* Java API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
* element downstream and after that change behavior.
*/
final def emit(iter: java.util.Iterator[Out], ctx: Context[Out], nextState: StageState[In, Out]): SyncDirective = {
import scala.collection.JavaConverters._
emit(iter.asScala, ctx, nextState)
}
/**
* Scala API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
* element downstream and after that finish (complete downstreams, cancel upstreams).
*/
final def emitAndFinish(iter: Iterator[Out], ctx: Context[Out]): SyncDirective = {
if (emitting) throw new IllegalStateException("already in emitting state")
if (iter.isEmpty)
ctx.finish()
else {
val elem = iter.next()
if (iter.hasNext) {
emitting = true
become(emittingState(iter, andThen = Finish))
ctx.push(elem)
} else
ctx.pushAndFinish(elem)
}
}
/**
* Java API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
* element downstream and after that finish (complete downstreams, cancel upstreams).
*/
final def emitAndFinish(iter: java.util.Iterator[Out], ctx: Context[Out]): SyncDirective = {
import scala.collection.JavaConverters._
emitAndFinish(iter.asScala, ctx)
}
/**
* Scala API: Can be used from [[#onUpstreamFinish]] to push final elements downstream
* before completing the stream successfully. Note that if this is used from
* [[#onUpstreamFailure]] the failure will be absorbed and the stream will be completed
* successfully.
*/
final def terminationEmit(iter: Iterator[Out], ctx: Context[Out]): TerminationDirective = {
if (iter.isEmpty) {
if (emitting) ctx.absorbTermination()
else ctx.finish()
} else {
val nextState = current match {
case es: EmittingState if emitting es.copy(iter = es.iter ++ iter)
case _ emittingState(iter, andThen = Finish)
}
become(nextState)
ctx.absorbTermination()
}
}
/**
* Java API: Can be used from [[#onUpstreamFinish]] or [[#onUpstreamFailure]] to push final
* elements downstream.
*/
final def terminationEmit(iter: java.util.Iterator[Out], ctx: Context[Out]): TerminationDirective = {
import scala.collection.JavaConverters._
terminationEmit(iter.asScala, ctx)
}
private def emittingState(iter: Iterator[Out], andThen: AndThen) = EmittingState(iter, andThen)
private case class EmittingState(iter: Iterator[Out], andThen: AndThen) extends State {
override def onPush(elem: In, ctx: Context[Out]) = throw new IllegalStateException("onPush not allowed in emittingState")
override def onPull(ctx: Context[Out]) = {
if (iter.hasNext) {
val elem = iter.next()
if (iter.hasNext)
ctx.push(elem)
else if (!ctx.isFinishing) {
emitting = false
andThen match {
case Stay // ok
case Become(newState) become(newState.asInstanceOf[StageState[In, Out]])
case Finish ctx.pushAndFinish(elem)
}
ctx.push(elem)
} else
ctx.pushAndFinish(elem)
} else
throw new IllegalStateException("onPull with empty iterator is not expected in emittingState")
}
}
}
/**
* Return type from [[Context]] methods.
*/
sealed trait Directive
sealed trait AsyncDirective extends Directive
sealed trait SyncDirective extends Directive
sealed trait UpstreamDirective extends SyncDirective
sealed trait DownstreamDirective extends SyncDirective
sealed trait TerminationDirective extends SyncDirective
// never instantiated
sealed abstract class FreeDirective private () extends UpstreamDirective with DownstreamDirective with TerminationDirective with AsyncDirective
trait LifecycleContext {
/**
* Returns the Materializer that was used to materialize this [[Stage]].
* It can be used to materialize sub-flows.
*/
def materializer: Materializer
/** Returns operation attributes associated with the this Stage */
def attributes: Attributes
}
/**
* Passed to the callback methods of [[PushPullStage]] and [[StatefulStage]].
*/
sealed trait Context[Out] extends LifecycleContext {
/**
* Push one element to downstreams.
*/
def push(elem: Out): DownstreamDirective
/**
* Request for more elements from upstreams.
*/
def pull(): UpstreamDirective
/**
* Cancel upstreams and complete downstreams successfully.
*/
def finish(): FreeDirective
/**
* Push one element to downstream immediately followed by
* cancel of upstreams and complete of downstreams.
*/
def pushAndFinish(elem: Out): DownstreamDirective
/**
* Cancel upstreams and complete downstreams with failure.
*/
def fail(cause: Throwable): FreeDirective
/**
* Puts the stage in a finishing state so that
* final elements can be pushed from `onPull`.
*/
def absorbTermination(): TerminationDirective
/**
* This returns `true` after [[#absorbTermination]] has been used.
*/
def isFinishing: Boolean
}
/**
* Passed to the callback methods of [[DetachedStage]].
*
* [[#hold]] stops execution and at the same time putting the stage in a holding state.
* If the stage is in a holding state it contains one absorbed signal, therefore in
* this state the only possible command to call is [[#pushAndPull]] which results in two
* events making the balance right again: 1 hold + 1 external event = 2 external event
*/
trait DetachedContext[Out] extends Context[Out] {
def holdUpstream(): UpstreamDirective
def holdUpstreamAndPush(elem: Out): UpstreamDirective
def holdDownstream(): DownstreamDirective
def holdDownstreamAndPull(): DownstreamDirective
/**
* This returns `true` when [[#hold]] has been used
* and it is reset to `false` after [[#pushAndPull]].
*/
def isHoldingBoth: Boolean = isHoldingUpstream && isHoldingDownstream
def isHoldingUpstream: Boolean
def isHoldingDownstream: Boolean
def pushAndPull(elem: Out): FreeDirective
}

View file

@ -1025,6 +1025,50 @@ object MiMa extends AutoPlugin {
// #20553 Tree flattening should be separate from Fusing
ProblemFilters.exclude[MissingClassProblem]("akka.stream.Fusing$StructuralInfo"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.Fusing$StructuralInfo$")
),
"2.4.14" -> Seq(
// #21423 removal of deprecated stages (in 2.5.x)
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.Source.transform"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.SubSource.transform"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.Flow.transform"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.SubFlow.transform"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.transformMaterializing"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Flow.transform"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Flow.transformMaterializing"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Flow.andThen"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.transform"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.transformMaterializing"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.andThen"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowOps.transform"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowOps.andThen"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.Directive"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.AsyncDirective"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.TerminationDirective"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.AbstractStage$"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StatefulStage$Become$"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.AbstractStage$PushPullGraphStage"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StatefulStage$EmittingState$"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.AbstractStage"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.AbstractStage$PushPullGraphLogic"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.Context"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.Stage"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StatefulStage$"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.DetachedStage"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StatefulStage$Become"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StageState"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.AbstractStage$PushPullGraphStageWithMaterializedValue"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.DownstreamDirective"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.PushPullStage"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.LifecycleContext"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StatefulStage$EmittingState"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.PushStage"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.DetachedContext"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StatefulStage"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StatefulStage$State"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.UpstreamDirective"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.FreeDirective"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.StatefulStage$AndThen"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.SyncDirective")
)
)
}

View file

@ -3,7 +3,7 @@
*/
package akka
import com.typesafe.tools.mima.plugin.MimaKeys.reportBinaryIssues
import com.typesafe.tools.mima.plugin.MimaKeys.mimaReportBinaryIssues
import com.typesafe.tools.mima.plugin.MimaPlugin
import net.virtualvoid.sbt.graph.backend.SbtUpdateReport
import net.virtualvoid.sbt.graph.DependencyGraphKeys._
@ -279,7 +279,7 @@ object MimaWithPrValidation extends AutoPlugin {
override def trigger = allRequirements
override def requires = ValidatePullRequest && MimaPlugin
override lazy val projectSettings = Seq(
additionalTasks in ValidatePR += reportBinaryIssues
additionalTasks in ValidatePR += mimaReportBinaryIssues
)
}