remove some deprecated code from streams module (#1958)

* remove some deprecated code from streams module

* Create remove-deprecated-methods.excludes
This commit is contained in:
PJ Fanning 2025-07-31 08:30:22 +01:00 committed by GitHub
parent af6b408cfd
commit efc50b993e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 54 additions and 1275 deletions

View file

@ -27,21 +27,6 @@ trait GraphApply {
createGraph(s, builder)
}
/**
* Creates a new [[Graph]] by importing the given graph `g1` and passing its [[Shape]]
* along with the [[GraphDSL.Builder]] to the given create function.
*
* Deprecated: this method signature does not work with Scala 3 type inference, kept for binary compatibility. Use createGraph instead.
*/
@deprecated("Use createGraph instead", "Akka 2.6.16")
def create[S <: Shape, Mat](g1: Graph[Shape, Mat])(buildBlock: GraphDSL.Builder[Mat] => (g1.Shape) => S): Graph[S, Mat] = {
val builder = new GraphDSL.Builder
val s1 = builder.add(g1, Keep.right)
val s = buildBlock(builder)(s1)
createGraph(s, builder)
}
/**
* Creates a new [[Graph]] by importing the given graph `g1` and passing its [[Shape]]
* along with the [[GraphDSL.Builder]] to the given create function.
@ -57,24 +42,6 @@ trait GraphApply {
[2..#
/**
* Creates a new [[Graph]] by importing the given graphs and passing their [[Shape]]s
* along with the [[GraphDSL.Builder]] to the given create function.
*
* Deprecated: this method signature does not work with Scala 3 type inference, kept for binary compatibility. Use createGraph instead.
*/
@deprecated("Use createGraph instead", "Akka ##2.6.##16")
def create[S <: Shape, Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) => Mat)(buildBlock: GraphDSL.Builder[Mat] => ([#g1.Shape#]) => S): Graph[S, Mat] = {
val builder = new GraphDSL.Builder
val curried = combineMat.curried
val s##1 = builder.add(g##1, (m##1: M##1) => curried(m##1))
[2..#val s1 = builder.add(g1, (f: M1 => Any, m1: M1) => f(m1))#
]
val s = buildBlock(builder)([#s1#])
createGraph(s, builder)
}
/**
* Creates a new [[Graph]] by importing the given graphs and passing their [[Shape]]s
* along with the [[GraphDSL.Builder]] to the given create function.

View file

@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Remove deprecated methods
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.FanInShape1N")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.UniformFanInShape.inSeq")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.UniformFanOutShape.outArray")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ConstantFun*")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter#BatchingActorInputBoundary.onDownstreamFinish")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.impl.fusing.PrefixAndTail#PrefixAndTailLogic.onDownstreamFinish")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.io.ByteStringParser$ParsingException")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.impl.io.ByteStringParser#ParsingLogic.onDownstreamFinish")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.javadsl.CoupledTerminationFlow*")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartFlow.onFailuresWithBackoff")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartFlow.withBackoff")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartSink.withBackoff")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartSource.onFailuresWithBackoff")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartSource.withBackoff")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Balance.this")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.scaladsl.CoupledTerminationFlow*")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.GraphApply.create")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.GraphDSL.create")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.MergeHub#MergedSourceLogic.onDownstreamFinish")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Partition.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.RestartFlow.onFailuresWithBackoff")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.RestartFlow.withBackoff")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.RestartSink.withBackoff")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.RestartSource.onFailuresWithBackoff")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.RestartSource.withBackoff")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.ZipWithN.inSeq")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.AbstractInOutHandler.onDownstreamFinish")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.AbstractOutHandler.onDownstreamFinish")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.GraphStageLogic#ConditionalTerminateOutput.onDownstreamFinish")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.GraphStageLogic#EagerTerminateOutput.onDownstreamFinish")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.GraphStageLogic#Emitting.onDownstreamFinish")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.GraphStageLogic#IgnoreTerminateOutput.onDownstreamFinish")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.OutHandler.onDownstreamFinish")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.TimerGraphStageLogic.schedulePeriodicallyWithInitialDelay")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.TimerGraphStageLogic.schedulePeriodically")

View file

@ -1,51 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.stream
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
@deprecated(
"FanInShape1N was removed because it was not used anywhere. Use a custom shape extending from FanInShape directly.",
"Akka 2.5.5")
class FanInShape1N[-T0, -T1, +O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) {
// ports get added to `FanInShape.inlets` as a side-effect of calling `newInlet`
val in0: Inlet[T0 @uncheckedVariance] = newInlet[T0]("in0")
for (i <- 1 until n) newInlet[T1](s"in$i")
def this(n: Int) = this(n, FanInShape.Name[O]("FanInShape1N"))
def this(n: Int, name: String) = this(n, FanInShape.Name[O](name))
def this(
outlet: Outlet[O @uncheckedVariance],
in0: Inlet[T0 @uncheckedVariance],
inlets1: Array[Inlet[T1 @uncheckedVariance]]) =
this(inlets1.length, FanInShape.Ports(outlet, in0 :: inlets1.toList))
override protected def construct(init: FanInShape.Init[O @uncheckedVariance]): FanInShape[O] =
new FanInShape1N(n, init)
override def deepCopy(): FanInShape1N[T0, T1, O] = super.deepCopy().asInstanceOf[FanInShape1N[T0, T1, O]]
@deprecated("Use 'inlets' or 'in(id)' instead.", "Akka 2.5.5")
def in1Seq: immutable.IndexedSeq[Inlet[T1 @uncheckedVariance]] = _in1Seq
// cannot deprecate a lazy val because of genjavadoc problem https://github.com/typesafehub/genjavadoc/issues/85
private lazy val _in1Seq: immutable.IndexedSeq[Inlet[T1 @uncheckedVariance]] =
inlets.tail // head is in0
.toIndexedSeq.asInstanceOf[immutable.IndexedSeq[Inlet[T1]]]
def in(n: Int): Inlet[T1 @uncheckedVariance] = {
require(n > 0, "n must be > 0")
inlets(n).asInstanceOf[Inlet[T1]]
}
}

View file

@ -42,10 +42,5 @@ class UniformFanInShape[-T, +O](val n: Int, _init: FanInShape.Init[O]) extends F
final override def inlets: immutable.Seq[Inlet[T @uncheckedVariance]] =
super.inlets.asInstanceOf[immutable.Seq[Inlet[T]]]
@deprecated("Use 'inlets' or 'in(id)' instead.", "Akka 2.5.5")
def inSeq: immutable.IndexedSeq[Inlet[T @uncheckedVariance]] = _inSeq
// cannot deprecate a lazy val because of genjavadoc problem https://github.com/typesafehub/genjavadoc/issues/85
private lazy val _inSeq: immutable.IndexedSeq[Inlet[T @uncheckedVariance]] = inlets.toIndexedSeq
def in(n: Int): Inlet[T @uncheckedVariance] = inlets(n)
}

View file

@ -36,10 +36,5 @@ class UniformFanOutShape[-I, +O](n: Int, _init: FanOutShape.Init[I @uncheckedVar
final override def outlets: immutable.Seq[Outlet[O @uncheckedVariance]] =
super.outlets.asInstanceOf[immutable.Seq[Outlet[O]]]
@deprecated("use 'outlets' or 'out(id)' instead", "Akka 2.5.5")
def outArray: Array[Outlet[O @uncheckedVariance]] = _outArray
// cannot deprecate a lazy val because of genjavadoc problem https://github.com/typesafehub/genjavadoc/issues/85
private lazy val _outArray: Array[Outlet[O @uncheckedVariance]] = outlets.toArray
def out(n: Int): Outlet[O @uncheckedVariance] = outlets(n)
}

View file

@ -1,56 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.stream.impl
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.japi.{ Pair => JPair }
import pekko.japi.function.{ Function => JFun, Function2 => JFun2 }
/**
* INTERNAL API
*/
@deprecated("Use org.apache.pekko.util.ConstantFun instead", "Akka 2.5.0")
@InternalApi private[pekko] object ConstantFun {
private[this] val JavaIdentityFunction = new JFun[Any, Any] {
@throws(classOf[Exception]) override def apply(param: Any): Any = param
}
val JavaPairFunction = new JFun2[AnyRef, AnyRef, AnyRef JPair AnyRef] {
def apply(p1: AnyRef, p2: AnyRef): AnyRef JPair AnyRef = JPair(p1, p2)
}
def javaCreatePairFunction[A, B]: JFun2[A, B, JPair[A, B]] = JavaPairFunction.asInstanceOf[JFun2[A, B, JPair[A, B]]]
def javaIdentityFunction[T]: JFun[T, T] = JavaIdentityFunction.asInstanceOf[JFun[T, T]]
def scalaIdentityFunction[T]: T => T = conforms.asInstanceOf[Function[T, T]]
def scalaAnyToNone[A, B]: A => Option[B] = none
def scalaAnyTwoToNone[A, B, C]: (A, B) => Option[C] = two2none
def javaAnyToNone[A, B]: A => Option[B] = none
val conforms = (a: Any) => a
val zeroLong = (_: Any) => 0L
val oneLong = (_: Any) => 1L
val oneInt = (_: Any) => 1
val none = (_: Any) => None
val two2none = (_: Any, _: Any) => None
}

View file

@ -187,9 +187,6 @@ import pekko.util.ByteString
throw new IllegalStateException("no initial parser installed: you must use startWith(...)")
}
@deprecated("Deprecated for internal usage. Will not be emitted any more.", "Akka 2.6.20")
class ParsingException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)
val NeedMoreData = new Exception with NoStackTrace
class ByteReader(input: ByteString) {

View file

@ -1,73 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2017-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.stream.javadsl
import org.apache.pekko
/**
* Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them.
* Similar to `Flow.fromSinkAndSource` however that API does not connect the completion signals of the wrapped operators.
*/
object CoupledTerminationFlow {
/**
* Similar to [[Flow.fromSinkAndSource]] however couples the termination of these two operators.
*
* E.g. if the emitted [[Flow]] gets a cancellation, the [[Source]] of course is cancelled,
* however the Sink will also be completed. The table below illustrates the effects in detail:
*
* <table>
* <tr>
* <th>Returned Flow</th>
* <th>Sink (<code>in</code>)</th>
* <th>Source (<code>out</code>)</th>
* </tr>
* <tr>
* <td><i>cause:</i> upstream (sink-side) receives completion</td>
* <td><i>effect:</i> receives completion</td>
* <td><i>effect:</i> receives cancel</td>
* </tr>
* <tr>
* <td><i>cause:</i> upstream (sink-side) receives error</td>
* <td><i>effect:</i> receives error</td>
* <td><i>effect:</i> receives cancel</td>
* </tr>
* <tr>
* <td><i>cause:</i> downstream (source-side) receives cancel</td>
* <td><i>effect:</i> completes</td>
* <td><i>effect:</i> receives cancel</td>
* </tr>
* <tr>
* <td><i>effect:</i> cancels upstream, completes downstream</td>
* <td><i>effect:</i> completes</td>
* <td><i>cause:</i> signals complete</td>
* </tr>
* <tr>
* <td><i>effect:</i> cancels upstream, errors downstream</td>
* <td><i>effect:</i> receives error</td>
* <td><i>cause:</i> signals error or throws</td>
* </tr>
* <tr>
* <td><i>effect:</i> cancels upstream, completes downstream</td>
* <td><i>cause:</i> cancels</td>
* <td><i>effect:</i> receives cancel</td>
* </tr>
* </table>
*
* The order in which the `in` and `out` sides receive their respective completion signals is not defined, do not rely on its ordering.
*/
@deprecated("Use `Flow.fromSinkAndSourceCoupledMat(..., ..., Keep.both())` instead", "Akka 2.5.2")
def fromSinkAndSource[I, O, M1, M2](in: Sink[I, M1], out: Source[O, M2]): Flow[I, O, (M1, M2)] =
pekko.stream.scaladsl.Flow.fromSinkAndSourceCoupledMat(in, out)(pekko.stream.scaladsl.Keep.both).asJava
}

View file

@ -13,8 +13,6 @@
package org.apache.pekko.stream.javadsl
import scala.concurrent.duration.FiniteDuration
import org.apache.pekko
import pekko.NotUsed
import pekko.japi.function.Creator
@ -29,146 +27,6 @@ import pekko.stream.RestartSettings
*/
object RestartFlow {
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination
* signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]]
* will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
def withBackoff[In, Out](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
withBackoff(settings, flowFactory)
}
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination
* signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]]
* will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def withBackoff[In, Out](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor)
withBackoff(settings, flowFactory)
}
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts
* is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's
* running, and then the [[Flow]] will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
def withBackoff[In, Out](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
withBackoff(settings, flowFactory)
}
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts
* is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's
* running, and then the [[Flow]] will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def withBackoff[In, Out](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRestarts: Int,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
withBackoff(settings, flowFactory)
}
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
* backoff.
@ -194,79 +52,6 @@ object RestartFlow {
}
.asJava
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts
* using an exponential backoff.
*
* This new [[Flow]] will not emit failures. Any failure by the original [[Flow]] (the wrapped one) before that
* time will be handled by restarting it as long as maxRestarts is not reached.
* However, any termination signals, completion or cancellation sent to this [[Flow]] will terminate
* the wrapped [[Flow]], if it's running, and then the [[Flow]] will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
def onFailuresWithBackoff[In, Out](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
onFailuresWithBackoff(settings, flowFactory)
}
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts
* using an exponential backoff.
*
* This new [[Flow]] will not emit failures. Any failure by the original [[Flow]] (the wrapped one) before that
* time will be handled by restarting it as long as maxRestarts is not reached.
* However, any termination signals, completion or cancellation sent to this [[Flow]] will terminate
* the wrapped [[Flow]], if it's running, and then the [[Flow]] will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def onFailuresWithBackoff[In, Out](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRestarts: Int,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
onFailuresWithBackoff(settings, flowFactory)
}
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts
* using an exponential backoff.

