diff --git a/stream/src/main/boilerplate/org/apache/pekko/stream/scaladsl/GraphApply.scala.template b/stream/src/main/boilerplate/org/apache/pekko/stream/scaladsl/GraphApply.scala.template index 0894cf124c..3141a4fed5 100644 --- a/stream/src/main/boilerplate/org/apache/pekko/stream/scaladsl/GraphApply.scala.template +++ b/stream/src/main/boilerplate/org/apache/pekko/stream/scaladsl/GraphApply.scala.template @@ -27,21 +27,6 @@ trait GraphApply { createGraph(s, builder) } - /** - * Creates a new [[Graph]] by importing the given graph `g1` and passing its [[Shape]] - * along with the [[GraphDSL.Builder]] to the given create function. - * - * Deprecated: this method signature does not work with Scala 3 type inference, kept for binary compatibility. Use createGraph instead. - */ - @deprecated("Use createGraph instead", "Akka 2.6.16") - def create[S <: Shape, Mat](g1: Graph[Shape, Mat])(buildBlock: GraphDSL.Builder[Mat] => (g1.Shape) => S): Graph[S, Mat] = { - val builder = new GraphDSL.Builder - val s1 = builder.add(g1, Keep.right) - val s = buildBlock(builder)(s1) - - createGraph(s, builder) - } - /** * Creates a new [[Graph]] by importing the given graph `g1` and passing its [[Shape]] * along with the [[GraphDSL.Builder]] to the given create function. @@ -57,24 +42,6 @@ trait GraphApply { [2..# - /** - * Creates a new [[Graph]] by importing the given graphs and passing their [[Shape]]s - * along with the [[GraphDSL.Builder]] to the given create function. - * - * Deprecated: this method signature does not work with Scala 3 type inference, kept for binary compatibility. Use createGraph instead. - */ - @deprecated("Use createGraph instead", "Akka ##2.6.##16") - def create[S <: Shape, Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) => Mat)(buildBlock: GraphDSL.Builder[Mat] => ([#g1.Shape#]) => S): Graph[S, Mat] = { - val builder = new GraphDSL.Builder - val curried = combineMat.curried - val s##1 = builder.add(g##1, (m##1: M##1) => curried(m##1)) - [2..#val s1 = builder.add(g1, (f: M1 => Any, m1: M1) => f(m1))# - ] - val s = buildBlock(builder)([#s1#]) - - createGraph(s, builder) - } - /** * Creates a new [[Graph]] by importing the given graphs and passing their [[Shape]]s * along with the [[GraphDSL.Builder]] to the given create function. diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes new file mode 100644 index 0000000000..feb063120b --- /dev/null +++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Remove deprecated methods +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.FanInShape1N") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.UniformFanInShape.inSeq") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.UniformFanOutShape.outArray") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ConstantFun*") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter#BatchingActorInputBoundary.onDownstreamFinish") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.impl.fusing.PrefixAndTail#PrefixAndTailLogic.onDownstreamFinish") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.io.ByteStringParser$ParsingException") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.impl.io.ByteStringParser#ParsingLogic.onDownstreamFinish") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.javadsl.CoupledTerminationFlow*") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartFlow.onFailuresWithBackoff") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartFlow.withBackoff") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartSink.withBackoff") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartSource.onFailuresWithBackoff") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartSource.withBackoff") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Balance.this") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.scaladsl.CoupledTerminationFlow*") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.GraphApply.create") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.GraphDSL.create") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.MergeHub#MergedSourceLogic.onDownstreamFinish") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Partition.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.RestartFlow.onFailuresWithBackoff") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.RestartFlow.withBackoff") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.RestartSink.withBackoff") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.RestartSource.onFailuresWithBackoff") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.RestartSource.withBackoff") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.ZipWithN.inSeq") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.AbstractInOutHandler.onDownstreamFinish") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.AbstractOutHandler.onDownstreamFinish") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.GraphStageLogic#ConditionalTerminateOutput.onDownstreamFinish") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.GraphStageLogic#EagerTerminateOutput.onDownstreamFinish") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.GraphStageLogic#Emitting.onDownstreamFinish") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.GraphStageLogic#IgnoreTerminateOutput.onDownstreamFinish") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.OutHandler.onDownstreamFinish") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.TimerGraphStageLogic.schedulePeriodicallyWithInitialDelay") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.TimerGraphStageLogic.schedulePeriodically") diff --git a/stream/src/main/scala/org/apache/pekko/stream/FanInShape1N.scala b/stream/src/main/scala/org/apache/pekko/stream/FanInShape1N.scala deleted file mode 100644 index 497b9c0f82..0000000000 --- a/stream/src/main/scala/org/apache/pekko/stream/FanInShape1N.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2018-2022 Lightbend Inc. - */ - -package org.apache.pekko.stream - -import scala.annotation.unchecked.uncheckedVariance -import scala.collection.immutable - -@deprecated( - "FanInShape1N was removed because it was not used anywhere. Use a custom shape extending from FanInShape directly.", - "Akka 2.5.5") -class FanInShape1N[-T0, -T1, +O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) { - - // ports get added to `FanInShape.inlets` as a side-effect of calling `newInlet` - val in0: Inlet[T0 @uncheckedVariance] = newInlet[T0]("in0") - for (i <- 1 until n) newInlet[T1](s"in$i") - - def this(n: Int) = this(n, FanInShape.Name[O]("FanInShape1N")) - def this(n: Int, name: String) = this(n, FanInShape.Name[O](name)) - def this( - outlet: Outlet[O @uncheckedVariance], - in0: Inlet[T0 @uncheckedVariance], - inlets1: Array[Inlet[T1 @uncheckedVariance]]) = - this(inlets1.length, FanInShape.Ports(outlet, in0 :: inlets1.toList)) - override protected def construct(init: FanInShape.Init[O @uncheckedVariance]): FanInShape[O] = - new FanInShape1N(n, init) - override def deepCopy(): FanInShape1N[T0, T1, O] = super.deepCopy().asInstanceOf[FanInShape1N[T0, T1, O]] - - @deprecated("Use 'inlets' or 'in(id)' instead.", "Akka 2.5.5") - def in1Seq: immutable.IndexedSeq[Inlet[T1 @uncheckedVariance]] = _in1Seq - - // cannot deprecate a lazy val because of genjavadoc problem https://github.com/typesafehub/genjavadoc/issues/85 - private lazy val _in1Seq: immutable.IndexedSeq[Inlet[T1 @uncheckedVariance]] = - inlets.tail // head is in0 - .toIndexedSeq.asInstanceOf[immutable.IndexedSeq[Inlet[T1]]] - - def in(n: Int): Inlet[T1 @uncheckedVariance] = { - require(n > 0, "n must be > 0") - inlets(n).asInstanceOf[Inlet[T1]] - } -} diff --git a/stream/src/main/scala/org/apache/pekko/stream/UniformFanInShape.scala b/stream/src/main/scala/org/apache/pekko/stream/UniformFanInShape.scala index 7e2d0085ee..0490f7e650 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/UniformFanInShape.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/UniformFanInShape.scala @@ -42,10 +42,5 @@ class UniformFanInShape[-T, +O](val n: Int, _init: FanInShape.Init[O]) extends F final override def inlets: immutable.Seq[Inlet[T @uncheckedVariance]] = super.inlets.asInstanceOf[immutable.Seq[Inlet[T]]] - @deprecated("Use 'inlets' or 'in(id)' instead.", "Akka 2.5.5") - def inSeq: immutable.IndexedSeq[Inlet[T @uncheckedVariance]] = _inSeq - - // cannot deprecate a lazy val because of genjavadoc problem https://github.com/typesafehub/genjavadoc/issues/85 - private lazy val _inSeq: immutable.IndexedSeq[Inlet[T @uncheckedVariance]] = inlets.toIndexedSeq def in(n: Int): Inlet[T @uncheckedVariance] = inlets(n) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/UniformFanOutShape.scala b/stream/src/main/scala/org/apache/pekko/stream/UniformFanOutShape.scala index 5a08ac5c22..62818baa1a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/UniformFanOutShape.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/UniformFanOutShape.scala @@ -36,10 +36,5 @@ class UniformFanOutShape[-I, +O](n: Int, _init: FanOutShape.Init[I @uncheckedVar final override def outlets: immutable.Seq[Outlet[O @uncheckedVariance]] = super.outlets.asInstanceOf[immutable.Seq[Outlet[O]]] - @deprecated("use 'outlets' or 'out(id)' instead", "Akka 2.5.5") - def outArray: Array[Outlet[O @uncheckedVariance]] = _outArray - - // cannot deprecate a lazy val because of genjavadoc problem https://github.com/typesafehub/genjavadoc/issues/85 - private lazy val _outArray: Array[Outlet[O @uncheckedVariance]] = outlets.toArray def out(n: Int): Outlet[O @uncheckedVariance] = outlets(n) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/ConstantFun.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/ConstantFun.scala deleted file mode 100644 index 504c313503..0000000000 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/ConstantFun.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2015-2022 Lightbend Inc. - */ - -package org.apache.pekko.stream.impl - -import org.apache.pekko -import pekko.annotation.InternalApi -import pekko.japi.{ Pair => JPair } -import pekko.japi.function.{ Function => JFun, Function2 => JFun2 } - -/** - * INTERNAL API - */ -@deprecated("Use org.apache.pekko.util.ConstantFun instead", "Akka 2.5.0") -@InternalApi private[pekko] object ConstantFun { - private[this] val JavaIdentityFunction = new JFun[Any, Any] { - @throws(classOf[Exception]) override def apply(param: Any): Any = param - } - - val JavaPairFunction = new JFun2[AnyRef, AnyRef, AnyRef JPair AnyRef] { - def apply(p1: AnyRef, p2: AnyRef): AnyRef JPair AnyRef = JPair(p1, p2) - } - - def javaCreatePairFunction[A, B]: JFun2[A, B, JPair[A, B]] = JavaPairFunction.asInstanceOf[JFun2[A, B, JPair[A, B]]] - - def javaIdentityFunction[T]: JFun[T, T] = JavaIdentityFunction.asInstanceOf[JFun[T, T]] - - def scalaIdentityFunction[T]: T => T = conforms.asInstanceOf[Function[T, T]] - - def scalaAnyToNone[A, B]: A => Option[B] = none - def scalaAnyTwoToNone[A, B, C]: (A, B) => Option[C] = two2none - def javaAnyToNone[A, B]: A => Option[B] = none - - val conforms = (a: Any) => a - - val zeroLong = (_: Any) => 0L - - val oneLong = (_: Any) => 1L - - val oneInt = (_: Any) => 1 - - val none = (_: Any) => None - - val two2none = (_: Any, _: Any) => None - -} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/io/ByteStringParser.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/io/ByteStringParser.scala index c21088f847..c1516351a7 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/io/ByteStringParser.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/io/ByteStringParser.scala @@ -187,9 +187,6 @@ import pekko.util.ByteString throw new IllegalStateException("no initial parser installed: you must use startWith(...)") } - @deprecated("Deprecated for internal usage. Will not be emitted any more.", "Akka 2.6.20") - class ParsingException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) - val NeedMoreData = new Exception with NoStackTrace class ByteReader(input: ByteString) { diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/CoupledTerminationFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/CoupledTerminationFlow.scala deleted file mode 100644 index 5d3b95811c..0000000000 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/CoupledTerminationFlow.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2017-2022 Lightbend Inc. - */ - -package org.apache.pekko.stream.javadsl - -import org.apache.pekko - -/** - * Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them. - * Similar to `Flow.fromSinkAndSource` however that API does not connect the completion signals of the wrapped operators. - */ -object CoupledTerminationFlow { - - /** - * Similar to [[Flow.fromSinkAndSource]] however couples the termination of these two operators. - * - * E.g. if the emitted [[Flow]] gets a cancellation, the [[Source]] of course is cancelled, - * however the Sink will also be completed. The table below illustrates the effects in detail: - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - *
Returned FlowSink (in)Source (out)
cause: upstream (sink-side) receives completioneffect: receives completioneffect: receives cancel
cause: upstream (sink-side) receives erroreffect: receives erroreffect: receives cancel
cause: downstream (source-side) receives canceleffect: completeseffect: receives cancel
effect: cancels upstream, completes downstreameffect: completescause: signals complete
effect: cancels upstream, errors downstreameffect: receives errorcause: signals error or throws
effect: cancels upstream, completes downstreamcause: cancelseffect: receives cancel
- * - * The order in which the `in` and `out` sides receive their respective completion signals is not defined, do not rely on its ordering. - */ - @deprecated("Use `Flow.fromSinkAndSourceCoupledMat(..., ..., Keep.both())` instead", "Akka 2.5.2") - def fromSinkAndSource[I, O, M1, M2](in: Sink[I, M1], out: Source[O, M2]): Flow[I, O, (M1, M2)] = - pekko.stream.scaladsl.Flow.fromSinkAndSourceCoupledMat(in, out)(pekko.stream.scaladsl.Keep.both).asJava -} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/RestartFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/RestartFlow.scala index 5ede75abf3..4811277a4d 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/RestartFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/RestartFlow.scala @@ -13,8 +13,6 @@ package org.apache.pekko.stream.javadsl -import scala.concurrent.duration.FiniteDuration - import org.apache.pekko import pekko.NotUsed import pekko.japi.function.Creator @@ -29,146 +27,6 @@ import pekko.stream.RestartSettings */ object RestartFlow { - /** - * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or - * completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination - * signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]] - * will be allowed to terminate without being restarted. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, - * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param flowFactory A factory for producing the [[Flow]] to wrap. - */ - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12") - def withBackoff[In, Out]( - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double, - flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor) - withBackoff(settings, flowFactory) - } - - /** - * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or - * completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination - * signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]] - * will be allowed to terminate without being restarted. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, - * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param flowFactory A factory for producing the [[Flow]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def withBackoff[In, Out]( - minBackoff: java.time.Duration, - maxBackoff: java.time.Duration, - randomFactor: Double, - flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { - val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor) - withBackoff(settings, flowFactory) - } - - /** - * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or - * completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts - * is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's - * running, and then the [[Flow]] will be allowed to terminate without being restarted. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, - * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param flowFactory A factory for producing the [[Flow]] to wrap. - */ - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12") - def withBackoff[In, Out]( - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double, - maxRestarts: Int, - flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff) - withBackoff(settings, flowFactory) - } - - /** - * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or - * completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts - * is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's - * running, and then the [[Flow]] will be allowed to terminate without being restarted. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, - * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param flowFactory A factory for producing the [[Flow]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def withBackoff[In, Out]( - minBackoff: java.time.Duration, - maxBackoff: java.time.Duration, - randomFactor: Double, - maxRestarts: Int, - flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { - val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff) - withBackoff(settings, flowFactory) - } - /** * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential * backoff. @@ -194,79 +52,6 @@ object RestartFlow { } .asJava - /** - * Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts - * using an exponential backoff. - * - * This new [[Flow]] will not emit failures. Any failure by the original [[Flow]] (the wrapped one) before that - * time will be handled by restarting it as long as maxRestarts is not reached. - * However, any termination signals, completion or cancellation sent to this [[Flow]] will terminate - * the wrapped [[Flow]], if it's running, and then the [[Flow]] will be allowed to terminate without being restarted. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, - * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param flowFactory A factory for producing the [[Flow]] to wrap. - */ - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12") - def onFailuresWithBackoff[In, Out]( - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double, - maxRestarts: Int, - flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff) - onFailuresWithBackoff(settings, flowFactory) - } - - /** - * Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts - * using an exponential backoff. - * - * This new [[Flow]] will not emit failures. Any failure by the original [[Flow]] (the wrapped one) before that - * time will be handled by restarting it as long as maxRestarts is not reached. - * However, any termination signals, completion or cancellation sent to this [[Flow]] will terminate - * the wrapped [[Flow]], if it's running, and then the [[Flow]] will be allowed to terminate without being restarted. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, - * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param flowFactory A factory for producing the [[Flow]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def onFailuresWithBackoff[In, Out]( - minBackoff: java.time.Duration, - maxBackoff: java.time.Duration, - randomFactor: Double, - maxRestarts: Int, - flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { - val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff) - onFailuresWithBackoff(settings, flowFactory) - } - /** * Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts * using an exponential backoff. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/RestartSink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/RestartSink.scala index b7c3f9c7ce..6f7ef2c133 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/RestartSink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/RestartSink.scala @@ -13,8 +13,6 @@ package org.apache.pekko.stream.javadsl -import scala.concurrent.duration.FiniteDuration - import org.apache.pekko import pekko.NotUsed import pekko.japi.function.Creator @@ -29,150 +27,6 @@ import pekko.stream.RestartSettings */ object RestartSink { - /** - * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it. - * The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that - * happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered - * simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the - * graph. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already - * sent may have been lost. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sinkFactory A factory for producing the [[Sink]] to wrap. - */ - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12") - def withBackoff[T]( - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double, - sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor) - withBackoff(settings, sinkFactory) - } - - /** - * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it. - * The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that - * happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered - * simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the - * graph. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already - * sent may have been lost. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sinkFactory A factory for producing the [[Sink]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def withBackoff[T]( - minBackoff: java.time.Duration, - maxBackoff: java.time.Duration, - randomFactor: Double, - sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { - val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor) - withBackoff(settings, sinkFactory) - } - - /** - * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]] - * is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into - * this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted. - * This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right - * before this [[Sink]] in the graph. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already - * sent may have been lost. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sinkFactory A factory for producing the [[Sink]] to wrap. - */ - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12") - def withBackoff[T]( - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double, - maxRestarts: Int, - sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff) - withBackoff(settings, sinkFactory) - } - - /** - * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]] - * is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into - * this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted. - * This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right - * before this [[Sink]] in the graph. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already - * sent may have been lost. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sinkFactory A factory for producing the [[Sink]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def withBackoff[T]( - minBackoff: java.time.Duration, - maxBackoff: java.time.Duration, - randomFactor: Double, - maxRestarts: Int, - sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { - val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff) - withBackoff(settings, sinkFactory) - } - /** * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential * backoff. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/RestartSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/RestartSource.scala index 1c96aa20a9..b27f8cb688 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/RestartSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/RestartSource.scala @@ -13,8 +13,6 @@ package org.apache.pekko.stream.javadsl -import scala.concurrent.duration.FiniteDuration - import org.apache.pekko import pekko.NotUsed import pekko.japi.function.Creator @@ -29,136 +27,6 @@ import pekko.stream.RestartSettings */ object RestartSource { - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]] - * is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]]. - * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12") - def withBackoff[T]( - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double, - sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor) - withBackoff(settings, sourceFactory) - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]] - * is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]]. - * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def withBackoff[T]( - minBackoff: java.time.Duration, - maxBackoff: java.time.Duration, - randomFactor: Double, - sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor) - withBackoff(settings, sourceFactory) - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion - * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled - * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, - * and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12") - def withBackoff[T]( - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double, - maxRestarts: Int, - sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff) - withBackoff(settings, sourceFactory) - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion - * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled - * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, - * and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def withBackoff[T]( - minBackoff: java.time.Duration, - maxBackoff: java.time.Duration, - randomFactor: Double, - maxRestarts: Int, - sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff) - withBackoff(settings, sourceFactory) - } - /** * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential * backoff. @@ -182,130 +50,6 @@ object RestartSource { } .asJava - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. - * - * This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by - * restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]]. - * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12") - def onFailuresWithBackoff[T]( - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double, - sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor) - onFailuresWithBackoff(settings, sourceFactory) - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. - * - * This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by - * restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]]. - * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def onFailuresWithBackoff[T]( - minBackoff: java.time.Duration, - maxBackoff: java.time.Duration, - randomFactor: Double, - sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor) - onFailuresWithBackoff(settings, sourceFactory) - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. - * - * This [[Source]] will not emit a failure as long as maxRestarts is not reached, since failure of the wrapped [[Source]] - * is handled by restarting it. The wrapped [[Source]] can be cancelled - * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, - * and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by - * introducing a [[KillSwitch]] right after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12") - def onFailuresWithBackoff[T]( - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double, - maxRestarts: Int, - sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff) - onFailuresWithBackoff(settings, sourceFactory) - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. - * - * This [[Source]] will not emit a failure as long as maxRestarts is not reached, since failure of the wrapped [[Source]] - * is handled by restarting it. The wrapped [[Source]] can be cancelled - * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, - * and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by - * introducing a [[KillSwitch]] right after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def onFailuresWithBackoff[T]( - minBackoff: java.time.Duration, - maxBackoff: java.time.Duration, - randomFactor: Double, - maxRestarts: Int, - sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff) - onFailuresWithBackoff(settings, sourceFactory) - } - /** * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/CoupledTerminationFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/CoupledTerminationFlow.scala index f3743db7c9..aa21da6b9d 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/CoupledTerminationFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/CoupledTerminationFlow.scala @@ -17,64 +17,6 @@ import org.apache.pekko import pekko.stream._ import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } -/** - * Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them. - * Similar to `Flow.fromSinkAndSource` however that API does not connect the completion signals of the wrapped operators. - */ -object CoupledTerminationFlow { - - /** - * Similar to [[Flow.fromSinkAndSource]] however couples the termination of these two operators. - * - * E.g. if the emitted [[Flow]] gets a cancellation, the [[Source]] of course is cancelled, - * however the Sink will also be completed. The table below illustrates the effects in detail: - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - *
Returned FlowSink (in)Source (out)
cause: upstream (sink-side) receives completioneffect: receives completioneffect: receives cancel
cause: upstream (sink-side) receives erroreffect: receives erroreffect: receives cancel
cause: downstream (source-side) receives canceleffect: completeseffect: receives cancel
effect: cancels upstream, completes downstreameffect: completescause: signals complete
effect: cancels upstream, errors downstreameffect: receives errorcause: signals error or throws
effect: cancels upstream, completes downstreamcause: cancelseffect: receives cancel
- * - * The order in which the `in` and `out` sides receive their respective completion signals is not defined, do not rely on its ordering. - */ - @deprecated("Use `Flow.fromSinkAndSourceCoupledMat(..., ...)(Keep.both)` instead", "Akka 2.5.2") - def fromSinkAndSource[I, O, M1, M2](in: Sink[I, M1], out: Source[O, M2]): Flow[I, O, (M1, M2)] = - Flow.fromSinkAndSourceCoupledMat(in, out)(Keep.both) - -} - /** INTERNAL API */ private[stream] final class CoupledTerminationBidi[I, O] extends GraphStage[BidiShape[I, I, O, O]] { val in1: Inlet[I] = Inlet("CoupledCompletion.in1") diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala index bf28108523..0790907517 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala @@ -806,12 +806,6 @@ object Partition { final class Partition[T](val outputPorts: Int, val partitioner: T => Int, val eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] { - /** - * Sets `eagerCancel` to `false`. - */ - @deprecated("Use the constructor which also specifies the `eagerCancel` parameter", "Akka 2.5.10") - def this(outputPorts: Int, partitioner: T => Int) = this(outputPorts, partitioner, false) - val in: Inlet[T] = Inlet[T]("Partition.in") val out: Seq[Outlet[T]] = Seq.tabulate(outputPorts)(i => Outlet[T]("Partition.out" + i)) // FIXME BC make this immutable.IndexedSeq as type + Vector as concrete impl override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*) @@ -947,9 +941,6 @@ final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean, // one output might seem counter intuitive but saves us from special handling in other places require(outputPorts >= 1, "A Balance must have one or more output ports") - @deprecated("Use the constructor which also specifies the `eagerCancel` parameter", since = "Akka 2.5.12") - def this(outputPorts: Int, waitForAllDownstreams: Boolean) = this(outputPorts, waitForAllDownstreams, false) - val in: Inlet[T] = Inlet[T]("Balance.in") val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i => Outlet[T]("Balance.out" + i)) override def initialAttributes = DefaultAttributes.balance @@ -1224,9 +1215,6 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[U override val shape = new UniformFanInShape[A, O](n) def out: Outlet[O] = shape.out - @deprecated("use `shape.inlets` or `shape.in(id)` instead", "Akka 2.5.5") - def inSeq: immutable.IndexedSeq[Inlet[A]] = shape.inlets.asInstanceOf[immutable.IndexedSeq[Inlet[A]]] - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { var pending = 0 diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala index 5ac86e0c30..8a90aa3212 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala @@ -37,73 +37,6 @@ import pekko.util.OptionVal */ object RestartFlow { - /** - * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or - * completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination - * signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]] - * will be allowed to terminate without being restarted. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, - * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param flowFactory A factory for producing the [[Flow]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)( - flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor) - withBackoff(settings)(flowFactory) - } - - /** - * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or - * completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts - * is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's - * running, and then the [[Flow]] will be allowed to terminate without being restarted. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, - * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param flowFactory A factory for producing the [[Flow]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def withBackoff[In, Out]( - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double, - maxRestarts: Int)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff) - withBackoff(settings)(flowFactory) - } - /** * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential * backoff. @@ -125,43 +58,6 @@ object RestartFlow { def withBackoff[In, Out](settings: RestartSettings)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] = Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, settings, onlyOnFailures = false)) - /** - * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails using an exponential - * backoff. Notice that this [[Flow]] will not restart on completion of the wrapped flow. - * - * This [[Flow]] will not emit any failure - * The failures by the wrapped [[Flow]] will be handled by - * restarting the wrapping [[Flow]] as long as maxRestarts is not reached. - * Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's - * running, and then the [[Flow]] will be allowed to terminate without being restarted. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, - * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param flowFactory A factory for producing the [[Flow]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def onFailuresWithBackoff[In, Out]( - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double, - maxRestarts: Int)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff) - onFailuresWithBackoff(settings)(flowFactory) - } - /** * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails using an exponential * backoff. Notice that this [[Flow]] will not restart on completion of the wrapped flow. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartSink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartSink.scala index 02743ce1ef..1bb1fd3153 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartSink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartSink.scala @@ -13,8 +13,6 @@ package org.apache.pekko.stream.scaladsl -import scala.concurrent.duration.FiniteDuration - import org.apache.pekko import pekko.NotUsed import pekko.stream.{ Attributes, Inlet, RestartSettings, SinkShape } @@ -29,72 +27,6 @@ import pekko.stream.stage.{ GraphStage, GraphStageLogic } */ object RestartSink { - /** - * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it. - * The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that - * happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered - * simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the - * graph. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already - * sent may have been lost. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sinkFactory A factory for producing the [[Sink]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)( - sinkFactory: () => Sink[T, _]): Sink[T, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor) - withBackoff(settings)(sinkFactory) - } - - /** - * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]] - * is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into - * this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted. - * This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right - * before this [[Sink]] in the graph. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already - * sent may have been lost. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sinkFactory A factory for producing the [[Sink]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)( - sinkFactory: () => Sink[T, _]): Sink[T, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff) - withBackoff(settings)(sinkFactory) - } - /** * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential * backoff. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartSource.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartSource.scala index aba594accd..9524de8af0 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartSource.scala @@ -13,8 +13,6 @@ package org.apache.pekko.stream.scaladsl -import scala.concurrent.duration.FiniteDuration - import org.apache.pekko import pekko.NotUsed import pekko.stream.{ Attributes, Outlet, RestartSettings, SourceShape } @@ -29,65 +27,6 @@ import pekko.stream.stage.{ GraphStage, GraphStageLogic } */ object RestartSource { - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]] - * is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]]. - * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)( - sourceFactory: () => Source[T, _]): Source[T, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor) - withBackoff(settings)(sourceFactory) - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion - * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled - * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, - * and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)( - sourceFactory: () => Source[T, _]): Source[T, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff) - withBackoff(settings)(sourceFactory) - } - /** * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential * backoff. @@ -107,67 +46,6 @@ object RestartSource { def withBackoff[T](settings: RestartSettings)(sourceFactory: () => Source[T, _]): Source[T, NotUsed] = Source.fromGraph(new RestartWithBackoffSource(sourceFactory, settings, onlyOnFailures = false)) - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. - * - * This [[Source]] will not emit a failure as long as maxRestarts is not reached, since failure of the wrapped [[Source]] - * is handled by restarting it. The wrapped [[Source]] can be cancelled - * by cancelling this [[Source]]. - * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)( - sourceFactory: () => Source[T, _]): Source[T, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor) - onFailuresWithBackoff(settings)(sourceFactory) - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. - * - * This [[Source]] will not emit a failure as long as maxRestarts is not reached, since failure of the wrapped [[Source]] - * is handled by restarting it. The wrapped [[Source]] can be cancelled - * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, - * and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - @deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.", - since = "Akka 2.6.10") - def onFailuresWithBackoff[T]( - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double, - maxRestarts: Int)(sourceFactory: () => Source[T, _]): Source[T, NotUsed] = { - val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff) - onFailuresWithBackoff(settings)(sourceFactory) - } - /** * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala index c77b293077..609a8afe9c 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala @@ -1833,66 +1833,6 @@ abstract class TimerGraphStageLogic(_shape: Shape) extends GraphStageLogic(_shap scheduleAtFixedRate(timerKey, initialDelay.asScala, interval.asScala) } - /** - * Schedule timer to call [[#onTimer]] periodically with the given interval after the specified - * initial delay. - * Any existing timer with the same key will automatically be canceled before - * adding the new timer. - */ - @deprecated( - "Use scheduleWithFixedDelay or scheduleAtFixedRate instead. This has the same semantics as " + - "scheduleAtFixedRate, but scheduleWithFixedDelay is often preferred.", - since = "Akka 2.6.0") - final protected def schedulePeriodicallyWithInitialDelay( - timerKey: Any, - initialDelay: FiniteDuration, - interval: FiniteDuration): Unit = - scheduleAtFixedRate(timerKey, initialDelay, interval) - - /** - * Schedule timer to call [[#onTimer]] periodically with the given interval after the specified - * initial delay. - * Any existing timer with the same key will automatically be canceled before - * adding the new timer. - */ - @deprecated( - "Use scheduleWithFixedDelay or scheduleAtFixedRate instead. This has the same semantics as " + - "scheduleAtFixedRate, but scheduleWithFixedDelay is often preferred.", - since = "Akka 2.6.0") - final protected def schedulePeriodicallyWithInitialDelay( - timerKey: Any, - initialDelay: java.time.Duration, - interval: java.time.Duration): Unit = { - import pekko.util.JavaDurationConverters._ - schedulePeriodicallyWithInitialDelay(timerKey, initialDelay.asScala, interval.asScala) - } - - /** - * Schedule timer to call [[#onTimer]] periodically with the given interval. - * Any existing timer with the same key will automatically be canceled before - * adding the new timer. - */ - @deprecated( - "Use scheduleWithFixedDelay or scheduleAtFixedRate instead. This has the same semantics as " + - "scheduleAtFixedRate, but scheduleWithFixedDelay is often preferred.", - since = "Akka 2.6.0") - final protected def schedulePeriodically(timerKey: Any, interval: FiniteDuration): Unit = - schedulePeriodicallyWithInitialDelay(timerKey, interval, interval) - - /** - * Schedule timer to call [[#onTimer]] periodically with the given interval. - * Any existing timer with the same key will automatically be canceled before - * adding the new timer. - */ - @deprecated( - "Use scheduleWithFixedDelay or scheduleAtFixedRate instead. This has the same semantics as " + - "scheduleAtFixedRate, but scheduleWithFixedDelay is often preferred.", - since = "Akka 2.6.0") - final protected def schedulePeriodically(timerKey: Any, interval: java.time.Duration): Unit = { - import pekko.util.JavaDurationConverters._ - schedulePeriodically(timerKey, interval.asScala) - } - /** * Cancel timer, ensuring that the [[#onTimer]] is not subsequently called. * @@ -1956,14 +1896,7 @@ trait OutHandler { @throws(classOf[Exception]) def onPull(): Unit - /** - * Called when the output port will no longer accept any new elements. After this callback no other callbacks will - * be called for this port. - */ - @throws(classOf[Exception]) - @deprecatedOverriding("Override `def onDownstreamFinish(cause: Throwable)`, instead.", since = "Akka 2.6.0") // warns when overriding - @deprecated("Call onDownstreamFinish with a cancellation cause.", since = "Akka 2.6.0") // warns when calling - def onDownstreamFinish(): Unit = { + private def onDownstreamFinish(): Unit = { val thisStage = GraphInterpreter.currentInterpreter.activeStage require( thisStage.lastCancellationCause ne null,