diff --git a/actor-testkit-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/actor-testkit-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
new file mode 100644
index 0000000000..0fb18f5c50
--- /dev/null
+++ b/actor-testkit-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Remove deprecated methods
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.actor.testkit.typed.scaladsl.LoggingTestKit.intercept")
diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/EffectfulActorContext.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/EffectfulActorContext.scala
index 6e38d38877..67e24c75e4 100644
--- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/EffectfulActorContext.scala
+++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/EffectfulActorContext.scala
@@ -111,9 +111,6 @@ import scala.reflect.ClassTag
override def startTimerAtFixedRate(key: Any, msg: T, initialDelay: FiniteDuration, interval: FiniteDuration): Unit =
startTimer(key, msg, interval, Effect.TimerScheduled.FixedRateModeWithInitialDelay(initialDelay))
- override def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit =
- startTimer(key, msg, interval, Effect.TimerScheduled.FixedRateMode)
-
override def startSingleTimer(key: Any, msg: T, delay: FiniteDuration): Unit =
startTimer(key, msg, delay, Effect.TimerScheduled.SingleMode)
diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/LoggingTestKitImpl.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/LoggingTestKitImpl.scala
index 555c4c4c22..35cfcd3b18 100644
--- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/LoggingTestKitImpl.scala
+++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/LoggingTestKitImpl.scala
@@ -123,10 +123,6 @@ import pekko.testkit.TestKit
override def expect[T](system: ActorSystem[_], code: Supplier[T]): T =
expect(code.get())(system)
- // deprecated (renamed to expect)
- override def intercept[T](code: => T)(implicit system: ActorSystem[_]): T =
- expect(code)(system)
-
private def checkLogback(system: ActorSystem[_]): Unit = {
if (!system.dynamicAccess.classIsOnClasspath("ch.qos.logback.classic.spi.ILoggingEvent")) {
throw new IllegalStateException("LoggingEventFilter requires logback-classic dependency in classpath.")
diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/LoggingTestKit.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/LoggingTestKit.scala
index 460fa67270..26c2ed13da 100644
--- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/LoggingTestKit.scala
+++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/LoggingTestKit.scala
@@ -109,17 +109,6 @@ import pekko.annotation.DoNotInherit
* Care is taken to remove the testkit when the block is finished or aborted.
*/
def expect[T](code: => T)(implicit system: ActorSystem[_]): T
-
- /**
- * Run the given code block and assert that the criteria of this `LoggingTestKit` has
- * matched within the configured `pekko.actor.testkit.typed.filter-leeway`
- * as often as requested by its `occurrences` parameter specifies.
- *
- * Care is taken to remove the testkit when the block is finished or aborted.
- */
- @deprecated("Use expect instead.", "Akka 2.6.0")
- def intercept[T](code: => T)(implicit system: ActorSystem[_]): T
-
}
/**
diff --git a/actor-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/actor-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
new file mode 100644
index 0000000000..c301da7df2
--- /dev/null
+++ b/actor-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Remove deprecated methods
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.actor.typed.javadsl.TimerScheduler.startPeriodicTimer")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.actor.typed.scaladsl.TimerScheduler.startPeriodicTimer")
diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/TimerSchedulerImpl.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/TimerSchedulerImpl.scala
index dc25fdfdec..b9b09d7605 100644
--- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/TimerSchedulerImpl.scala
+++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/TimerSchedulerImpl.scala
@@ -78,11 +78,6 @@ import scala.concurrent.duration.FiniteDuration
override final def startTimerAtFixedRate(key: Any, msg: T, initialDelay: Duration, interval: Duration): Unit =
startTimerAtFixedRate(key, msg, initialDelay.asScala, interval.asScala)
- override final def startPeriodicTimer(key: Any, msg: T, interval: Duration): Unit = {
- // this follows the deprecation note in the super class
- startTimerWithFixedDelay(key, msg, interval.asScala)
- }
-
override final def startSingleTimer(key: Any, msg: T, delay: Duration): Unit =
startSingleTimer(key, msg, delay.asScala)
}
@@ -110,9 +105,6 @@ import scala.concurrent.duration.FiniteDuration
override def startTimerWithFixedDelay(key: Any, msg: T, initialDelay: FiniteDuration, delay: FiniteDuration): Unit =
startTimer(key, msg, delay, FixedDelayMode(initialDelay))
- override def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit =
- startTimer(key, msg, interval, FixedRateMode(interval))
-
override def startSingleTimer(key: Any, msg: T, delay: FiniteDuration): Unit =
startTimer(key, msg, delay, SingleMode)
diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/TimerScheduler.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/TimerScheduler.scala
index b0083d9192..2f65cb18c4 100644
--- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/TimerScheduler.scala
+++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/TimerScheduler.scala
@@ -219,15 +219,6 @@ trait TimerScheduler[T] {
def startTimerAtFixedRate(msg: T, initialDelay: java.time.Duration, interval: java.time.Duration): Unit =
startTimerAtFixedRate(msg, msg, initialDelay, interval)
- /**
- * Deprecated API: See [[TimerScheduler#startTimerWithFixedDelay]] or [[TimerScheduler#startTimerAtFixedRate]].
- */
- @deprecated(
- "Use startTimerWithFixedDelay or startTimerAtFixedRate instead. This has the same semantics as " +
- "startTimerAtFixedRate, but startTimerWithFixedDelay is often preferred.",
- since = "Akka 2.6.0")
- def startPeriodicTimer(key: Any, msg: T, interval: Duration): Unit
-
/**
* Start a timer that will send `msg` once to the `self` actor after
* the given `delay`.
diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/TimerScheduler.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/TimerScheduler.scala
index e84b1f2436..f121810e2e 100644
--- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/TimerScheduler.scala
+++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/TimerScheduler.scala
@@ -219,15 +219,6 @@ trait TimerScheduler[T] {
def startTimerAtFixedRate(msg: T, initialDelay: FiniteDuration, interval: FiniteDuration): Unit =
startTimerAtFixedRate(msg, msg, initialDelay, interval)
- /**
- * Deprecated API: See [[TimerScheduler#startTimerWithFixedDelay]] or [[TimerScheduler#startTimerAtFixedRate]].
- */
- @deprecated(
- "Use startTimerWithFixedDelay or startTimerAtFixedRate instead. This has the same semantics as " +
- "startTimerAtFixedRate, but startTimerWithFixedDelay is often preferred.",
- since = "Akka 2.6.0")
- def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit
-
/**
* Start a timer that will send `msg` once to the `self` actor after
* the given `delay`.
diff --git a/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
index aa5f869fda..1ffacf5376 100644
--- a/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
+++ b/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
@@ -53,4 +53,6 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.pattern.Bac
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.pattern.BackoffSupervisor.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.pattern.CircuitBreaker.create")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.pattern.CircuitBreaker.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.pattern.Patterns.after")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.pattern.PatternsCS*")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.serialization.Serialization.deserialize")
diff --git a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
index c263d852db..66f17a1a4f 100644
--- a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
+++ b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
@@ -477,26 +477,6 @@ object Patterns {
value: Callable[CompletionStage[T]]): CompletionStage[T] =
timeoutCompletionStage(duration.asScala, scheduler)(value.call())(context)
- /**
- * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable
- * after the specified duration.
- */
- @deprecated("Use the overload one which accepts a Callable of Future instead.", since = "Akka 2.5.22")
- def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: Future[T]): Future[T] =
- scalaAfter(duration, scheduler)(value)(context)
-
- /**
- * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided value
- * after the specified duration.
- */
- @deprecated("Use the overloaded one which accepts a Callable of CompletionStage instead.", since = "Akka 2.5.22")
- def after[T](
- duration: java.time.Duration,
- scheduler: Scheduler,
- context: ExecutionContext,
- value: CompletionStage[T]): CompletionStage[T] =
- afterCompletionStage(duration.asScala, scheduler)(value)(context)
-
/**
* Returns an internally retrying [[java.util.concurrent.CompletionStage]]
* The first attempt will be made immediately, each subsequent attempt will be made immediately
@@ -881,448 +861,3 @@ object Patterns {
attempted => delayFunction.apply(attempted).toScala.map(_.asScala))(context, scheduler).asJava
}
}
-
-/**
- * Java 8+ API for Pekko patterns such as `ask`, `pipe` and others which work with [[java.util.concurrent.CompletionStage]].
- *
- * For working with Scala [[scala.concurrent.Future]] from Java you may want to use [[pekko.pattern.Patterns]] instead.
- */
-@deprecated("Use Patterns instead.", since = "Akka 2.5.19")
-object PatternsCS {
- import scala.concurrent.duration._
-
- import pekko.actor.ActorRef
- import pekko.japi
- import pekko.pattern.{ ask => scalaAsk, gracefulStop => scalaGracefulStop, retry => scalaRetry }
- import pekko.util.Timeout
-
- /**
- * Java API for `org.apache.pekko.pattern.ask`:
- * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
- * holding the eventual reply message; this means that the target actor
- * needs to send the result to the `sender` reference provided.
- *
- * The CompletionStage will be completed with an [[pekko.pattern.AskTimeoutException]] after the
- * given timeout has expired; this is independent from any timeout applied
- * while awaiting a result for this future (i.e. in
- * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the
- * recipient actor didn't send a reply.
- *
- * Warning:
- * When using future callbacks, inside actors you need to carefully avoid closing over
- * the containing actor’s object, i.e. do not call methods or access mutable state
- * on the enclosing actor from within the callback. This would break the actor
- * encapsulation and may introduce synchronization bugs and race conditions because
- * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
- * there is not yet a way to detect these illegal accesses at compile time.
- *
- * Recommended usage:
- *
- * {{{
- * final CompletionStage f = PatternsCS.ask(worker, request, timeout);
- * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
- * }}}
- */
- @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.15")
- def ask(actor: ActorRef, message: Any, timeout: Timeout): CompletionStage[AnyRef] =
- scalaAsk(actor, message)(timeout).asJava.asInstanceOf[CompletionStage[AnyRef]]
-
- /**
- * Java API for `org.apache.pekko.pattern.ask`:
- * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
- * holding the eventual reply message; this means that the target actor
- * needs to send the result to the `sender` reference provided.
- *
- * The CompletionStage will be completed with an [[pekko.pattern.AskTimeoutException]] after the
- * given timeout has expired; this is independent from any timeout applied
- * while awaiting a result for this future (i.e. in
- * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the
- * recipient actor didn't send a reply.
- *
- * Warning:
- * When using future callbacks, inside actors you need to carefully avoid closing over
- * the containing actor’s object, i.e. do not call methods or access mutable state
- * on the enclosing actor from within the callback. This would break the actor
- * encapsulation and may introduce synchronization bugs and race conditions because
- * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
- * there is not yet a way to detect these illegal accesses at compile time.
- *
- * Recommended usage:
- *
- * {{{
- * final CompletionStage f = PatternsCS.ask(worker, request, duration);
- * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
- * }}}
- */
- @deprecated("Use Patterns.ask instead.", since = "Akka 2.5.19")
- def ask(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] =
- ask(actor, message, Timeout.create(timeout))
-
- /**
- * A variation of ask which allows to implement "replyTo" pattern by including
- * sender reference in message.
- *
- * {{{
- * final CompletionStage f = PatternsCS.askWithReplyTo(
- * worker,
- * askSender -> new Request(askSender),
- * timeout);
- * }}}
- *
- * @param actor the actor to be asked
- * @param messageFactory function taking an actor ref and returning the message to be sent
- * @param timeout the timeout for the response before failing the returned completion operator
- */
- @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.15")
- def askWithReplyTo(
- actor: ActorRef,
- messageFactory: japi.function.Function[ActorRef, Any],
- timeout: Timeout): CompletionStage[AnyRef] =
- extended.ask(actor, messageFactory.apply _)(timeout).asJava.asInstanceOf[CompletionStage[AnyRef]]
-
- /**
- * A variation of ask which allows to implement "replyTo" pattern by including
- * sender reference in message.
- *
- * {{{
- * final CompletionStage f = PatternsCS.askWithReplyTo(
- * worker,
- * askSender -> new Request(askSender),
- * timeout);
- * }}}
- *
- * @param actor the actor to be asked
- * @param messageFactory function taking an actor ref and returning the message to be sent
- * @param timeout the timeout for the response before failing the returned completion stage
- */
- @deprecated("Use Pattens.askWithReplyTo instead.", since = "Akka 2.5.19")
- def askWithReplyTo(
- actor: ActorRef,
- messageFactory: japi.function.Function[ActorRef, Any],
- timeout: java.time.Duration): CompletionStage[AnyRef] =
- extended.ask(actor, messageFactory.apply _)(Timeout.create(timeout)).asJava.asInstanceOf[CompletionStage[AnyRef]]
-
- /**
- * Java API for `org.apache.pekko.pattern.ask`:
- * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
- * holding the eventual reply message; this means that the target actor
- * needs to send the result to the `sender` reference provided.
- *
- * The CompletionStage will be completed with an [[pekko.pattern.AskTimeoutException]] after the
- * given timeout has expired; this is independent from any timeout applied
- * while awaiting a result for this future (i.e. in
- * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the
- * recipient actor didn't send a reply.
- *
- * Warning:
- * When using future callbacks, inside actors you need to carefully avoid closing over
- * the containing actor’s object, i.e. do not call methods or access mutable state
- * on the enclosing actor from within the callback. This would break the actor
- * encapsulation and may introduce synchronization bugs and race conditions because
- * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
- * there is not yet a way to detect these illegal accesses at compile time.
- *
- * Recommended usage:
- *
- * {{{
- * final CompletionStage f = PatternsCS.ask(worker, request, timeout);
- * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
- * }}}
- */
- @deprecated("Use Pattens.ask which accepts java.time.Duration instead.", since = "Akka 2.5.19")
- def ask(actor: ActorRef, message: Any, timeoutMillis: Long): CompletionStage[AnyRef] =
- scalaAsk(actor, message)(new Timeout(timeoutMillis, TimeUnit.MILLISECONDS)).asJava
- .asInstanceOf[CompletionStage[AnyRef]]
-
- /**
- * A variation of ask which allows to implement "replyTo" pattern by including
- * sender reference in message.
- *
- * {{{
- * final CompletionStage f = PatternsCS.askWithReplyTo(
- * worker,
- * replyTo -> new Request(replyTo),
- * timeout);
- * }}}
- *
- * @param actor the actor to be asked
- * @param messageFactory function taking an actor ref to reply to and returning the message to be sent
- * @param timeoutMillis the timeout for the response before failing the returned completion operator
- */
- @deprecated("Use Pattens.askWithReplyTo which accepts java.time.Duration instead.", since = "Akka 2.5.19")
- def askWithReplyTo(
- actor: ActorRef,
- messageFactory: japi.function.Function[ActorRef, Any],
- timeoutMillis: Long): CompletionStage[AnyRef] =
- askWithReplyTo(actor, messageFactory, Timeout(timeoutMillis.millis))
-
- /**
- * Java API for `org.apache.pekko.pattern.ask`:
- * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
- * holding the eventual reply message; this means that the target [[pekko.actor.ActorSelection]]
- * needs to send the result to the `sender` reference provided.
- *
- * The CompletionStage will be completed with an [[pekko.pattern.AskTimeoutException]] after the
- * given timeout has expired; this is independent from any timeout applied
- * while awaiting a result for this future (i.e. in
- * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the
- * recipient actor didn't send a reply.
- *
- * Warning:
- * When using future callbacks, inside actors you need to carefully avoid closing over
- * the containing actor’s object, i.e. do not call methods or access mutable state
- * on the enclosing actor from within the callback. This would break the actor
- * encapsulation and may introduce synchronization bugs and race conditions because
- * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
- * there is not yet a way to detect these illegal accesses at compile time.
- *
- * Recommended usage:
- *
- * {{{
- * final CompletionStage f = PatternsCS.ask(selection, request, timeout);
- * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
- * }}}
- */
- @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.15")
- def ask(selection: ActorSelection, message: Any, timeout: Timeout): CompletionStage[AnyRef] =
- scalaAsk(selection, message)(timeout).asJava.asInstanceOf[CompletionStage[AnyRef]]
-
- /**
- * Java API for `org.apache.pekko.pattern.ask`:
- * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
- * holding the eventual reply message; this means that the target [[pekko.actor.ActorSelection]]
- * needs to send the result to the `sender` reference provided.
- *
- * The CompletionStage will be completed with an [[pekko.pattern.AskTimeoutException]] after the
- * given timeout has expired; this is independent from any timeout applied
- * while awaiting a result for this future (i.e. in
- * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the
- * recipient actor didn't send a reply.
- *
- * Warning:
- * When using future callbacks, inside actors you need to carefully avoid closing over
- * the containing actor’s object, i.e. do not call methods or access mutable state
- * on the enclosing actor from within the callback. This would break the actor
- * encapsulation and may introduce synchronization bugs and race conditions because
- * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
- * there is not yet a way to detect these illegal accesses at compile time.
- *
- * Recommended usage:
- *
- * {{{
- * final CompletionStage f = PatternsCS.ask(selection, request, duration);
- * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
- * }}}
- */
- @deprecated("Use Patterns.ask instead.", since = "Akka 2.5.19")
- def ask(selection: ActorSelection, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] =
- ask(selection, message, Timeout.create(timeout))
-
- /**
- * Java API for `org.apache.pekko.pattern.ask`:
- * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
- * holding the eventual reply message; this means that the target [[pekko.actor.ActorSelection]]
- * needs to send the result to the `sender` reference provided.
- *
- * The CompletionStage will be completed with an [[pekko.pattern.AskTimeoutException]] after the
- * given timeout has expired; this is independent from any timeout applied
- * while awaiting a result for this future (i.e. in
- * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the
- * recipient actor didn't send a reply.
- *
- * Warning:
- * When using future callbacks, inside actors you need to carefully avoid closing over
- * the containing actor’s object, i.e. do not call methods or access mutable state
- * on the enclosing actor from within the callback. This would break the actor
- * encapsulation and may introduce synchronization bugs and race conditions because
- * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
- * there is not yet a way to detect these illegal accesses at compile time.
- *
- * Recommended usage:
- *
- * {{{
- * final CompletionStage f = PatternsCS.ask(selection, request, timeout);
- * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
- * }}}
- */
- @deprecated("Use Pattens.ask which accepts java.time.Duration instead.", since = "Akka 2.5.19")
- def ask(selection: ActorSelection, message: Any, timeoutMillis: Long): CompletionStage[AnyRef] =
- scalaAsk(selection, message)(new Timeout(timeoutMillis, TimeUnit.MILLISECONDS)).asJava
- .asInstanceOf[CompletionStage[AnyRef]]
-
- /**
- * A variation of ask which allows to implement "replyTo" pattern by including
- * sender reference in message.
- *
- * {{{
- * final CompletionStage f = Patterns.askWithReplyTo(
- * selection,
- * askSender -> new Request(askSender),
- * timeout);
- * }}}
- */
- @deprecated("Use Pattens.askWithReplyTo which accepts java.time.Duration instead.", since = "Akka 2.5.19")
- def askWithReplyTo(
- selection: ActorSelection,
- messageFactory: japi.Function[ActorRef, Any],
- timeoutMillis: Long): CompletionStage[AnyRef] =
- extended
- .ask(selection, messageFactory.apply _)(Timeout(timeoutMillis.millis))
- .asJava
- .asInstanceOf[CompletionStage[AnyRef]]
-
- /**
- * When this [[java.util.concurrent.CompletionStage]] finishes, send its result to the given
- * [[pekko.actor.ActorRef]] or [[pekko.actor.ActorSelection]].
- * Returns the original CompletionStage to allow method chaining.
- * If the future was completed with failure it is sent as a [[pekko.actor.Status.Failure]]
- * to the recipient.
- *
- * Recommended usage example:
- *
- * {{{
- * final CompletionStage f = PatternsCS.ask(worker, request, timeout);
- * // apply some transformation (i.e. enrich with request info)
- * final CompletionStage transformed = f.thenApply(result -> { ... });
- * // send it on to the next operator
- * PatternsCS.pipe(transformed, context).to(nextActor);
- * }}}
- */
- @deprecated("Use Patterns.pipe instead.", since = "Akka 2.5.19")
- def pipe[T](future: CompletionStage[T], context: ExecutionContext): PipeableCompletionStage[T] =
- pipeCompletionStage(future)(context)
-
- /**
- * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with success (value `true`) when
- * existing messages of the target actor has been processed and the actor has been
- * terminated.
- *
- * Useful when you need to wait for termination or compose ordered termination of several actors.
- *
- * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]]
- * is completed with failure [[pekko.pattern.AskTimeoutException]].
- */
- @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
- def gracefulStop(target: ActorRef, timeout: FiniteDuration): CompletionStage[java.lang.Boolean] =
- scalaGracefulStop(target, timeout).asJava.asInstanceOf[CompletionStage[java.lang.Boolean]]
-
- /**
- * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with success (value `true`) when
- * existing messages of the target actor has been processed and the actor has been
- * terminated.
- *
- * Useful when you need to wait for termination or compose ordered termination of several actors.
- *
- * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]]
- * is completed with failure [[pekko.pattern.AskTimeoutException]].
- */
- @deprecated("Use Patterns.gracefulStop instead.", since = "Akka 2.5.19")
- def gracefulStop(target: ActorRef, timeout: java.time.Duration): CompletionStage[java.lang.Boolean] =
- scalaGracefulStop(target, timeout.asScala).asJava.asInstanceOf[CompletionStage[java.lang.Boolean]]
-
- /**
- * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with success (value `true`) when
- * existing messages of the target actor has been processed and the actor has been
- * terminated.
- *
- * Useful when you need to wait for termination or compose ordered termination of several actors.
- *
- * If you want to invoke specialized stopping logic on your target actor instead of PoisonPill, you can pass your
- * stop command as `stopMessage` parameter
- *
- * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]]
- * is completed with failure [[pekko.pattern.AskTimeoutException]].
- */
- @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
- def gracefulStop(target: ActorRef, timeout: FiniteDuration, stopMessage: Any): CompletionStage[java.lang.Boolean] =
- scalaGracefulStop(target, timeout, stopMessage).asJava.asInstanceOf[CompletionStage[java.lang.Boolean]]
-
- /**
- * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with success (value `true`) when
- * existing messages of the target actor has been processed and the actor has been
- * terminated.
- *
- * Useful when you need to wait for termination or compose ordered termination of several actors.
- *
- * If you want to invoke specialized stopping logic on your target actor instead of PoisonPill, you can pass your
- * stop command as `stopMessage` parameter
- *
- * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]]
- * is completed with failure [[pekko.pattern.AskTimeoutException]].
- */
- @deprecated("Use Patterns.gracefulStop instead.", since = "Akka 2.5.19")
- def gracefulStop(
- target: ActorRef,
- timeout: java.time.Duration,
- stopMessage: Any): CompletionStage[java.lang.Boolean] =
- scalaGracefulStop(target, timeout.asScala, stopMessage).asJava.asInstanceOf[CompletionStage[java.lang.Boolean]]
-
- /**
- * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided Callable
- * after the specified duration.
- */
- @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
- def after[T](
- duration: FiniteDuration,
- scheduler: Scheduler,
- context: ExecutionContext,
- value: Callable[CompletionStage[T]]): CompletionStage[T] =
- afterCompletionStage(duration, scheduler)(value.call())(context)
-
- /**
- * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided Callable
- * after the specified duration.
- */
- @deprecated("Use Patterns.after instead.", since = "Akka 2.5.19")
- def after[T](
- duration: java.time.Duration,
- scheduler: Scheduler,
- context: ExecutionContext,
- value: Callable[CompletionStage[T]]): CompletionStage[T] =
- afterCompletionStage(duration.asScala, scheduler)(value.call())(context)
-
- /**
- * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided value
- * after the specified duration.
- */
- @deprecated(
- "Use Patterns.after which accepts java.time.Duration and Callable of CompletionStage instead.",
- since = "Akka 2.5.22")
- def after[T](
- duration: FiniteDuration,
- scheduler: Scheduler,
- context: ExecutionContext,
- value: CompletionStage[T]): CompletionStage[T] =
- afterCompletionStage(duration, scheduler)(value)(context)
-
- /**
- * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided value
- * after the specified duration.
- */
- @deprecated(
- "Use Patterns.after which accepts java.time.Duration and Callable of CompletionStage instead.",
- since = "Akka 2.5.22")
- def after[T](
- duration: java.time.Duration,
- scheduler: Scheduler,
- context: ExecutionContext,
- value: CompletionStage[T]): CompletionStage[T] =
- afterCompletionStage(duration.asScala, scheduler)(value)(context)
-
- /**
- * Returns an internally retrying [[java.util.concurrent.CompletionStage]]
- * The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'.
- * A scheduler (eg context.system.scheduler) must be provided to delay each retry
- * If attempts are exhausted the returned completion operator is simply the result of invoking attempt.
- * Note that the attempt function will be invoked on the given execution context for subsequent tries
- * and therefore must be thread safe (i.e. not touch unsafe mutable state).
- */
- @deprecated("Use Patterns.retry instead.", since = "Akka 2.5.19")
- def retry[T](
- attempt: Callable[CompletionStage[T]],
- attempts: Int,
- delay: java.time.Duration,
- scheduler: Scheduler,
- ec: ExecutionContext): CompletionStage[T] =
- scalaRetry(() => attempt.call().asScala, attempts, delay.asScala)(ec, scheduler).asJava
-}
diff --git a/cluster-sharding/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/cluster-sharding/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
new file mode 100644
index 0000000000..f129e622b9
--- /dev/null
+++ b/cluster-sharding/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Remove deprecated methods
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.cluster.sharding.ClusterShardingSettings.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.cluster.sharding.ClusterShardingSettings#TuningParameters.this")
diff --git a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingSettings.scala b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingSettings.scala
index 2fc309cc75..18d0430ee4 100644
--- a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingSettings.scala
+++ b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingSettings.scala
@@ -937,172 +937,6 @@ object ClusterShardingSettings {
require(
entityRecoveryStrategy == "all" || entityRecoveryStrategy == "constant",
s"Unknown 'entity-recovery-strategy' [$entityRecoveryStrategy], valid values are 'all' or 'constant'")
-
- // included for binary compatibility
- @deprecated(
- "Use the ClusterShardingSettings factory methods or the constructor including " +
- "leastShardAllocationAbsoluteLimit and leastShardAllocationRelativeLimit instead",
- since = "Akka 2.6.10")
- def this(
- coordinatorFailureBackoff: FiniteDuration,
- retryInterval: FiniteDuration,
- bufferSize: Int,
- handOffTimeout: FiniteDuration,
- shardStartTimeout: FiniteDuration,
- shardFailureBackoff: FiniteDuration,
- entityRestartBackoff: FiniteDuration,
- rebalanceInterval: FiniteDuration,
- snapshotAfter: Int,
- keepNrOfBatches: Int,
- leastShardAllocationRebalanceThreshold: Int,
- leastShardAllocationMaxSimultaneousRebalance: Int,
- waitingForStateTimeout: FiniteDuration,
- updatingStateTimeout: FiniteDuration,
- entityRecoveryStrategy: String,
- entityRecoveryConstantRateStrategyFrequency: FiniteDuration,
- entityRecoveryConstantRateStrategyNumberOfEntities: Int,
- coordinatorStateWriteMajorityPlus: Int,
- coordinatorStateReadMajorityPlus: Int) =
- this(
- coordinatorFailureBackoff,
- retryInterval,
- bufferSize,
- handOffTimeout,
- shardStartTimeout,
- shardFailureBackoff,
- entityRestartBackoff,
- rebalanceInterval,
- snapshotAfter,
- keepNrOfBatches,
- leastShardAllocationRebalanceThreshold,
- leastShardAllocationMaxSimultaneousRebalance,
- waitingForStateTimeout,
- updatingStateTimeout,
- entityRecoveryStrategy,
- entityRecoveryConstantRateStrategyFrequency,
- entityRecoveryConstantRateStrategyNumberOfEntities,
- coordinatorStateWriteMajorityPlus,
- coordinatorStateReadMajorityPlus,
- leastShardAllocationAbsoluteLimit = 100,
- leastShardAllocationRelativeLimit = 0.1)
-
- // included for binary compatibility
- @deprecated(
- "Use the ClusterShardingSettings factory methods or the constructor including " +
- "coordinatorStateWriteMajorityPlus and coordinatorStateReadMajorityPlus instead",
- since = "Akka 2.6.5")
- def this(
- coordinatorFailureBackoff: FiniteDuration,
- retryInterval: FiniteDuration,
- bufferSize: Int,
- handOffTimeout: FiniteDuration,
- shardStartTimeout: FiniteDuration,
- shardFailureBackoff: FiniteDuration,
- entityRestartBackoff: FiniteDuration,
- rebalanceInterval: FiniteDuration,
- snapshotAfter: Int,
- keepNrOfBatches: Int,
- leastShardAllocationRebalanceThreshold: Int,
- leastShardAllocationMaxSimultaneousRebalance: Int,
- waitingForStateTimeout: FiniteDuration,
- updatingStateTimeout: FiniteDuration,
- entityRecoveryStrategy: String,
- entityRecoveryConstantRateStrategyFrequency: FiniteDuration,
- entityRecoveryConstantRateStrategyNumberOfEntities: Int) =
- this(
- coordinatorFailureBackoff,
- retryInterval,
- bufferSize,
- handOffTimeout,
- shardStartTimeout,
- shardFailureBackoff,
- entityRestartBackoff,
- rebalanceInterval,
- snapshotAfter,
- keepNrOfBatches,
- leastShardAllocationRebalanceThreshold,
- leastShardAllocationMaxSimultaneousRebalance,
- waitingForStateTimeout,
- updatingStateTimeout,
- entityRecoveryStrategy,
- entityRecoveryConstantRateStrategyFrequency,
- entityRecoveryConstantRateStrategyNumberOfEntities,
- coordinatorStateWriteMajorityPlus = 5,
- coordinatorStateReadMajorityPlus = 5)
-
- // included for binary compatibility
- @deprecated("Use the ClusterShardingSettings factory methods or the full constructor instead", since = "Akka 2.6.5")
- def this(
- coordinatorFailureBackoff: FiniteDuration,
- retryInterval: FiniteDuration,
- bufferSize: Int,
- handOffTimeout: FiniteDuration,
- shardStartTimeout: FiniteDuration,
- shardFailureBackoff: FiniteDuration,
- entityRestartBackoff: FiniteDuration,
- rebalanceInterval: FiniteDuration,
- snapshotAfter: Int,
- leastShardAllocationRebalanceThreshold: Int,
- leastShardAllocationMaxSimultaneousRebalance: Int,
- waitingForStateTimeout: FiniteDuration,
- updatingStateTimeout: FiniteDuration,
- entityRecoveryStrategy: String,
- entityRecoveryConstantRateStrategyFrequency: FiniteDuration,
- entityRecoveryConstantRateStrategyNumberOfEntities: Int) = {
- this(
- coordinatorFailureBackoff,
- retryInterval,
- bufferSize,
- handOffTimeout,
- shardStartTimeout,
- shardFailureBackoff,
- entityRestartBackoff,
- rebalanceInterval,
- snapshotAfter,
- 2,
- leastShardAllocationRebalanceThreshold,
- leastShardAllocationMaxSimultaneousRebalance,
- waitingForStateTimeout,
- updatingStateTimeout,
- entityRecoveryStrategy,
- entityRecoveryConstantRateStrategyFrequency,
- entityRecoveryConstantRateStrategyNumberOfEntities)
- }
-
- // included for binary compatibility
- @deprecated("Use the ClusterShardingSettings factory methods or the full constructor instead", since = "Akka 2.6.5")
- def this(
- coordinatorFailureBackoff: FiniteDuration,
- retryInterval: FiniteDuration,
- bufferSize: Int,
- handOffTimeout: FiniteDuration,
- shardStartTimeout: FiniteDuration,
- shardFailureBackoff: FiniteDuration,
- entityRestartBackoff: FiniteDuration,
- rebalanceInterval: FiniteDuration,
- snapshotAfter: Int,
- leastShardAllocationRebalanceThreshold: Int,
- leastShardAllocationMaxSimultaneousRebalance: Int,
- waitingForStateTimeout: FiniteDuration,
- updatingStateTimeout: FiniteDuration) = {
- this(
- coordinatorFailureBackoff,
- retryInterval,
- bufferSize,
- handOffTimeout,
- shardStartTimeout,
- shardFailureBackoff,
- entityRestartBackoff,
- rebalanceInterval,
- snapshotAfter,
- leastShardAllocationRebalanceThreshold,
- leastShardAllocationMaxSimultaneousRebalance,
- waitingForStateTimeout,
- updatingStateTimeout,
- "all",
- 100.milliseconds,
- 5)
- }
}
}
@@ -1137,163 +971,6 @@ final class ClusterShardingSettings(
val coordinatorSingletonSettings: ClusterSingletonManagerSettings,
val leaseSettings: Option[LeaseUsageSettings])
extends NoSerializationVerificationNeeded {
- @deprecated(
- "Use the ClusterShardingSettings factory methods or the constructor including coordinatorSingletonOverrideRole instead",
- "Akka 2.6.20")
- def this(
- role: Option[String],
- rememberEntities: Boolean,
- journalPluginId: String,
- snapshotPluginId: String,
- stateStoreMode: String,
- rememberEntitiesStore: String,
- passivationStrategySettings: ClusterShardingSettings.PassivationStrategySettings,
- shardRegionQueryTimeout: FiniteDuration,
- tuningParameters: ClusterShardingSettings.TuningParameters,
- coordinatorSingletonSettings: ClusterSingletonManagerSettings,
- leaseSettings: Option[LeaseUsageSettings]) =
- this(
- role,
- rememberEntities,
- journalPluginId,
- snapshotPluginId,
- stateStoreMode,
- rememberEntitiesStore,
- passivationStrategySettings,
- shardRegionQueryTimeout,
- tuningParameters,
- true,
- coordinatorSingletonSettings,
- leaseSettings)
-
- @deprecated(
- "Use the ClusterShardingSettings factory methods or the constructor including passivationStrategySettings instead",
- "Akka 2.6.18")
- def this(
- role: Option[String],
- rememberEntities: Boolean,
- journalPluginId: String,
- snapshotPluginId: String,
- stateStoreMode: String,
- rememberEntitiesStore: String,
- passivateIdleEntityAfter: FiniteDuration,
- shardRegionQueryTimeout: FiniteDuration,
- tuningParameters: ClusterShardingSettings.TuningParameters,
- coordinatorSingletonSettings: ClusterSingletonManagerSettings,
- leaseSettings: Option[LeaseUsageSettings]) =
- this(
- role,
- rememberEntities,
- journalPluginId,
- snapshotPluginId,
- stateStoreMode,
- rememberEntitiesStore,
- ClusterShardingSettings.PassivationStrategySettings.oldDefault(passivateIdleEntityAfter),
- shardRegionQueryTimeout,
- tuningParameters,
- true,
- coordinatorSingletonSettings,
- leaseSettings)
-
- @deprecated(
- "Use the ClusterShardingSettings factory methods or the constructor including rememberedEntitiesStore instead",
- "Akka 2.6.7")
- def this(
- role: Option[String],
- rememberEntities: Boolean,
- journalPluginId: String,
- snapshotPluginId: String,
- stateStoreMode: String,
- passivateIdleEntityAfter: FiniteDuration,
- shardRegionQueryTimeout: FiniteDuration,
- tuningParameters: ClusterShardingSettings.TuningParameters,
- coordinatorSingletonSettings: ClusterSingletonManagerSettings,
- leaseSettings: Option[LeaseUsageSettings]) =
- this(
- role,
- rememberEntities,
- journalPluginId,
- snapshotPluginId,
- stateStoreMode,
- "ddata",
- passivateIdleEntityAfter,
- shardRegionQueryTimeout,
- tuningParameters,
- coordinatorSingletonSettings,
- leaseSettings)
-
- // bin compat for Akka 2.5.23
- @deprecated(
- "Use the ClusterShardingSettings factory methods or the constructor including shardRegionQueryTimeout instead",
- since = "Akka 2.6.0")
- def this(
- role: Option[String],
- rememberEntities: Boolean,
- journalPluginId: String,
- snapshotPluginId: String,
- stateStoreMode: String,
- passivateIdleEntityAfter: FiniteDuration,
- tuningParameters: ClusterShardingSettings.TuningParameters,
- coordinatorSingletonSettings: ClusterSingletonManagerSettings,
- leaseSettings: Option[LeaseUsageSettings]) =
- this(
- role,
- rememberEntities,
- journalPluginId,
- snapshotPluginId,
- stateStoreMode,
- passivateIdleEntityAfter,
- 3.seconds,
- tuningParameters,
- coordinatorSingletonSettings,
- leaseSettings)
-
- // bin compat for Akka 2.5.21
- @deprecated(
- "Use the ClusterShardingSettings factory methods or the constructor including shardRegionQueryTimeout instead",
- since = "Akka 2.5.21")
- def this(
- role: Option[String],
- rememberEntities: Boolean,
- journalPluginId: String,
- snapshotPluginId: String,
- stateStoreMode: String,
- passivateIdleEntityAfter: FiniteDuration,
- tuningParameters: ClusterShardingSettings.TuningParameters,
- coordinatorSingletonSettings: ClusterSingletonManagerSettings) =
- this(
- role,
- rememberEntities,
- journalPluginId,
- snapshotPluginId,
- stateStoreMode,
- passivateIdleEntityAfter,
- 3.seconds,
- tuningParameters,
- coordinatorSingletonSettings,
- None)
-
- // included for binary compatibility reasons
- @deprecated(
- "Use the ClusterShardingSettings factory methods or the constructor including passivateIdleEntityAfter instead",
- since = "Akka 2.5.18")
- def this(
- role: Option[String],
- rememberEntities: Boolean,
- journalPluginId: String,
- snapshotPluginId: String,
- stateStoreMode: String,
- tuningParameters: ClusterShardingSettings.TuningParameters,
- coordinatorSingletonSettings: ClusterSingletonManagerSettings) =
- this(
- role,
- rememberEntities,
- journalPluginId,
- snapshotPluginId,
- stateStoreMode,
- Duration.Zero,
- tuningParameters,
- coordinatorSingletonSettings)
import ClusterShardingSettings.{ RememberEntitiesStoreCustom, StateStoreModeDData, StateStoreModePersistence }
require(
diff --git a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala
index 75ad99be19..c71309f555 100644
--- a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala
+++ b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala
@@ -1305,8 +1305,7 @@ abstract class ShardCoordinator(
*
* @see [[ClusterSharding$ ClusterSharding extension]]
*/
-@deprecated("Use `ddata` mode, persistence mode is deprecated.", "Akka 2.6.0")
-class PersistentShardCoordinator(
+private class PersistentShardCoordinator(
override val typeName: String,
settings: ClusterShardingSettings,
allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
diff --git a/docs/src/main/paradox/stream/operators/FileIO/fromFile.md b/docs/src/main/paradox/stream/operators/FileIO/fromFile.md
deleted file mode 100644
index 06c5a115cd..0000000000
--- a/docs/src/main/paradox/stream/operators/FileIO/fromFile.md
+++ /dev/null
@@ -1,22 +0,0 @@
-# FileIO.fromFile
-
-Emits the contents of a file.
-
-@ref[File IO Sinks and Sources](../index.md#file-io-sinks-and-sources)
-
-@@@ warning
-
-The `fromFile` operator has been deprecated, use @ref:[fromPath](./fromPath.md) instead.
-
-@@@
-
-## Signature
-
-@apidoc[FileIO.fromFile](FileIO$) { scala="#fromFile(f:java.io.File,chunkSize:Int):org.apache.pekko.stream.scaladsl.Source[org.apache.pekko.util.ByteString,scala.concurrent.Future[org.apache.pekko.stream.IOResult]]" java="#fromFile(java.io.File)" java="#fromFile(java.io.File,int)" }
-
-
-## Description
-
-Emits the contents of a file, as `ByteString`s, materializes into a @scala[`Future`] @java[`CompletionStage`] which will be completed with
-a `IOResult` upon reaching the end of the file or if there is a failure.
-
diff --git a/docs/src/main/paradox/stream/operators/FileIO/toFile.md b/docs/src/main/paradox/stream/operators/FileIO/toFile.md
deleted file mode 100644
index c75ed1f7d7..0000000000
--- a/docs/src/main/paradox/stream/operators/FileIO/toFile.md
+++ /dev/null
@@ -1,21 +0,0 @@
-# FileIO.toFile
-
-Create a sink which will write incoming `ByteString` s to a given file.
-
-@ref[File IO Sinks and Sources](../index.md#file-io-sinks-and-sources)
-
-@@@ warning
-
-The `toFile` operator has been deprecated, use @ref:[toPath](./toPath.md) instead.
-
-@@@
-
-## Signature
-
-@apidoc[FileIO.toFile](FileIO$) { scala="#toFile(f:java.io.File,options:Set[java.nio.file.OpenOption]):org.apache.pekko.stream.scaladsl.Sink[org.apache.pekko.util.ByteString,scala.concurrent.Future[org.apache.pekko.stream.IOResult]]" java="#toFile(java.io.File,java.util.Set)" }
-
-
-## Description
-
-Creates a Sink which writes incoming `ByteString` elements to the given file path. Overwrites existing files by truncating their contents as default.
-Materializes a @scala[`Future`] @java[`CompletionStage`] of `IOResult` that will be completed with the size of the file (in bytes) at the streams completion, and a possible exception if IO operation was not completed successfully.
diff --git a/docs/src/main/paradox/stream/operators/Sink/foreachParallel.md b/docs/src/main/paradox/stream/operators/Sink/foreachParallel.md
deleted file mode 100644
index 8514e770f3..0000000000
--- a/docs/src/main/paradox/stream/operators/Sink/foreachParallel.md
+++ /dev/null
@@ -1,16 +0,0 @@
-# Sink.foreachParallel
-
-Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.
-
-@ref[Sink operators](../index.md#sink-operators)
-
-## Reactive Streams semantics
-
-@@@div { .callout }
-
-**cancels** never
-
-**backpressures** when the previous parallel procedure invocations has not yet completed
-
-@@@
-
diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md
index 8d0dfabcba..ad61511663 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -68,7 +68,6 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
|Sink| @ref[forall](Sink/forall.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
|Sink| @ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.|
|Sink| @ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.|
-|Sink| @ref[foreachParallel](Sink/foreachParallel.md)|Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.|
|Sink| @ref[fromMaterializer](Sink/fromMaterializer.md)|Defer the creation of a `Sink` until materialization and access `Materializer` and `Attributes`|
|Sink| @ref[fromSubscriber](Sink/fromSubscriber.md)|Integration with Reactive Streams, wraps a `org.reactivestreams.Subscriber` as a sink.|
|Sink| @ref[futureSink](Sink/futureSink.md)|Streams the elements to the given future sink once it successfully completes. |
@@ -133,9 +132,7 @@ Sources and sinks for reading and writing files can be found on `FileIO`.
| |Operator|Description|
|--|--|--|
-|FileIO| @ref[fromFile](FileIO/fromFile.md)|Emits the contents of a file.|
|FileIO| @ref[fromPath](FileIO/fromPath.md)|Emits the contents of a file from the given path.|
-|FileIO| @ref[toFile](FileIO/toFile.md)|Create a sink which will write incoming `ByteString` s to a given file.|
|FileIO| @ref[toPath](FileIO/toPath.md)|Create a sink which will write incoming `ByteString` s to a given file path.|
## Simple operators
@@ -475,11 +472,9 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [forall](Sink/forall.md)
* [foreach](Sink/foreach.md)
* [foreachAsync](Sink/foreachAsync.md)
-* [foreachParallel](Sink/foreachParallel.md)
* [from](Source/from.md)
* [fromArray](Source/fromArray.md)
* [fromCompletionStage](Source/fromCompletionStage.md)
-* [fromFile](FileIO/fromFile.md)
* [fromFuture](Source/fromFuture.md)
* [fromFutureSource](Source/fromFutureSource.md)
* [fromInputStream](StreamConverters/fromInputStream.md)
@@ -607,7 +602,6 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [takeWithin](Source-or-Flow/takeWithin.md)
* [throttle](Source-or-Flow/throttle.md)
* [tick](Source/tick.md)
-* [toFile](FileIO/toFile.md)
* [toPath](FileIO/toPath.md)
* [unfold](Source/unfold.md)
* [unfoldAsync](Source/unfoldAsync.md)
diff --git a/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala b/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala
index 0cdbafea71..46bf05c5ac 100644
--- a/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala
+++ b/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala
@@ -54,9 +54,6 @@ private[pekko] object NoMaterializer extends Materializer {
def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
throw new UnsupportedOperationException("NoMaterializer cannot schedule a single event")
- def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable =
- throw new UnsupportedOperationException("NoMaterializer cannot schedule a repeated event")
-
override def scheduleWithFixedDelay(
initialDelay: FiniteDuration,
delay: FiniteDuration,
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
index 0d3de6261f..65f8157750 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
@@ -170,6 +170,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with Matchers {
Ignore(_ == pekko.stream.scaladsl.Source.getClass, _ == "actorRefWithAck", _ => true, _ => true), // Internal in scaladsl
Ignore(_ == pekko.stream.scaladsl.Source.getClass, _ == "actorRefWithBackpressure", _ => true, _ => true), // Internal in scaladsl
Ignore(_ == pekko.stream.scaladsl.BidiFlow.getClass, _ == "apply", _ == 24, _ => true),
+ Ignore(_ == pekko.stream.scaladsl.BidiFlow.getClass, _ == "bidirectionalIdleTimeout", _ => true, _ => true),
Ignore(_ == pekko.stream.scaladsl.GraphDSL.getClass, _ == "runnable", _ == 24, _ => true),
Ignore(_ == pekko.stream.scaladsl.GraphDSL.getClass, _ == "create", _ == 24, _ => true),
// all generated methods like scaladsl.Sink$.akka$stream$scaladsl$Sink$$newOnCompleteStage$1
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkForeachParallelSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkForeachParallelSpec.scala
deleted file mode 100644
index 4f1310b3ff..0000000000
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkForeachParallelSpec.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * license agreements; and to You under the Apache License, version 2.0:
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * This file is part of the Apache Pekko project, which was derived from Akka.
- */
-
-/*
- * Copyright (C) 2015-2022 Lightbend Inc.
- */
-
-package org.apache.pekko.stream.scaladsl
-
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-
-import scala.annotation.nowarn
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import scala.util.control.NoStackTrace
-
-import org.apache.pekko
-import pekko.stream.ActorAttributes._
-import pekko.stream.Supervision._
-import pekko.stream.testkit.StreamSpec
-import pekko.testkit.TestLatch
-import pekko.testkit.TestProbe
-
-@nowarn // tests deprecated APIs
-class SinkForeachParallelSpec extends StreamSpec {
-
- "A ForeachParallel" must {
- "produce elements in the order they are ready" in {
- import system.dispatcher
-
- val probe = TestProbe()
- val latch = (1 to 4).map(_ -> TestLatch(1)).toMap
- val p = Source(1 to 4).runWith(Sink.foreachParallel(4)((n: Int) => {
- Await.ready(latch(n), 5.seconds)
- probe.ref ! n
- }))
- latch(2).countDown()
- probe.expectMsg(2)
- latch(4).countDown()
- probe.expectMsg(4)
- latch(3).countDown()
- probe.expectMsg(3)
-
- assert(!p.isCompleted)
-
- latch(1).countDown()
- probe.expectMsg(1)
-
- Await.result(p, 4.seconds)
- assert(p.isCompleted)
- }
-
- "not run more functions in parallel then specified" in {
- import system.dispatcher
-
- val probe = TestProbe()
- val latch = (1 to 5).map(_ -> TestLatch()).toMap
-
- val p = Source(1 to 5).runWith(Sink.foreachParallel(4)((n: Int) => {
- probe.ref ! n
- Await.ready(latch(n), 5.seconds)
- }))
- probe.expectMsgAllOf(1, 2, 3, 4)
- probe.expectNoMessage(200.millis)
-
- assert(!p.isCompleted)
-
- for (i <- 1 to 4) latch(i).countDown()
-
- latch(5).countDown()
- probe.expectMsg(5)
-
- Await.result(p, 5.seconds)
- assert(p.isCompleted)
-
- }
-
- "resume after function failure" in {
- import system.dispatcher
-
- val probe = TestProbe()
- val latch = TestLatch(1)
-
- val p = Source(1 to 5).runWith(
- Sink
- .foreachParallel(4)((n: Int) => {
- if (n == 3) throw new RuntimeException("err1") with NoStackTrace
- else {
- probe.ref ! n
- Await.ready(latch, 10.seconds)
- }
- })
- .withAttributes(supervisionStrategy(resumingDecider)))
-
- latch.countDown()
- probe.expectMsgAllOf(1, 2, 4, 5)
-
- Await.result(p, 5.seconds)
- }
-
- "finish after function thrown exception" in {
- import system.dispatcher
-
- val probe = TestProbe()
- val element4Latch = new CountDownLatch(1)
- val errorLatch = new CountDownLatch(2)
-
- val p = Source
- .fromIterator(() => Iterator.from(1))
- .runWith(
- Sink
- .foreachParallel(3)((n: Int) => {
- if (n == 3) {
- // Error will happen only after elements 1, 2 has been processed
- errorLatch.await(5, TimeUnit.SECONDS)
- throw new RuntimeException("err2") with NoStackTrace
- } else {
- probe.ref ! n
- errorLatch.countDown()
- element4Latch.await(5, TimeUnit.SECONDS) // Block element 4, 5, 6, ... from entering
- }
- })
- .withAttributes(supervisionStrategy(stoppingDecider)))
-
- // Only the first two messages are guaranteed to arrive due to their enforced ordering related to the time
- // of failure.
- probe.expectMsgAllOf(1, 2)
- element4Latch.countDown() // Release elements 4, 5, 6, ...
-
- a[RuntimeException] must be thrownBy Await.result(p, 3.seconds)
- }
-
- "handle empty source" in {
- import system.dispatcher
-
- val p = Source(List.empty[Int]).runWith(Sink.foreachParallel(3)(a => ()))
-
- Await.result(p, 200.seconds)
- }
-
- }
-
-}
diff --git a/stream-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/stream-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
new file mode 100644
index 0000000000..87ece02dd0
--- /dev/null
+++ b/stream-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Remove deprecated methods
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.typed.javadsl.ActorSource.actorRefWithAck")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.typed.scaladsl.ActorSource.actorRefWithAck")
diff --git a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala
index 0bb3667931..5e31b916e6 100644
--- a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala
+++ b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala
@@ -118,46 +118,4 @@ object ActorSource {
}
})
.asJava
-
- /**
- * Creates a `Source` that is materialized as an [[pekko.actor.ActorRef]].
- * Messages sent to this actor will be emitted to the stream if there is demand from downstream,
- * and a new message will only be accepted after the previous messages has been consumed and acknowledged back.
- * The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
- *
- * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
- * [[java.lang.Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
- * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
- * the failure will be signaled downstream immediately (instead of the completion signal).
- *
- * The actor will be stopped when the stream is completed, failed or canceled from downstream,
- * i.e. you can watch it to get notified when that happens.
- *
- * @deprecated Use actorRefWithBackpressure instead
- */
- @deprecated("Use actorRefWithBackpressure instead", "Akka 2.6.0")
- def actorRefWithAck[T, Ack](
- ackTo: ActorRef[Ack],
- ackMessage: Ack,
- completionMatcher: pekko.japi.function.Function[T, java.util.Optional[CompletionStrategy]],
- failureMatcher: pekko.japi.function.Function[T, java.util.Optional[Throwable]]): Source[T, ActorRef[T]] =
- pekko.stream.typed.scaladsl.ActorSource
- .actorRefWithBackpressure[T, Ack](
- ackTo,
- ackMessage,
- new JavaPartialFunction[T, CompletionStrategy] {
- override def apply(x: T, isCheck: Boolean): CompletionStrategy = {
- val result = completionMatcher(x)
- if (!result.isPresent) throw JavaPartialFunction.noMatch()
- else result.get()
- }
- },
- new JavaPartialFunction[T, Throwable] {
- override def apply(x: T, isCheck: Boolean): Throwable = {
- val result = failureMatcher(x)
- if (!result.isPresent) throw JavaPartialFunction.noMatch()
- else result.get()
- }
- })
- .asJava
}
diff --git a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorSource.scala b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorSource.scala
index b86083b7b0..388c0f396a 100644
--- a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorSource.scala
+++ b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorSource.scala
@@ -105,32 +105,4 @@ object ActorSource {
completionMatcher.asInstanceOf[PartialFunction[Any, CompletionStrategy]],
failureMatcher.asInstanceOf[PartialFunction[Any, Throwable]])
.mapMaterializedValue(actorRefAdapter)
-
- /**
- * Creates a `Source` that is materialized as an [[pekko.actor.typed.ActorRef]].
- * Messages sent to this actor will be emitted to the stream if there is demand from downstream,
- * and a new message will only be accepted after the previous messages has been consumed and acknowledged back.
- * The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
- *
- * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
- * [[java.lang.Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
- * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
- * the failure will be signaled downstream immediately (instead of the completion signal).
- *
- * The actor will be stopped when the stream is completed, failed or canceled from downstream,
- * i.e. you can watch it to get notified when that happens.
- */
- @deprecated("Use actorRefWithBackpressure instead", "Akka 2.6.0")
- def actorRefWithAck[T, Ack](
- ackTo: ActorRef[Ack],
- ackMessage: Ack,
- completionMatcher: PartialFunction[T, CompletionStrategy],
- failureMatcher: PartialFunction[T, Throwable]): Source[T, ActorRef[T]] =
- Source
- .actorRefWithAck[T](
- Some(ackTo.toClassic),
- ackMessage,
- completionMatcher.asInstanceOf[PartialFunction[Any, CompletionStrategy]],
- failureMatcher.asInstanceOf[PartialFunction[Any, Throwable]])
- .mapMaterializedValue(actorRefAdapter)
}
diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
index feb063120b..0a1be3f84e 100644
--- a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
+++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
@@ -17,6 +17,7 @@
# Remove deprecated methods
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.FanInShape1N")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.Materializer.schedulePeriodically")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.UniformFanInShape.inSeq")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.UniformFanOutShape.outArray")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ConstantFun*")
@@ -25,13 +26,21 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.impl
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.io.ByteStringParser$ParsingException")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.impl.io.ByteStringParser#ParsingLogic.onDownstreamFinish")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.javadsl.CoupledTerminationFlow*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.BidiFlow.bidirectionalIdleTimeout")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.FileIO.fromFile")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.FileIO.toFile")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartFlow.onFailuresWithBackoff")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartFlow.withBackoff")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartSink.withBackoff")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartSource.onFailuresWithBackoff")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartSource.withBackoff")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.Sink.actorRefWithAck")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.Sink.foreachParallel")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.Source.actorRefWithAck")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Balance.this")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.scaladsl.CoupledTerminationFlow*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FileIO.fromFile*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FileIO.toFile*")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.GraphApply.create")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.GraphDSL.create")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.MergeHub#MergedSourceLogic.onDownstreamFinish")
@@ -41,6 +50,9 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scal
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.RestartSink.withBackoff")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.RestartSource.onFailuresWithBackoff")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.RestartSource.withBackoff")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Sink.foreachParallel")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Sink.actorRefWithAck*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Source.actorRefWithAck")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.ZipWithN.inSeq")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.AbstractInOutHandler.onDownstreamFinish")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.AbstractOutHandler.onDownstreamFinish")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/Materializer.scala b/stream/src/main/scala/org/apache/pekko/stream/Materializer.scala
index 35c04210ff..dd5c56b92b 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/Materializer.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/Materializer.scala
@@ -143,19 +143,6 @@ abstract class Materializer {
*/
def scheduleAtFixedRate(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable
- /**
- * Interface for operators that need timer services for their functionality. Schedules a
- * repeated task with the given interval between invocations.
- *
- * @return A [[pekko.actor.Cancellable]] that allows cancelling the timer. Cancelling is best effort, if the event
- * has been already enqueued it will not have an effect.
- */
- @deprecated(
- "Use scheduleWithFixedDelay or scheduleAtFixedRate instead. This has the same semantics as " +
- "scheduleAtFixedRate, but scheduleWithFixedDelay is often preferred.",
- since = "Akka 2.6.0")
- def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable
-
/**
* Shuts down this materializer and all the operators that have been materialized through this materializer. After
* having shut down, this materializer cannot be used again. Any attempt to materialize operators after having
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala
index 214f333345..84e66b6744 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala
@@ -150,12 +150,6 @@ private[pekko] class SubFusingActorMaterializerImpl(
task: Runnable): Cancellable =
delegate.scheduleAtFixedRate(initialDelay, interval, task)
- override def schedulePeriodically(
- initialDelay: FiniteDuration,
- interval: FiniteDuration,
- task: Runnable): Cancellable =
- scheduleAtFixedRate(initialDelay, interval, task)
-
override def withNamePrefix(name: String): SubFusingActorMaterializerImpl =
new SubFusingActorMaterializerImpl(delegate.withNamePrefix(name), registerShell)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala
index fbff399df9..fc5b35bd1c 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala
@@ -442,12 +442,6 @@ private final case class SavedIslandData(
task: Runnable): Cancellable =
system.scheduler.scheduleAtFixedRate(initialDelay, interval)(task)(executionContext)
- override def schedulePeriodically(
- initialDelay: FiniteDuration,
- interval: FiniteDuration,
- task: Runnable): Cancellable =
- system.scheduler.scheduleAtFixedRate(initialDelay, interval)(task)(executionContext)
-
override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
system.scheduler.scheduleOnce(delay, task)(executionContext)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala
index 98dd59fcbb..f7bece9f45 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala
@@ -13,9 +13,6 @@
package org.apache.pekko.stream.javadsl
-import scala.annotation.nowarn
-import scala.concurrent.duration.FiniteDuration
-
import org.apache.pekko
import pekko.NotUsed
import pekko.japi.function
@@ -103,23 +100,9 @@ object BidiFlow {
* every second in one direction, but no elements are flowing in the other direction. I.e. this operator considers
* the *joint* frequencies of the elements in both directions.
*/
- @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
- def bidirectionalIdleTimeout[I, O](timeout: FiniteDuration): BidiFlow[I, I, O, O, NotUsed] =
- new BidiFlow(scaladsl.BidiFlow.bidirectionalIdleTimeout(timeout))
-
- /**
- * If the time between two processed elements *in any direction* exceed the provided timeout, the stream is failed
- * with a [[org.apache.pekko.stream.BackpressureTimeoutException]].
- *
- * There is a difference between this operator and having two idleTimeout Flows assembled into a BidiStage.
- * If the timeout is configured to be 1 seconds, then this operator will not fail even though there are elements flowing
- * every second in one direction, but no elements are flowing in the other direction. I.e. this operator considers
- * the *joint* frequencies of the elements in both directions.
- */
- @nowarn("msg=deprecated")
def bidirectionalIdleTimeout[I, O](timeout: java.time.Duration): BidiFlow[I, I, O, O, NotUsed] = {
import pekko.util.JavaDurationConverters._
- bidirectionalIdleTimeout(timeout.asScala)
+ new BidiFlow(scaladsl.BidiFlow.bidirectionalIdleTimeout(timeout.asScala))
}
}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FileIO.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FileIO.scala
index ebab976d0e..74c941eb79 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FileIO.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FileIO.scala
@@ -13,7 +13,6 @@
package org.apache.pekko.stream.javadsl
-import java.io.File
import java.nio.file.{ OpenOption, Path }
import java.util
import java.util.concurrent.CompletionStage
@@ -30,22 +29,6 @@ import pekko.util.ccompat.JavaConverters._
*/
object FileIO {
- /**
- * Creates a Sink that writes incoming [[pekko.util.ByteString]] elements to the given file.
- * Overwrites existing files by truncating their contents, if you want to append to an existing file use
- * [[toFile(File, util.Set[OpenOption])]] with [[java.nio.file.StandardOpenOption.APPEND]].
- *
- * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
- * and a possible exception if IO operation was not completed successfully.
- *
- * You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or
- * set it for a given Source by using [[pekko.stream.ActorAttributes]].
- *
- * @param f The file to write to
- */
- @deprecated("Use `toPath` instead.", "Akka 2.4.5")
- def toFile(f: File): javadsl.Sink[ByteString, CompletionStage[IOResult]] = toPath(f.toPath)
-
/**
* Creates a Sink that writes incoming [[pekko.util.ByteString]] elements to the given file path.
* Overwrites existing files by truncating their contents, if you want to append to an existing file
@@ -68,22 +51,6 @@ object FileIO {
def toPath(f: Path): javadsl.Sink[ByteString, CompletionStage[IOResult]] =
new Sink(scaladsl.FileIO.toPath(f).toCompletionStage())
- /**
- * Creates a Sink that writes incoming [[pekko.util.ByteString]] elements to the given file.
- *
- * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
- * and a possible exception if IO operation was not completed successfully.
- *
- * You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or
- * set it for a given Source by using [[pekko.stream.ActorAttributes]].
- *
- * @param f The file to write to
- * @param options File open options, see [[java.nio.file.StandardOpenOption]]
- */
- @deprecated("Use `toPath` instead.", "Akka 2.4.5")
- def toFile[Opt <: OpenOption](f: File, options: util.Set[Opt]): javadsl.Sink[ByteString, CompletionStage[IOResult]] =
- toPath(f.toPath)
-
/**
* Creates a Sink that writes incoming [[pekko.util.ByteString]] elements to the given file path.
*
@@ -130,23 +97,6 @@ object FileIO {
startPosition: Long): javadsl.Sink[ByteString, CompletionStage[IOResult]] =
new Sink(scaladsl.FileIO.toPath(f, options.asScala.toSet, startPosition).toCompletionStage())
- /**
- * Creates a Source from a files contents.
- * Emitted elements are [[pekko.util.ByteString]] elements, chunked by default by 8192 bytes,
- * except the last element, which will be up to 8192 in size.
- *
- * You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or
- * set it for a given Source by using [[pekko.stream.ActorAttributes]].
- *
- * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
- * and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does
- * not give any guarantee that the bytes were seen by downstream stages.
- *
- * @param f the file to read from
- */
- @deprecated("Use `fromPath` instead.", "Akka 2.4.5")
- def fromFile(f: File): javadsl.Source[ByteString, CompletionStage[IOResult]] = fromPath(f.toPath)
-
/**
* Creates a Source from a files contents.
* Emitted elements are [[pekko.util.ByteString]] elements, chunked by default by 8192 bytes,
@@ -163,25 +113,6 @@ object FileIO {
*/
def fromPath(f: Path): javadsl.Source[ByteString, CompletionStage[IOResult]] = fromPath(f, 8192)
- /**
- * Creates a synchronous Source from a files contents.
- * Emitted elements are `chunkSize` sized [[pekko.util.ByteString]] elements,
- * except the last element, which will be up to `chunkSize` in size.
- *
- * You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or
- * set it for a given Source by using [[pekko.stream.ActorAttributes]].
- *
- * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
- * and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does
- * not give any guarantee that the bytes were seen by downstream stages.
- *
- * @param f the file to read from
- * @param chunkSize the size of each read operation
- */
- @deprecated("Use `fromPath` instead.", "Akka 2.4.5")
- def fromFile(f: File, chunkSize: Int): javadsl.Source[ByteString, CompletionStage[IOResult]] =
- fromPath(f.toPath, chunkSize)
-
/**
* Creates a synchronous Source from a files contents.
* Emitted elements are `chunkSize` sized [[pekko.util.ByteString]] elements,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index 6aaa8fc252..1f550c2a0a 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -20,7 +20,6 @@ import java.util.stream.Collector
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
-import scala.concurrent.ExecutionContext
import scala.util.Try
import org.apache.pekko
@@ -235,24 +234,6 @@ object Sink {
.foreachAsync(parallelism)((x: T) => f(x).asScala.map(_ => ())(ExecutionContexts.parasitic))
.toCompletionStage())
- /**
- * A `Sink` that will invoke the given procedure for each received element in parallel. The sink is materialized
- * into a [[java.util.concurrent.CompletionStage]].
- *
- * If `f` throws an exception and the supervision decision is
- * [[pekko.stream.Supervision.Stop]] the `CompletionStage` will be completed with failure.
- *
- * If `f` throws an exception and the supervision decision is
- * [[pekko.stream.Supervision.Resume]] or [[pekko.stream.Supervision.Restart]] the
- * element is dropped and the stream continues.
- */
- @deprecated(
- "Use `foreachAsync` instead, it allows you to choose how to run the procedure, by calling some other API returning a CompletionStage or using CompletableFuture.supplyAsync.",
- since = "Akka 2.5.17")
- def foreachParallel[T](parallel: Int, f: function.Procedure[T],
- ec: ExecutionContext): Sink[T, CompletionStage[Done]] =
- new Sink(scaladsl.Sink.foreachParallel(parallel)(f.apply)(ec).toCompletionStage())
-
/**
* A `Sink` that when the flow is completed, either through a failure or normal
* completion, apply the provided function with [[scala.util.Success]]
@@ -394,32 +375,6 @@ object Sink {
new Sink(
scaladsl.Sink.actorRefWithBackpressure[In](ref, onInitMessage, onCompleteMessage, t => onFailureMessage(t)))
- /**
- * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
- * First element is always `onInitMessage`, then stream is waiting for acknowledgement message
- * `ackMessage` from the given actor which means that it is ready to process
- * elements. It also requires `ackMessage` message after each stream element
- * to make backpressure work.
- *
- * If the target actor terminates the stream will be canceled.
- * When the stream is completed successfully the given `onCompleteMessage`
- * will be sent to the destination actor.
- * When the stream is completed with failure - result of `onFailureMessage(throwable)`
- * message will be sent to the destination actor.
- *
- * @deprecated Use actorRefWithBackpressure instead
- */
- @deprecated("Use actorRefWithBackpressure instead", "Akka 2.6.0")
- def actorRefWithAck[In](
- ref: ActorRef,
- onInitMessage: Any,
- ackMessage: Any,
- onCompleteMessage: Any,
- onFailureMessage: function.Function[Throwable, Any]): Sink[In, NotUsed] =
- new Sink(
- scaladsl.Sink
- .actorRefWithBackpressure[In](ref, onInitMessage, ackMessage, onCompleteMessage, t => onFailureMessage(t)))
-
/**
* A graph with the shape of a sink logically is a sink, this method makes
* it so also in type.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 33674833da..d8cdef99f0 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -614,71 +614,6 @@ object Source {
}
}))
- /**
- * Creates a `Source` that is materialized as an [[pekko.actor.ActorRef]].
- * Messages sent to this actor will be emitted to the stream if there is demand from downstream,
- * and a new message will only be accepted after the previous messages has been consumed and acknowledged back.
- * The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
- *
- * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
- * [[java.lang.Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
- * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
- * the failure will be signaled downstream immediately (instead of the completion signal).
- *
- * The actor will be stopped when the stream is completed, failed or canceled from downstream,
- * i.e. you can watch it to get notified when that happens.
- *
- * @deprecated Use actorRefWithBackpressure instead
- */
- @deprecated("Use actorRefWithBackpressure instead", "Akka 2.6.0")
- def actorRefWithAck[T](
- ackMessage: Any,
- completionMatcher: pekko.japi.function.Function[Any, java.util.Optional[CompletionStrategy]],
- failureMatcher: pekko.japi.function.Function[Any, java.util.Optional[Throwable]]): Source[T, ActorRef] =
- new Source(scaladsl.Source.actorRefWithBackpressure(ackMessage,
- new JavaPartialFunction[Any, CompletionStrategy] {
- override def apply(x: Any, isCheck: Boolean): CompletionStrategy = {
- val result = completionMatcher(x)
- if (!result.isPresent) throw JavaPartialFunction.noMatch()
- else result.get()
- }
- },
- new JavaPartialFunction[Any, Throwable] {
- override def apply(x: Any, isCheck: Boolean): Throwable = {
- val result = failureMatcher(x)
- if (!result.isPresent) throw JavaPartialFunction.noMatch()
- else result.get()
- }
- }))
-
- /**
- * Creates a `Source` that is materialized as an [[pekko.actor.ActorRef]].
- * Messages sent to this actor will be emitted to the stream if there is demand from downstream,
- * and a new message will only be accepted after the previous messages has been consumed and acknowledged back.
- * The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
- *
- * The stream can be completed successfully by sending the actor reference a [[pekko.actor.Status.Success]].
- * If the content is [[pekko.stream.CompletionStrategy.immediately]] the completion will be signaled immediately,
- * otherwise if the content is [[pekko.stream.CompletionStrategy.draining]] (or anything else)
- * already buffered element will be signaled before signaling completion.
- *
- * The stream can be completed with failure by sending a [[pekko.actor.Status.Failure]] to the
- * actor reference. In case the Actor is still draining its internal buffer (after having received
- * a [[pekko.actor.Status.Success]]) before signaling completion and it receives a [[pekko.actor.Status.Failure]],
- * the failure will be signaled downstream immediately (instead of the completion signal).
- *
- * The actor will be stopped when the stream is completed, failed or canceled from downstream,
- * i.e. you can watch it to get notified when that happens.
- */
- @deprecated("Use actorRefWithBackpressure accepting completion and failure matchers", "Akka 2.6.0")
- def actorRefWithAck[T](ackMessage: Any): Source[T, ActorRef] =
- new Source(scaladsl.Source.actorRefWithBackpressure(ackMessage,
- {
- case pekko.actor.Status.Success(s: CompletionStrategy) => s
- case pekko.actor.Status.Success(_) => CompletionStrategy.Draining
- case pekko.actor.Status.Success => CompletionStrategy.Draining
- }, { case pekko.actor.Status.Failure(cause) => cause }))
-
/**
* A graph with the shape of a source logically is a source, this method makes
* it so also in type.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FileIO.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FileIO.scala
index 7aa24b65bb..f090acf487 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FileIO.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FileIO.scala
@@ -13,7 +13,6 @@
package org.apache.pekko.stream.scaladsl
-import java.io.File
import java.nio.file.{ OpenOption, Path }
import java.nio.file.StandardOpenOption._
@@ -30,25 +29,6 @@ import pekko.util.ByteString
*/
object FileIO {
- /**
- * Creates a Source from a files contents.
- * Emitted elements are `chunkSize` sized [[pekko.util.ByteString]] elements,
- * except the final element, which will be up to `chunkSize` in size.
- *
- * You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or
- * set it for a given Source by using [[pekko.stream.ActorAttributes]].
- *
- * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
- * and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does
- * not give any guarantee that the bytes were seen by downstream stages.
- *
- * @param f the file to read from
- * @param chunkSize the size of each read operation, defaults to 8192
- */
- @deprecated("Use `fromPath` instead", "Akka 2.4.5")
- def fromFile(f: File, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
- fromPath(f.toPath, chunkSize)
-
/**
* Creates a Source from a files contents.
* Emitted elements are `chunkSize` sized [[pekko.util.ByteString]] elements,
@@ -86,25 +66,6 @@ object FileIO {
def fromPath(f: Path, chunkSize: Int, startPosition: Long): Source[ByteString, Future[IOResult]] =
Source.fromGraph(new FileSource(f, chunkSize, startPosition)).withAttributes(DefaultAttributes.fileSource)
- /**
- * Creates a Sink which writes incoming [[pekko.util.ByteString]] elements to the given file. Overwrites existing files
- * by truncating their contents as default.
- *
- * Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
- * and a possible exception if IO operation was not completed successfully.
- *
- * This source is backed by an Actor which will use the dedicated `pekko.stream.blocking-io-dispatcher`,
- * unless configured otherwise by using [[pekko.stream.ActorAttributes]].
- *
- * @param f the file to write to
- * @param options File open options, see [[java.nio.file.StandardOpenOption]], defaults to Set(WRITE, TRUNCATE_EXISTING, CREATE)
- */
- @deprecated("Use `toPath` instead", "Akka 2.4.5")
- def toFile(
- f: File,
- options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]] =
- toPath(f.toPath, options)
-
/**
* Creates a Sink which writes incoming [[pekko.util.ByteString]] elements to the given file path. Overwrites existing files
* by truncating their contents as default.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index 9cb9f08296..cfd78106fb 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -16,7 +16,7 @@ package org.apache.pekko.stream.scaladsl
import scala.annotation.tailrec
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
-import scala.concurrent.{ ExecutionContext, Future }
+import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }
import org.apache.pekko
@@ -387,25 +387,6 @@ object Sink {
})
}
- /**
- * A `Sink` that will invoke the given function to each of the elements
- * as they pass in. The sink is materialized into a [[scala.concurrent.Future]]
- *
- * If `f` throws an exception and the supervision decision is
- * [[pekko.stream.Supervision.Stop]] the `Future` will be completed with failure.
- *
- * If `f` throws an exception and the supervision decision is
- * [[pekko.stream.Supervision.Resume]] or [[pekko.stream.Supervision.Restart]] the
- * element is dropped and the stream continues.
- *
- * See also [[Flow.mapAsyncUnordered]]
- */
- @deprecated(
- "Use `foreachAsync` instead, it allows you to choose how to run the procedure, by calling some other API returning a Future or spawning a new Future.",
- since = "Akka 2.5.17")
- def foreachParallel[T](parallelism: Int)(f: T => Unit)(implicit ec: ExecutionContext): Sink[T, Future[Done]] =
- Flow[T].mapAsyncUnordered(parallelism)(t => Future(f(t))).toMat(Sink.ignore)(Keep.right)
-
/**
* A `Sink` that will invoke the given function for every received element, giving it its previous
* output (or the given `zero` value) and the element as input.
@@ -697,28 +678,6 @@ object Sink {
onFailureMessage: Throwable => Any): Sink[T, NotUsed] =
actorRefWithAck(ref, _ => identity, _ => onInitMessage, None, onCompleteMessage, onFailureMessage)
- /**
- * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
- * First element is always `onInitMessage`, then stream is waiting for acknowledgement message
- * `ackMessage` from the given actor which means that it is ready to process
- * elements. It also requires `ackMessage` message after each stream element
- * to make backpressure work.
- *
- * If the target actor terminates the stream will be canceled.
- * When the stream is completed successfully the given `onCompleteMessage`
- * will be sent to the destination actor.
- * When the stream is completed with failure - result of `onFailureMessage(throwable)`
- * function will be sent to the destination actor.
- */
- @deprecated("Use actorRefWithBackpressure accepting completion and failure matchers instead", "Akka 2.6.0")
- def actorRefWithAck[T](
- ref: ActorRef,
- onInitMessage: Any,
- ackMessage: Any,
- onCompleteMessage: Any,
- onFailureMessage: (Throwable) => Any = Status.Failure.apply): Sink[T, NotUsed] =
- actorRefWithAck(ref, _ => identity, _ => onInitMessage, Some(ackMessage), onCompleteMessage, onFailureMessage)
-
/**
* Creates a `Sink` that is materialized as an [[pekko.stream.scaladsl.SinkQueueWithCancel]].
* [[pekko.stream.scaladsl.SinkQueueWithCancel.pull]] method is pulling element from the stream and returns ``Future[Option[T]``.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index e8b185fa39..0bd946cd9e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -799,34 +799,6 @@ object Source {
Source.fromGraph(new ActorRefBackpressureSource(None, ackMessage, completionMatcher, failureMatcher))
}
- /**
- * Creates a `Source` that is materialized as an [[pekko.actor.ActorRef]].
- * Messages sent to this actor will be emitted to the stream if there is demand from downstream,
- * and a new message will only be accepted after the previous messages has been consumed and acknowledged back.
- * The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
- *
- * The stream can be completed successfully by sending the actor reference a [[pekko.actor.Status.Success]].
- * If the content is [[pekko.stream.CompletionStrategy.immediately]] the completion will be signaled immediately,
- * otherwise if the content is [[pekko.stream.CompletionStrategy.draining]] (or anything else)
- * already buffered element will be signaled before signaling completion.
- *
- * The stream can be completed with failure by sending a [[pekko.actor.Status.Failure]] to the
- * actor reference. In case the Actor is still draining its internal buffer (after having received
- * a [[pekko.actor.Status.Success]]) before signaling completion and it receives a [[pekko.actor.Status.Failure]],
- * the failure will be signaled downstream immediately (instead of the completion signal).
- *
- * The actor will be stopped when the stream is completed, failed or canceled from downstream,
- * i.e. you can watch it to get notified when that happens.
- */
- @deprecated("Use actorRefWithBackpressure accepting completion and failure matchers instead", "Akka 2.6.0")
- def actorRefWithAck[T](ackMessage: Any): Source[T, ActorRef] =
- actorRefWithAck(None, ackMessage,
- {
- case pekko.actor.Status.Success(s: CompletionStrategy) => s
- case pekko.actor.Status.Success(_) => CompletionStrategy.Draining
- case pekko.actor.Status.Success => CompletionStrategy.Draining
- }, { case pekko.actor.Status.Failure(cause) => cause })
-
/**
* Combines several sources with fan-in strategy like [[Merge]] or [[Concat]] into a single [[Source]].
*/