View file

@ -13,8 +13,6 @@
package org.apache.pekko.stream.javadsl
import scala.concurrent.duration.FiniteDuration
import org.apache.pekko
import pekko.NotUsed
import pekko.japi.function.Creator
@ -29,150 +27,6 @@ import pekko.stream.RestartSettings
*/
object RestartSink {
/**
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it.
* The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that
* happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered
* simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the
* graph.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
def withBackoff[T](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
withBackoff(settings, sinkFactory)
}
/**
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it.
* The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that
* happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered
* simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the
* graph.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def withBackoff[T](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor)
withBackoff(settings, sinkFactory)
}
/**
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]]
* is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into
* this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted.
* This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right
* before this [[Sink]] in the graph.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
def withBackoff[T](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int,
sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
withBackoff(settings, sinkFactory)
}
/**
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]]
* is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into
* this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted.
* This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right
* before this [[Sink]] in the graph.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def withBackoff[T](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRestarts: Int,
sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
withBackoff(settings, sinkFactory)
}
/**
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
* backoff.

View file

@ -13,8 +13,6 @@
package org.apache.pekko.stream.javadsl
import scala.concurrent.duration.FiniteDuration
import org.apache.pekko
import pekko.NotUsed
import pekko.japi.function.Creator
@ -29,136 +27,6 @@ import pekko.stream.RestartSettings
*/
object RestartSource {
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]]
* is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]].
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
def withBackoff[T](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
withBackoff(settings, sourceFactory)
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]]
* is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]].
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def withBackoff[T](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor)
withBackoff(settings, sourceFactory)
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
* and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
def withBackoff[T](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
withBackoff(settings, sourceFactory)
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
* and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def withBackoff[T](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRestarts: Int,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
withBackoff(settings, sourceFactory)
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
* backoff.
@ -182,130 +50,6 @@ object RestartSource {
}
.asJava
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
*
* This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by
* restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]].
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
def onFailuresWithBackoff[T](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
onFailuresWithBackoff(settings, sourceFactory)
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
*
* This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by
* restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]].
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def onFailuresWithBackoff[T](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor)
onFailuresWithBackoff(settings, sourceFactory)
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
*
* This [[Source]] will not emit a failure as long as maxRestarts is not reached, since failure of the wrapped [[Source]]
* is handled by restarting it. The wrapped [[Source]] can be cancelled
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
* and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by
* introducing a [[KillSwitch]] right after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
def onFailuresWithBackoff[T](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
onFailuresWithBackoff(settings, sourceFactory)
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
*
* This [[Source]] will not emit a failure as long as maxRestarts is not reached, since failure of the wrapped [[Source]]
* is handled by restarting it. The wrapped [[Source]] can be cancelled
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
* and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by
* introducing a [[KillSwitch]] right after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def onFailuresWithBackoff[T](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRestarts: Int,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
onFailuresWithBackoff(settings, sourceFactory)
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
*

View file

@ -17,64 +17,6 @@ import org.apache.pekko
import pekko.stream._
import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
/**
* Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them.
* Similar to `Flow.fromSinkAndSource` however that API does not connect the completion signals of the wrapped operators.
*/
object CoupledTerminationFlow {
/**
* Similar to [[Flow.fromSinkAndSource]] however couples the termination of these two operators.
*
* E.g. if the emitted [[Flow]] gets a cancellation, the [[Source]] of course is cancelled,
* however the Sink will also be completed. The table below illustrates the effects in detail:
*
* <table>
* <tr>
* <th>Returned Flow</th>
* <th>Sink (<code>in</code>)</th>
* <th>Source (<code>out</code>)</th>
* </tr>
* <tr>
* <td><i>cause:</i> upstream (sink-side) receives completion</td>
* <td><i>effect:</i> receives completion</td>
* <td><i>effect:</i> receives cancel</td>
* </tr>
* <tr>
* <td><i>cause:</i> upstream (sink-side) receives error</td>
* <td><i>effect:</i> receives error</td>
* <td><i>effect:</i> receives cancel</td>
* </tr>
* <tr>
* <td><i>cause:</i> downstream (source-side) receives cancel</td>
* <td><i>effect:</i> completes</td>
* <td><i>effect:</i> receives cancel</td>
* </tr>
* <tr>
* <td><i>effect:</i> cancels upstream, completes downstream</td>
* <td><i>effect:</i> completes</td>
* <td><i>cause:</i> signals complete</td>
* </tr>
* <tr>
* <td><i>effect:</i> cancels upstream, errors downstream</td>
* <td><i>effect:</i> receives error</td>
* <td><i>cause:</i> signals error or throws</td>
* </tr>
* <tr>
* <td><i>effect:</i> cancels upstream, completes downstream</td>
* <td><i>cause:</i> cancels</td>
* <td><i>effect:</i> receives cancel</td>
* </tr>
* </table>
*
* The order in which the `in` and `out` sides receive their respective completion signals is not defined, do not rely on its ordering.
*/
@deprecated("Use `Flow.fromSinkAndSourceCoupledMat(..., ...)(Keep.both)` instead", "Akka 2.5.2")
def fromSinkAndSource[I, O, M1, M2](in: Sink[I, M1], out: Source[O, M2]): Flow[I, O, (M1, M2)] =
Flow.fromSinkAndSourceCoupledMat(in, out)(Keep.both)
}
/** INTERNAL API */
private[stream] final class CoupledTerminationBidi[I, O] extends GraphStage[BidiShape[I, I, O, O]] {
val in1: Inlet[I] = Inlet("CoupledCompletion.in1")

View file

@ -806,12 +806,6 @@ object Partition {
final class Partition[T](val outputPorts: Int, val partitioner: T => Int, val eagerCancel: Boolean)
extends GraphStage[UniformFanOutShape[T, T]] {
/**
* Sets `eagerCancel` to `false`.
*/
@deprecated("Use the constructor which also specifies the `eagerCancel` parameter", "Akka 2.5.10")
def this(outputPorts: Int, partitioner: T => Int) = this(outputPorts, partitioner, false)
val in: Inlet[T] = Inlet[T]("Partition.in")
val out: Seq[Outlet[T]] = Seq.tabulate(outputPorts)(i => Outlet[T]("Partition.out" + i)) // FIXME BC make this immutable.IndexedSeq as type + Vector as concrete impl
override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*)
@ -947,9 +941,6 @@ final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean,
// one output might seem counter intuitive but saves us from special handling in other places
require(outputPorts >= 1, "A Balance must have one or more output ports")
@deprecated("Use the constructor which also specifies the `eagerCancel` parameter", since = "Akka 2.5.12")
def this(outputPorts: Int, waitForAllDownstreams: Boolean) = this(outputPorts, waitForAllDownstreams, false)
val in: Inlet[T] = Inlet[T]("Balance.in")
val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i => Outlet[T]("Balance.out" + i))
override def initialAttributes = DefaultAttributes.balance
@ -1224,9 +1215,6 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[U
override val shape = new UniformFanInShape[A, O](n)
def out: Outlet[O] = shape.out
@deprecated("use `shape.inlets` or `shape.in(id)` instead", "Akka 2.5.5")
def inSeq: immutable.IndexedSeq[Inlet[A]] = shape.inlets.asInstanceOf[immutable.IndexedSeq[Inlet[A]]]
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
var pending = 0

View file

@ -37,73 +37,6 @@ import pekko.util.OptionVal
*/
object RestartFlow {
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination
* signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]]
* will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(
flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
withBackoff(settings)(flowFactory)
}
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts
* is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's
* running, and then the [[Flow]] will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def withBackoff[In, Out](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
withBackoff(settings)(flowFactory)
}
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
* backoff.
@ -125,43 +58,6 @@ object RestartFlow {
def withBackoff[In, Out](settings: RestartSettings)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] =
Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, settings, onlyOnFailures = false))
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails using an exponential
* backoff. Notice that this [[Flow]] will not restart on completion of the wrapped flow.
*
* This [[Flow]] will not emit any failure
* The failures by the wrapped [[Flow]] will be handled by
* restarting the wrapping [[Flow]] as long as maxRestarts is not reached.
* Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's
* running, and then the [[Flow]] will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def onFailuresWithBackoff[In, Out](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
onFailuresWithBackoff(settings)(flowFactory)
}
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails using an exponential
* backoff. Notice that this [[Flow]] will not restart on completion of the wrapped flow.

View file

@ -13,8 +13,6 @@
package org.apache.pekko.stream.scaladsl
import scala.concurrent.duration.FiniteDuration
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.{ Attributes, Inlet, RestartSettings, SinkShape }
@ -29,72 +27,6 @@ import pekko.stream.stage.{ GraphStage, GraphStageLogic }
*/
object RestartSink {
/**
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it.
* The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that
* happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered
* simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the
* graph.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(
sinkFactory: () => Sink[T, _]): Sink[T, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
withBackoff(settings)(sinkFactory)
}
/**
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]]
* is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into
* this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted.
* This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right
* before this [[Sink]] in the graph.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(
sinkFactory: () => Sink[T, _]): Sink[T, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
withBackoff(settings)(sinkFactory)
}
/**
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
* backoff.

View file

@ -13,8 +13,6 @@
package org.apache.pekko.stream.scaladsl
import scala.concurrent.duration.FiniteDuration
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.{ Attributes, Outlet, RestartSettings, SourceShape }
@ -29,65 +27,6 @@ import pekko.stream.stage.{ GraphStage, GraphStageLogic }
*/
object RestartSource {
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]]
* is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]].
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(
sourceFactory: () => Source[T, _]): Source[T, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
withBackoff(settings)(sourceFactory)
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
* and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(
sourceFactory: () => Source[T, _]): Source[T, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
withBackoff(settings)(sourceFactory)
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
* backoff.
@ -107,67 +46,6 @@ object RestartSource {
def withBackoff[T](settings: RestartSettings)(sourceFactory: () => Source[T, _]): Source[T, NotUsed] =
Source.fromGraph(new RestartWithBackoffSource(sourceFactory, settings, onlyOnFailures = false))
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
*
* This [[Source]] will not emit a failure as long as maxRestarts is not reached, since failure of the wrapped [[Source]]
* is handled by restarting it. The wrapped [[Source]] can be cancelled
* by cancelling this [[Source]].
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(
sourceFactory: () => Source[T, _]): Source[T, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
onFailuresWithBackoff(settings)(sourceFactory)
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
*
* This [[Source]] will not emit a failure as long as maxRestarts is not reached, since failure of the wrapped [[Source]]
* is handled by restarting it. The wrapped [[Source]] can be cancelled
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
* and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[pekko.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
@deprecated("Use the overloaded method which accepts org.apache.pekko.stream.RestartSettings instead.",
since = "Akka 2.6.10")
def onFailuresWithBackoff[T](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int)(sourceFactory: () => Source[T, _]): Source[T, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
onFailuresWithBackoff(settings)(sourceFactory)
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
*

View file

@ -1833,66 +1833,6 @@ abstract class TimerGraphStageLogic(_shape: Shape) extends GraphStageLogic(_shap
scheduleAtFixedRate(timerKey, initialDelay.asScala, interval.asScala)
}
/**
* Schedule timer to call [[#onTimer]] periodically with the given interval after the specified
* initial delay.
* Any existing timer with the same key will automatically be canceled before
* adding the new timer.
*/
@deprecated(
"Use scheduleWithFixedDelay or scheduleAtFixedRate instead. This has the same semantics as " +
"scheduleAtFixedRate, but scheduleWithFixedDelay is often preferred.",
since = "Akka 2.6.0")
final protected def schedulePeriodicallyWithInitialDelay(
timerKey: Any,
initialDelay: FiniteDuration,
interval: FiniteDuration): Unit =
scheduleAtFixedRate(timerKey, initialDelay, interval)
/**
* Schedule timer to call [[#onTimer]] periodically with the given interval after the specified
* initial delay.
* Any existing timer with the same key will automatically be canceled before
* adding the new timer.
*/
@deprecated(
"Use scheduleWithFixedDelay or scheduleAtFixedRate instead. This has the same semantics as " +
"scheduleAtFixedRate, but scheduleWithFixedDelay is often preferred.",
since = "Akka 2.6.0")
final protected def schedulePeriodicallyWithInitialDelay(
timerKey: Any,
initialDelay: java.time.Duration,
interval: java.time.Duration): Unit = {
import pekko.util.JavaDurationConverters._
schedulePeriodicallyWithInitialDelay(timerKey, initialDelay.asScala, interval.asScala)
}
/**
* Schedule timer to call [[#onTimer]] periodically with the given interval.
* Any existing timer with the same key will automatically be canceled before
* adding the new timer.
*/
@deprecated(
"Use scheduleWithFixedDelay or scheduleAtFixedRate instead. This has the same semantics as " +
"scheduleAtFixedRate, but scheduleWithFixedDelay is often preferred.",
since = "Akka 2.6.0")
final protected def schedulePeriodically(timerKey: Any, interval: FiniteDuration): Unit =
schedulePeriodicallyWithInitialDelay(timerKey, interval, interval)
/**
* Schedule timer to call [[#onTimer]] periodically with the given interval.
* Any existing timer with the same key will automatically be canceled before
* adding the new timer.
*/
@deprecated(
"Use scheduleWithFixedDelay or scheduleAtFixedRate instead. This has the same semantics as " +
"scheduleAtFixedRate, but scheduleWithFixedDelay is often preferred.",
since = "Akka 2.6.0")
final protected def schedulePeriodically(timerKey: Any, interval: java.time.Duration): Unit = {
import pekko.util.JavaDurationConverters._
schedulePeriodically(timerKey, interval.asScala)
}
/**
* Cancel timer, ensuring that the [[#onTimer]] is not subsequently called.
*
@ -1956,14 +1896,7 @@ trait OutHandler {
@throws(classOf[Exception])
def onPull(): Unit
/**
* Called when the output port will no longer accept any new elements. After this callback no other callbacks will
* be called for this port.
*/
@throws(classOf[Exception])
@deprecatedOverriding("Override `def onDownstreamFinish(cause: Throwable)`, instead.", since = "Akka 2.6.0") // warns when overriding
@deprecated("Call onDownstreamFinish with a cancellation cause.", since = "Akka 2.6.0") // warns when calling
def onDownstreamFinish(): Unit = {
private def onDownstreamFinish(): Unit = {
val thisStage = GraphInterpreter.currentInterpreter.activeStage
require(
thisStage.lastCancellationCause ne null,