Akka 29784 stateful map concat relax f ret type (#29792)

* akka-29730__register_newshells_when_no_active_interpreters: unrelated warnings that act as compilation errors on my env.

* akka-29730__register_newshells_when_no_active_interpreters: introduce a reproduction for akka-29730. actor interpreter failed to register new shells introduced by flatMapPrefix when the current interpreter has completed.

* akka-29730__register_newshells_when_no_active_interpreters: fix akka-29730

* akka-29730__register_newshells_when_no_active_interpreters: scalafmtAll

* Update akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala

* akka-29730__register_newshells_when_no_active_interpreters: style

* akka-29730__register_newshells_when_no_active_interpreters: fmt

* akka-29730__register_newshells_when_no_active_interpreters: scalafmt

* akka-29730__register_newshells_when_no_active_interpreters: simplify shortCircuitBatch's condition.

* akka-29730__register_newshells_when_no_active_interpreters: salfmt

* akka-29730__register_newshells_when_no_active_interpreters: println => log.debug

* Update akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala

Co-authored-by: eyal farago <eyal.farago@gmail.com>

* Update akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala

* akka-29730__register_newshells_when_no_active_interpreters: tailrec shortCircuitBatch

* akka-29784__statefulMapConcat_relax_f_ret_type: modify scala dsl's statefulMapConcat and mapConcat to take a function returning an IterableOnce.

* akka-29784__statefulMapConcat_relax_f_ret_type: scalafmtall

* akka-29784__statefulMapConcat_relax_f_ret_type: remove redundant import.

* akka-29784__statefulMapConcat_relax_f_ret_type: remove another unused import.

* akka-29784__statefulMapConcat_relax_f_ret_type: organize imports and annotations around ccompat

* akka-29784__statefulMapConcat_relax_f_ret_type: scalafmtall

* akka-29784__statefulMapConcat_relax_f_ret_type: filter false negative MIMA issues.

* akka-29784__statefulMapConcat_relax_f_ret_type: flowWithCtx

* akka-29784__statefulMapConcat_relax_f_ret_type: avoid deprecated methods in scala 2.13

Co-authored-by: Johannes Rudolph <johannes.rudolph@gmail.com>
This commit is contained in:
eyal farago 2020-11-09 10:19:23 +02:00 committed by GitHub
parent bcee56f4b9
commit 1aaa2fb6bf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 61 additions and 28 deletions

View file

@ -29,6 +29,17 @@ class FlowMapConcatSpec extends StreamSpec("""
TestConfig.RandomTestRange.foreach(_ => runScript(script)(_.mapConcat(x => (1 to x).map(_ => x))))
}
"map and concat iterator" in {
val script = Script(
Seq(0) -> Seq(),
Seq(1) -> Seq(1),
Seq(2) -> Seq(2, 2),
Seq(3) -> Seq(3, 3, 3),
Seq(2) -> Seq(2, 2),
Seq(1) -> Seq(1))
TestConfig.RandomTestRange.foreach(_ => runScript(script)(_.mapConcat(x => Iterator.fill(x)(x))))
}
"map and concat grouping with slow downstream" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val input = (1 to 20).grouped(5).toList
@ -51,6 +62,18 @@ class FlowMapConcatSpec extends StreamSpec("""
.expectComplete()
}
"be able to resume (iterator)" in assertAllStagesStopped {
val ex = new Exception("TEST") with NoStackTrace
Source(1 to 5)
.mapConcat(x => if (x == 3) throw ex else scala.collection.Iterator(x))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(TestSink.probe[Int])
.request(4)
.expectNext(1, 2, 4, 5)
.expectComplete()
}
}
}

View file

@ -0,0 +1,7 @@
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.scaladsl.*.mapConcat")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.impl.*.mapConcat")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.scaladsl.*.statefulMapConcat")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.impl.*.statefulMapConcat")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.impl.fusing.StatefulMapConcat.f")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.impl.fusing.StatefulMapConcat.this")

View file

