From 1aaa2fb6bf75066384f8f193cbde7b18b41152cb Mon Sep 17 00:00:00 2001 From: eyal farago Date: Mon, 9 Nov 2020 10:19:23 +0200 Subject: [PATCH] Akka 29784 stateful map concat relax f ret type (#29792) * akka-29730__register_newshells_when_no_active_interpreters: unrelated warnings that act as compilation errors on my env. * akka-29730__register_newshells_when_no_active_interpreters: introduce a reproduction for akka-29730. actor interpreter failed to register new shells introduced by flatMapPrefix when the current interpreter has completed. * akka-29730__register_newshells_when_no_active_interpreters: fix akka-29730 * akka-29730__register_newshells_when_no_active_interpreters: scalafmtAll * Update akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala * akka-29730__register_newshells_when_no_active_interpreters: style * akka-29730__register_newshells_when_no_active_interpreters: fmt * akka-29730__register_newshells_when_no_active_interpreters: scalafmt * akka-29730__register_newshells_when_no_active_interpreters: simplify shortCircuitBatch's condition. * akka-29730__register_newshells_when_no_active_interpreters: salfmt * akka-29730__register_newshells_when_no_active_interpreters: println => log.debug * Update akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala Co-authored-by: eyal farago * Update akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala * akka-29730__register_newshells_when_no_active_interpreters: tailrec shortCircuitBatch * akka-29784__statefulMapConcat_relax_f_ret_type: modify scala dsl's statefulMapConcat and mapConcat to take a function returning an IterableOnce. * akka-29784__statefulMapConcat_relax_f_ret_type: scalafmtall * akka-29784__statefulMapConcat_relax_f_ret_type: remove redundant import. * akka-29784__statefulMapConcat_relax_f_ret_type: remove another unused import. * akka-29784__statefulMapConcat_relax_f_ret_type: organize imports and annotations around ccompat * akka-29784__statefulMapConcat_relax_f_ret_type: scalafmtall * akka-29784__statefulMapConcat_relax_f_ret_type: filter false negative MIMA issues. * akka-29784__statefulMapConcat_relax_f_ret_type: flowWithCtx * akka-29784__statefulMapConcat_relax_f_ret_type: avoid deprecated methods in scala 2.13 Co-authored-by: Johannes Rudolph --- .../stream/scaladsl/FlowMapConcatSpec.scala | 23 ++++++++++ ...ulMapConcat-relax-types.backwards.excludes | 7 +++ .../scala/akka/stream/impl/fusing/Ops.scala | 43 ++++++++++--------- .../scala/akka/stream/scaladsl/Flow.scala | 7 +-- .../stream/scaladsl/FlowWithContextOps.scala | 9 ++-- 5 files changed, 61 insertions(+), 28 deletions(-) create mode 100644 akka-stream/src/main/mima-filters/2.6.10.backwards.excludes/akka-29784-statefulMapConcat-relax-types.backwards.excludes diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala index ef3755b830..08dfecd893 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala @@ -29,6 +29,17 @@ class FlowMapConcatSpec extends StreamSpec(""" TestConfig.RandomTestRange.foreach(_ => runScript(script)(_.mapConcat(x => (1 to x).map(_ => x)))) } + "map and concat iterator" in { + val script = Script( + Seq(0) -> Seq(), + Seq(1) -> Seq(1), + Seq(2) -> Seq(2, 2), + Seq(3) -> Seq(3, 3, 3), + Seq(2) -> Seq(2, 2), + Seq(1) -> Seq(1)) + TestConfig.RandomTestRange.foreach(_ => runScript(script)(_.mapConcat(x => Iterator.fill(x)(x)))) + } + "map and concat grouping with slow downstream" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() val input = (1 to 20).grouped(5).toList @@ -51,6 +62,18 @@ class FlowMapConcatSpec extends StreamSpec(""" .expectComplete() } + "be able to resume (iterator)" in assertAllStagesStopped { + val ex = new Exception("TEST") with NoStackTrace + + Source(1 to 5) + .mapConcat(x => if (x == 3) throw ex else scala.collection.Iterator(x)) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(TestSink.probe[Int]) + .request(4) + .expectNext(1, 2, 4, 5) + .expectComplete() + } + } } diff --git a/akka-stream/src/main/mima-filters/2.6.10.backwards.excludes/akka-29784-statefulMapConcat-relax-types.backwards.excludes b/akka-stream/src/main/mima-filters/2.6.10.backwards.excludes/akka-29784-statefulMapConcat-relax-types.backwards.excludes new file mode 100644 index 0000000000..2e624d659c --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.10.backwards.excludes/akka-29784-statefulMapConcat-relax-types.backwards.excludes @@ -0,0 +1,7 @@ +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.scaladsl.*.mapConcat") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.impl.*.mapConcat") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.scaladsl.*.statefulMapConcat") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.impl.*.statefulMapConcat") + +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.impl.fusing.StatefulMapConcat.f") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.impl.fusing.StatefulMapConcat.this") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 7339cd67b7..92057d5430 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -6,32 +6,31 @@ package akka.stream.impl.fusing import java.util.concurrent.TimeUnit.NANOSECONDS +import akka.actor.{ ActorRef, Terminated } +import akka.annotation.{ DoNotInherit, InternalApi } +import akka.event.Logging.LogLevel +import akka.event._ +import akka.stream.ActorAttributes.SupervisionStrategy +import akka.stream.Attributes.{ InputBuffer, LogLevels } +import akka.stream.OverflowStrategies._ +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import akka.stream.impl.{ ReactiveStreamsCompliance, Buffer => BufferImpl } +import akka.stream.scaladsl.{ DelayStrategy, Source } +import akka.stream.stage._ +import akka.stream.{ Supervision, _ } +import akka.util.{ unused, OptionVal } +import com.github.ghik.silencer.silent + import scala.annotation.tailrec import scala.collection.immutable import scala.collection.immutable.VectorBuilder import scala.concurrent.Future import scala.concurrent.duration.{ FiniteDuration, _ } -import scala.util.{ Failure, Success, Try } -import scala.util.control.{ NoStackTrace, NonFatal } import scala.util.control.Exception.Catcher - -import com.github.ghik.silencer.silent - -import akka.actor.{ ActorRef, Terminated } -import akka.annotation.{ DoNotInherit, InternalApi } -import akka.event.{ LogMarker, LogSource, Logging, LoggingAdapter, MarkerLoggingAdapter } -import akka.event.Logging.LogLevel -import akka.stream.{ Supervision, _ } -import akka.stream.ActorAttributes.SupervisionStrategy -import akka.stream.Attributes.{ InputBuffer, LogLevels } -import akka.stream.OverflowStrategies._ -import akka.stream.impl.{ ReactiveStreamsCompliance, Buffer => BufferImpl } -import akka.stream.impl.Stages.DefaultAttributes -import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage -import akka.stream.scaladsl.{ DelayStrategy, Source } -import akka.stream.stage._ -import akka.util.OptionVal -import akka.util.unused +import scala.util.control.{ NoStackTrace, NonFatal } +import scala.util.{ Failure, Success, Try } +import akka.util.ccompat._ /** * INTERNAL API @@ -2159,7 +2158,9 @@ private[stream] object Collect { /** * INTERNAL API */ -@InternalApi private[akka] final class StatefulMapConcat[In, Out](val f: () => In => immutable.Iterable[Out]) +@InternalApi +@ccompatUsedUntil213 +private[akka] final class StatefulMapConcat[In, Out](val f: () => In => IterableOnce[Out]) extends GraphStage[FlowShape[In, Out]] { val in = Inlet[In]("StatefulMapConcat.in") val out = Outlet[Out]("StatefulMapConcat.out") diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 9ffefd69d2..ebb2166193 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -33,6 +33,7 @@ import akka.stream.impl.fusing._ import akka.stream.impl.fusing.FlattenMerge import akka.stream.stage._ import akka.util.{ ConstantFun, Timeout } +import akka.util.ccompat._ /** * A `Flow` is a set of stream processing steps that has one open input and one open output. @@ -780,9 +781,9 @@ final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBui * Binary compatibility is only maintained for callers of this trait’s interface. */ @DoNotInherit +@ccompatUsedUntil213 trait FlowOps[+Out, +Mat] { import GraphDSL.Implicits._ - import akka.stream.impl.Stages._ type Repr[+O] <: FlowOps[O, Mat] { @@ -969,7 +970,7 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels * */ - def mapConcat[T](f: Out => immutable.Iterable[T]): Repr[T] = statefulMapConcat(() => f) + def mapConcat[T](f: Out => IterableOnce[T]): Repr[T] = statefulMapConcat(() => f) /** * Transform each input element into an `Iterable` of output elements that is @@ -995,7 +996,7 @@ trait FlowOps[+Out, +Mat] { * * See also [[FlowOps.mapConcat]] */ - def statefulMapConcat[T](f: () => Out => immutable.Iterable[T]): Repr[T] = + def statefulMapConcat[T](f: () => Out => IterableOnce[T]): Repr[T] = via(new StatefulMapConcat(f)) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala index 94be79902a..f83e116f15 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala @@ -8,19 +8,20 @@ import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration - import akka.NotUsed import akka.dispatch.ExecutionContexts import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.stream._ import akka.stream.impl.Throttle -import akka.util.ConstantFun +import akka.util.{ ccompat, ConstantFun } +import ccompat._ /** * Shared stream operations for [[FlowWithContext]] and [[SourceWithContext]] that automatically propagate a context * element with each data element. * */ +@ccompatUsedUntil213 trait FlowWithContextOps[+Out, +Ctx, +Mat] { type ReprMat[+O, +C, +M] <: FlowWithContextOps[O, C, M] { type ReprMat[+OO, +CC, +MatMat] = FlowWithContextOps.this.ReprMat[OO, CC, MatMat] @@ -173,9 +174,9 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { * * @see [[akka.stream.scaladsl.FlowOps.mapConcat]] */ - def mapConcat[Out2](f: Out => immutable.Iterable[Out2]): Repr[Out2, Ctx] = + def mapConcat[Out2](f: Out => IterableOnce[Out2]): Repr[Out2, Ctx] = via(flow.mapConcat { - case (e, ctx) => f(e).map(_ -> ctx) + case (e, ctx) => f(e).iterator.map(_ -> ctx) }) /**