@ -6,32 +6,31 @@ package akka.stream.impl.fusing
import java.util.concurrent.TimeUnit.NANOSECONDS
import akka.actor.{ ActorRef, Terminated }
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.event.Logging.LogLevel
import akka.event._
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.OverflowStrategies._
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ ReactiveStreamsCompliance, Buffer => BufferImpl }
import akka.stream.scaladsl.{ DelayStrategy, Source }
import akka.stream.stage._
import akka.stream.{ Supervision, _ }
import akka.util.{ unused, OptionVal }
import com.github.ghik.silencer.silent
import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.immutable.VectorBuilder
import scala.concurrent.Future
import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.util.{ Failure, Success, Try }
import scala.util.control.{ NoStackTrace, NonFatal }
import scala.util.control.Exception.Catcher
import com.github.ghik.silencer.silent
import akka.actor.{ ActorRef, Terminated }
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.event.{ LogMarker, LogSource, Logging, LoggingAdapter, MarkerLoggingAdapter }
import akka.event.Logging.LogLevel
import akka.stream.{ Supervision, _ }
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.OverflowStrategies._
import akka.stream.impl.{ ReactiveStreamsCompliance, Buffer => BufferImpl }
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.scaladsl.{ DelayStrategy, Source }
import akka.stream.stage._
import akka.util.OptionVal
import akka.util.unused
import scala.util.control.{ NoStackTrace, NonFatal }
import scala.util.{ Failure, Success, Try }
import akka.util.ccompat._
/**
* INTERNAL API
@ -2159,7 +2158,9 @@ private[stream] object Collect {
/**
* INTERNAL API
*/
@InternalApi private[akka] final class StatefulMapConcat[In, Out](val f: () => In => immutable.Iterable[Out])
@InternalApi
@ccompatUsedUntil213
private[akka] final class StatefulMapConcat[In, Out](val f: () => In => IterableOnce[Out])
extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("StatefulMapConcat.in")
val out = Outlet[Out]("StatefulMapConcat.out")

View file

@ -33,6 +33,7 @@ import akka.stream.impl.fusing._
import akka.stream.impl.fusing.FlattenMerge
import akka.stream.stage._
import akka.util.{ ConstantFun, Timeout }
import akka.util.ccompat._
/**
* A `Flow` is a set of stream processing steps that has one open input and one open output.
@ -780,9 +781,9 @@ final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBui
* Binary compatibility is only maintained for callers of this traits interface.
*/
@DoNotInherit
@ccompatUsedUntil213
trait FlowOps[+Out, +Mat] {
import GraphDSL.Implicits._
import akka.stream.impl.Stages._
type Repr[+O] <: FlowOps[O, Mat] {
@ -969,7 +970,7 @@ trait FlowOps[+Out, +Mat] {
* '''Cancels when''' downstream cancels
*
*/
def mapConcat[T](f: Out => immutable.Iterable[T]): Repr[T] = statefulMapConcat(() => f)
def mapConcat[T](f: Out => IterableOnce[T]): Repr[T] = statefulMapConcat(() => f)
/**
* Transform each input element into an `Iterable` of output elements that is
@ -995,7 +996,7 @@ trait FlowOps[+Out, +Mat] {
*
* See also [[FlowOps.mapConcat]]
*/
def statefulMapConcat[T](f: () => Out => immutable.Iterable[T]): Repr[T] =
def statefulMapConcat[T](f: () => Out => IterableOnce[T]): Repr[T] =
via(new StatefulMapConcat(f))
/**

View file

@ -8,19 +8,20 @@ import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import akka.NotUsed
import akka.dispatch.ExecutionContexts
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.stream._
import akka.stream.impl.Throttle
import akka.util.ConstantFun
import akka.util.{ ccompat, ConstantFun }
import ccompat._
/**
* Shared stream operations for [[FlowWithContext]] and [[SourceWithContext]] that automatically propagate a context
* element with each data element.
*
*/
@ccompatUsedUntil213
trait FlowWithContextOps[+Out, +Ctx, +Mat] {
type ReprMat[+O, +C, +M] <: FlowWithContextOps[O, C, M] {
type ReprMat[+OO, +CC, +MatMat] = FlowWithContextOps.this.ReprMat[OO, CC, MatMat]
@ -173,9 +174,9 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
*
* @see [[akka.stream.scaladsl.FlowOps.mapConcat]]
*/
def mapConcat[Out2](f: Out => immutable.Iterable[Out2]): Repr[Out2, Ctx] =
def mapConcat[Out2](f: Out => IterableOnce[Out2]): Repr[Out2, Ctx] =
via(flow.mapConcat {
case (e, ctx) => f(e).map(_ -> ctx)
case (e, ctx) => f(e).iterator.map(_ -> ctx)
})
/**