From 19788583ee839eb12e1ccb9901bfe23ef1bd80c6 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sat, 2 Aug 2025 13:18:20 +0100 Subject: [PATCH] remove more deprecated code (#1984) * remove more deprecated code * Update DslFactoriesConsistencySpec.scala * mima * remove docs for removed FileIO operations * more * remove foreachparallel * remove deprecated patterns * tidy up * Update remove-deprecated-methods.excludes --- .../remove-deprecated-methods.excludes | 19 + .../internal/EffectfulActorContext.scala | 3 - .../typed/internal/LoggingTestKitImpl.scala | 4 - .../typed/scaladsl/LoggingTestKit.scala | 11 - .../remove-deprecated-methods.excludes | 20 + .../typed/internal/TimerSchedulerImpl.scala | 8 - .../actor/typed/javadsl/TimerScheduler.scala | 9 - .../actor/typed/scaladsl/TimerScheduler.scala | 9 - .../remove-deprecated-methods.excludes | 2 + .../org/apache/pekko/pattern/Patterns.scala | 465 ------------------ .../remove-deprecated-methods.excludes | 20 + .../sharding/ClusterShardingSettings.scala | 323 ------------ .../cluster/sharding/ShardCoordinator.scala | 3 +- .../stream/operators/FileIO/fromFile.md | 22 - .../paradox/stream/operators/FileIO/toFile.md | 21 - .../stream/operators/Sink/foreachParallel.md | 16 - .../main/paradox/stream/operators/index.md | 6 - .../impl/fusing/GraphInterpreterSpecKit.scala | 3 - .../stream/DslFactoriesConsistencySpec.scala | 1 + .../scaladsl/SinkForeachParallelSpec.scala | 150 ------ .../remove-deprecated-methods.excludes | 20 + .../stream/typed/javadsl/ActorSource.scala | 42 -- .../stream/typed/scaladsl/ActorSource.scala | 28 -- .../remove-deprecated-methods.excludes | 12 + .../apache/pekko/stream/Materializer.scala | 13 - .../stream/impl/ActorMaterializerImpl.scala | 6 - .../impl/PhasedFusingActorMaterializer.scala | 6 - .../pekko/stream/javadsl/BidiFlow.scala | 19 +- .../apache/pekko/stream/javadsl/FileIO.scala | 69 --- .../apache/pekko/stream/javadsl/Sink.scala | 45 -- .../apache/pekko/stream/javadsl/Source.scala | 65 --- .../apache/pekko/stream/scaladsl/FileIO.scala | 39 -- .../apache/pekko/stream/scaladsl/Sink.scala | 43 +- .../apache/pekko/stream/scaladsl/Source.scala | 28 -- 34 files changed, 97 insertions(+), 1453 deletions(-) create mode 100644 actor-testkit-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes create mode 100644 actor-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes create mode 100644 cluster-sharding/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes delete mode 100644 docs/src/main/paradox/stream/operators/FileIO/fromFile.md delete mode 100644 docs/src/main/paradox/stream/operators/FileIO/toFile.md delete mode 100644 docs/src/main/paradox/stream/operators/Sink/foreachParallel.md delete mode 100644 stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkForeachParallelSpec.scala create mode 100644 stream-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes diff --git a/actor-testkit-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/actor-testkit-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes new file mode 100644 index 0000000000..0fb18f5c50 --- /dev/null +++ b/actor-testkit-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Remove deprecated methods +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.actor.testkit.typed.scaladsl.LoggingTestKit.intercept") diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/EffectfulActorContext.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/EffectfulActorContext.scala index 6e38d38877..67e24c75e4 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/EffectfulActorContext.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/EffectfulActorContext.scala @@ -111,9 +111,6 @@ import scala.reflect.ClassTag override def startTimerAtFixedRate(key: Any, msg: T, initialDelay: FiniteDuration, interval: FiniteDuration): Unit = startTimer(key, msg, interval, Effect.TimerScheduled.FixedRateModeWithInitialDelay(initialDelay)) - override def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit = - startTimer(key, msg, interval, Effect.TimerScheduled.FixedRateMode) - override def startSingleTimer(key: Any, msg: T, delay: FiniteDuration): Unit = startTimer(key, msg, delay, Effect.TimerScheduled.SingleMode) diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/LoggingTestKitImpl.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/LoggingTestKitImpl.scala index 555c4c4c22..35cfcd3b18 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/LoggingTestKitImpl.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/LoggingTestKitImpl.scala @@ -123,10 +123,6 @@ import pekko.testkit.TestKit override def expect[T](system: ActorSystem[_], code: Supplier[T]): T = expect(code.get())(system) - // deprecated (renamed to expect) - override def intercept[T](code: => T)(implicit system: ActorSystem[_]): T = - expect(code)(system) - private def checkLogback(system: ActorSystem[_]): Unit = { if (!system.dynamicAccess.classIsOnClasspath("ch.qos.logback.classic.spi.ILoggingEvent")) { throw new IllegalStateException("LoggingEventFilter requires logback-classic dependency in classpath.") diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/LoggingTestKit.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/LoggingTestKit.scala index 460fa67270..26c2ed13da 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/LoggingTestKit.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/LoggingTestKit.scala @@ -109,17 +109,6 @@ import pekko.annotation.DoNotInherit * Care is taken to remove the testkit when the block is finished or aborted. */ def expect[T](code: => T)(implicit system: ActorSystem[_]): T - - /** - * Run the given code block and assert that the criteria of this `LoggingTestKit` has - * matched within the configured `pekko.actor.testkit.typed.filter-leeway` - * as often as requested by its `occurrences` parameter specifies. - * - * Care is taken to remove the testkit when the block is finished or aborted. - */ - @deprecated("Use expect instead.", "Akka 2.6.0") - def intercept[T](code: => T)(implicit system: ActorSystem[_]): T - } /** diff --git a/actor-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/actor-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes new file mode 100644 index 0000000000..c301da7df2 --- /dev/null +++ b/actor-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Remove deprecated methods +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.actor.typed.javadsl.TimerScheduler.startPeriodicTimer") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.actor.typed.scaladsl.TimerScheduler.startPeriodicTimer") diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/TimerSchedulerImpl.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/TimerSchedulerImpl.scala index dc25fdfdec..b9b09d7605 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/TimerSchedulerImpl.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/TimerSchedulerImpl.scala @@ -78,11 +78,6 @@ import scala.concurrent.duration.FiniteDuration override final def startTimerAtFixedRate(key: Any, msg: T, initialDelay: Duration, interval: Duration): Unit = startTimerAtFixedRate(key, msg, initialDelay.asScala, interval.asScala) - override final def startPeriodicTimer(key: Any, msg: T, interval: Duration): Unit = { - // this follows the deprecation note in the super class - startTimerWithFixedDelay(key, msg, interval.asScala) - } - override final def startSingleTimer(key: Any, msg: T, delay: Duration): Unit = startSingleTimer(key, msg, delay.asScala) } @@ -110,9 +105,6 @@ import scala.concurrent.duration.FiniteDuration override def startTimerWithFixedDelay(key: Any, msg: T, initialDelay: FiniteDuration, delay: FiniteDuration): Unit = startTimer(key, msg, delay, FixedDelayMode(initialDelay)) - override def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit = - startTimer(key, msg, interval, FixedRateMode(interval)) - override def startSingleTimer(key: Any, msg: T, delay: FiniteDuration): Unit = startTimer(key, msg, delay, SingleMode) diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/TimerScheduler.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/TimerScheduler.scala index b0083d9192..2f65cb18c4 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/TimerScheduler.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/TimerScheduler.scala @@ -219,15 +219,6 @@ trait TimerScheduler[T] { def startTimerAtFixedRate(msg: T, initialDelay: java.time.Duration, interval: java.time.Duration): Unit = startTimerAtFixedRate(msg, msg, initialDelay, interval) - /** - * Deprecated API: See [[TimerScheduler#startTimerWithFixedDelay]] or [[TimerScheduler#startTimerAtFixedRate]]. - */ - @deprecated( - "Use startTimerWithFixedDelay or startTimerAtFixedRate instead. This has the same semantics as " + - "startTimerAtFixedRate, but startTimerWithFixedDelay is often preferred.", - since = "Akka 2.6.0") - def startPeriodicTimer(key: Any, msg: T, interval: Duration): Unit - /** * Start a timer that will send `msg` once to the `self` actor after * the given `delay`. diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/TimerScheduler.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/TimerScheduler.scala index e84b1f2436..f121810e2e 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/TimerScheduler.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/TimerScheduler.scala @@ -219,15 +219,6 @@ trait TimerScheduler[T] { def startTimerAtFixedRate(msg: T, initialDelay: FiniteDuration, interval: FiniteDuration): Unit = startTimerAtFixedRate(msg, msg, initialDelay, interval) - /** - * Deprecated API: See [[TimerScheduler#startTimerWithFixedDelay]] or [[TimerScheduler#startTimerAtFixedRate]]. - */ - @deprecated( - "Use startTimerWithFixedDelay or startTimerAtFixedRate instead. This has the same semantics as " + - "startTimerAtFixedRate, but startTimerWithFixedDelay is often preferred.", - since = "Akka 2.6.0") - def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit - /** * Start a timer that will send `msg` once to the `self` actor after * the given `delay`. diff --git a/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes index aa5f869fda..1ffacf5376 100644 --- a/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes +++ b/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes @@ -53,4 +53,6 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.pattern.Bac ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.pattern.BackoffSupervisor.this") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.pattern.CircuitBreaker.create") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.pattern.CircuitBreaker.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.pattern.Patterns.after") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.pattern.PatternsCS*") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.serialization.Serialization.deserialize") diff --git a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala index c263d852db..66f17a1a4f 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala @@ -477,26 +477,6 @@ object Patterns { value: Callable[CompletionStage[T]]): CompletionStage[T] = timeoutCompletionStage(duration.asScala, scheduler)(value.call())(context) - /** - * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable - * after the specified duration. - */ - @deprecated("Use the overload one which accepts a Callable of Future instead.", since = "Akka 2.5.22") - def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: Future[T]): Future[T] = - scalaAfter(duration, scheduler)(value)(context) - - /** - * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided value - * after the specified duration. - */ - @deprecated("Use the overloaded one which accepts a Callable of CompletionStage instead.", since = "Akka 2.5.22") - def after[T]( - duration: java.time.Duration, - scheduler: Scheduler, - context: ExecutionContext, - value: CompletionStage[T]): CompletionStage[T] = - afterCompletionStage(duration.asScala, scheduler)(value)(context) - /** * Returns an internally retrying [[java.util.concurrent.CompletionStage]] * The first attempt will be made immediately, each subsequent attempt will be made immediately @@ -881,448 +861,3 @@ object Patterns { attempted => delayFunction.apply(attempted).toScala.map(_.asScala))(context, scheduler).asJava } } - -/** - * Java 8+ API for Pekko patterns such as `ask`, `pipe` and others which work with [[java.util.concurrent.CompletionStage]]. - * - * For working with Scala [[scala.concurrent.Future]] from Java you may want to use [[pekko.pattern.Patterns]] instead. - */ -@deprecated("Use Patterns instead.", since = "Akka 2.5.19") -object PatternsCS { - import scala.concurrent.duration._ - - import pekko.actor.ActorRef - import pekko.japi - import pekko.pattern.{ ask => scalaAsk, gracefulStop => scalaGracefulStop, retry => scalaRetry } - import pekko.util.Timeout - - /** - * Java API for `org.apache.pekko.pattern.ask`: - * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] - * holding the eventual reply message; this means that the target actor - * needs to send the result to the `sender` reference provided. - * - * The CompletionStage will be completed with an [[pekko.pattern.AskTimeoutException]] after the - * given timeout has expired; this is independent from any timeout applied - * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the - * recipient actor didn't send a reply. - * - * Warning: - * When using future callbacks, inside actors you need to carefully avoid closing over - * the containing actor’s object, i.e. do not call methods or access mutable state - * on the enclosing actor from within the callback. This would break the actor - * encapsulation and may introduce synchronization bugs and race conditions because - * the callback will be scheduled concurrently to the enclosing actor. Unfortunately - * there is not yet a way to detect these illegal accesses at compile time. - * - * Recommended usage: - * - * {{{ - * final CompletionStage f = PatternsCS.ask(worker, request, timeout); - * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); - * }}} - */ - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.15") - def ask(actor: ActorRef, message: Any, timeout: Timeout): CompletionStage[AnyRef] = - scalaAsk(actor, message)(timeout).asJava.asInstanceOf[CompletionStage[AnyRef]] - - /** - * Java API for `org.apache.pekko.pattern.ask`: - * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] - * holding the eventual reply message; this means that the target actor - * needs to send the result to the `sender` reference provided. - * - * The CompletionStage will be completed with an [[pekko.pattern.AskTimeoutException]] after the - * given timeout has expired; this is independent from any timeout applied - * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the - * recipient actor didn't send a reply. - * - * Warning: - * When using future callbacks, inside actors you need to carefully avoid closing over - * the containing actor’s object, i.e. do not call methods or access mutable state - * on the enclosing actor from within the callback. This would break the actor - * encapsulation and may introduce synchronization bugs and race conditions because - * the callback will be scheduled concurrently to the enclosing actor. Unfortunately - * there is not yet a way to detect these illegal accesses at compile time. - * - * Recommended usage: - * - * {{{ - * final CompletionStage f = PatternsCS.ask(worker, request, duration); - * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); - * }}} - */ - @deprecated("Use Patterns.ask instead.", since = "Akka 2.5.19") - def ask(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] = - ask(actor, message, Timeout.create(timeout)) - - /** - * A variation of ask which allows to implement "replyTo" pattern by including - * sender reference in message. - * - * {{{ - * final CompletionStage f = PatternsCS.askWithReplyTo( - * worker, - * askSender -> new Request(askSender), - * timeout); - * }}} - * - * @param actor the actor to be asked - * @param messageFactory function taking an actor ref and returning the message to be sent - * @param timeout the timeout for the response before failing the returned completion operator - */ - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.15") - def askWithReplyTo( - actor: ActorRef, - messageFactory: japi.function.Function[ActorRef, Any], - timeout: Timeout): CompletionStage[AnyRef] = - extended.ask(actor, messageFactory.apply _)(timeout).asJava.asInstanceOf[CompletionStage[AnyRef]] - - /** - * A variation of ask which allows to implement "replyTo" pattern by including - * sender reference in message. - * - * {{{ - * final CompletionStage f = PatternsCS.askWithReplyTo( - * worker, - * askSender -> new Request(askSender), - * timeout); - * }}} - * - * @param actor the actor to be asked - * @param messageFactory function taking an actor ref and returning the message to be sent - * @param timeout the timeout for the response before failing the returned completion stage - */ - @deprecated("Use Pattens.askWithReplyTo instead.", since = "Akka 2.5.19") - def askWithReplyTo( - actor: ActorRef, - messageFactory: japi.function.Function[ActorRef, Any], - timeout: java.time.Duration): CompletionStage[AnyRef] = - extended.ask(actor, messageFactory.apply _)(Timeout.create(timeout)).asJava.asInstanceOf[CompletionStage[AnyRef]] - - /** - * Java API for `org.apache.pekko.pattern.ask`: - * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] - * holding the eventual reply message; this means that the target actor - * needs to send the result to the `sender` reference provided. - * - * The CompletionStage will be completed with an [[pekko.pattern.AskTimeoutException]] after the - * given timeout has expired; this is independent from any timeout applied - * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the - * recipient actor didn't send a reply. - * - * Warning: - * When using future callbacks, inside actors you need to carefully avoid closing over - * the containing actor’s object, i.e. do not call methods or access mutable state - * on the enclosing actor from within the callback. This would break the actor - * encapsulation and may introduce synchronization bugs and race conditions because - * the callback will be scheduled concurrently to the enclosing actor. Unfortunately - * there is not yet a way to detect these illegal accesses at compile time. - * - * Recommended usage: - * - * {{{ - * final CompletionStage f = PatternsCS.ask(worker, request, timeout); - * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); - * }}} - */ - @deprecated("Use Pattens.ask which accepts java.time.Duration instead.", since = "Akka 2.5.19") - def ask(actor: ActorRef, message: Any, timeoutMillis: Long): CompletionStage[AnyRef] = - scalaAsk(actor, message)(new Timeout(timeoutMillis, TimeUnit.MILLISECONDS)).asJava - .asInstanceOf[CompletionStage[AnyRef]] - - /** - * A variation of ask which allows to implement "replyTo" pattern by including - * sender reference in message. - * - * {{{ - * final CompletionStage f = PatternsCS.askWithReplyTo( - * worker, - * replyTo -> new Request(replyTo), - * timeout); - * }}} - * - * @param actor the actor to be asked - * @param messageFactory function taking an actor ref to reply to and returning the message to be sent - * @param timeoutMillis the timeout for the response before failing the returned completion operator - */ - @deprecated("Use Pattens.askWithReplyTo which accepts java.time.Duration instead.", since = "Akka 2.5.19") - def askWithReplyTo( - actor: ActorRef, - messageFactory: japi.function.Function[ActorRef, Any], - timeoutMillis: Long): CompletionStage[AnyRef] = - askWithReplyTo(actor, messageFactory, Timeout(timeoutMillis.millis)) - - /** - * Java API for `org.apache.pekko.pattern.ask`: - * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] - * holding the eventual reply message; this means that the target [[pekko.actor.ActorSelection]] - * needs to send the result to the `sender` reference provided. - * - * The CompletionStage will be completed with an [[pekko.pattern.AskTimeoutException]] after the - * given timeout has expired; this is independent from any timeout applied - * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the - * recipient actor didn't send a reply. - * - * Warning: - * When using future callbacks, inside actors you need to carefully avoid closing over - * the containing actor’s object, i.e. do not call methods or access mutable state - * on the enclosing actor from within the callback. This would break the actor - * encapsulation and may introduce synchronization bugs and race conditions because - * the callback will be scheduled concurrently to the enclosing actor. Unfortunately - * there is not yet a way to detect these illegal accesses at compile time. - * - * Recommended usage: - * - * {{{ - * final CompletionStage f = PatternsCS.ask(selection, request, timeout); - * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); - * }}} - */ - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.15") - def ask(selection: ActorSelection, message: Any, timeout: Timeout): CompletionStage[AnyRef] = - scalaAsk(selection, message)(timeout).asJava.asInstanceOf[CompletionStage[AnyRef]] - - /** - * Java API for `org.apache.pekko.pattern.ask`: - * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] - * holding the eventual reply message; this means that the target [[pekko.actor.ActorSelection]] - * needs to send the result to the `sender` reference provided. - * - * The CompletionStage will be completed with an [[pekko.pattern.AskTimeoutException]] after the - * given timeout has expired; this is independent from any timeout applied - * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the - * recipient actor didn't send a reply. - * - * Warning: - * When using future callbacks, inside actors you need to carefully avoid closing over - * the containing actor’s object, i.e. do not call methods or access mutable state - * on the enclosing actor from within the callback. This would break the actor - * encapsulation and may introduce synchronization bugs and race conditions because - * the callback will be scheduled concurrently to the enclosing actor. Unfortunately - * there is not yet a way to detect these illegal accesses at compile time. - * - * Recommended usage: - * - * {{{ - * final CompletionStage f = PatternsCS.ask(selection, request, duration); - * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); - * }}} - */ - @deprecated("Use Patterns.ask instead.", since = "Akka 2.5.19") - def ask(selection: ActorSelection, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] = - ask(selection, message, Timeout.create(timeout)) - - /** - * Java API for `org.apache.pekko.pattern.ask`: - * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] - * holding the eventual reply message; this means that the target [[pekko.actor.ActorSelection]] - * needs to send the result to the `sender` reference provided. - * - * The CompletionStage will be completed with an [[pekko.pattern.AskTimeoutException]] after the - * given timeout has expired; this is independent from any timeout applied - * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the - * recipient actor didn't send a reply. - * - * Warning: - * When using future callbacks, inside actors you need to carefully avoid closing over - * the containing actor’s object, i.e. do not call methods or access mutable state - * on the enclosing actor from within the callback. This would break the actor - * encapsulation and may introduce synchronization bugs and race conditions because - * the callback will be scheduled concurrently to the enclosing actor. Unfortunately - * there is not yet a way to detect these illegal accesses at compile time. - * - * Recommended usage: - * - * {{{ - * final CompletionStage f = PatternsCS.ask(selection, request, timeout); - * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); - * }}} - */ - @deprecated("Use Pattens.ask which accepts java.time.Duration instead.", since = "Akka 2.5.19") - def ask(selection: ActorSelection, message: Any, timeoutMillis: Long): CompletionStage[AnyRef] = - scalaAsk(selection, message)(new Timeout(timeoutMillis, TimeUnit.MILLISECONDS)).asJava - .asInstanceOf[CompletionStage[AnyRef]] - - /** - * A variation of ask which allows to implement "replyTo" pattern by including - * sender reference in message. - * - * {{{ - * final CompletionStage f = Patterns.askWithReplyTo( - * selection, - * askSender -> new Request(askSender), - * timeout); - * }}} - */ - @deprecated("Use Pattens.askWithReplyTo which accepts java.time.Duration instead.", since = "Akka 2.5.19") - def askWithReplyTo( - selection: ActorSelection, - messageFactory: japi.Function[ActorRef, Any], - timeoutMillis: Long): CompletionStage[AnyRef] = - extended - .ask(selection, messageFactory.apply _)(Timeout(timeoutMillis.millis)) - .asJava - .asInstanceOf[CompletionStage[AnyRef]] - - /** - * When this [[java.util.concurrent.CompletionStage]] finishes, send its result to the given - * [[pekko.actor.ActorRef]] or [[pekko.actor.ActorSelection]]. - * Returns the original CompletionStage to allow method chaining. - * If the future was completed with failure it is sent as a [[pekko.actor.Status.Failure]] - * to the recipient. - * - * Recommended usage example: - * - * {{{ - * final CompletionStage f = PatternsCS.ask(worker, request, timeout); - * // apply some transformation (i.e. enrich with request info) - * final CompletionStage transformed = f.thenApply(result -> { ... }); - * // send it on to the next operator - * PatternsCS.pipe(transformed, context).to(nextActor); - * }}} - */ - @deprecated("Use Patterns.pipe instead.", since = "Akka 2.5.19") - def pipe[T](future: CompletionStage[T], context: ExecutionContext): PipeableCompletionStage[T] = - pipeCompletionStage(future)(context) - - /** - * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with success (value `true`) when - * existing messages of the target actor has been processed and the actor has been - * terminated. - * - * Useful when you need to wait for termination or compose ordered termination of several actors. - * - * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]] - * is completed with failure [[pekko.pattern.AskTimeoutException]]. - */ - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12") - def gracefulStop(target: ActorRef, timeout: FiniteDuration): CompletionStage[java.lang.Boolean] = - scalaGracefulStop(target, timeout).asJava.asInstanceOf[CompletionStage[java.lang.Boolean]] - - /** - * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with success (value `true`) when - * existing messages of the target actor has been processed and the actor has been - * terminated. - * - * Useful when you need to wait for termination or compose ordered termination of several actors. - * - * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]] - * is completed with failure [[pekko.pattern.AskTimeoutException]]. - */ - @deprecated("Use Patterns.gracefulStop instead.", since = "Akka 2.5.19") - def gracefulStop(target: ActorRef, timeout: java.time.Duration): CompletionStage[java.lang.Boolean] = - scalaGracefulStop(target, timeout.asScala).asJava.asInstanceOf[CompletionStage[java.lang.Boolean]] - - /** - * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with success (value `true`) when - * existing messages of the target actor has been processed and the actor has been - * terminated. - * - * Useful when you need to wait for termination or compose ordered termination of several actors. - * - * If you want to invoke specialized stopping logic on your target actor instead of PoisonPill, you can pass your - * stop command as `stopMessage` parameter - * - * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]] - * is completed with failure [[pekko.pattern.AskTimeoutException]]. - */ - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12") - def gracefulStop(target: ActorRef, timeout: FiniteDuration, stopMessage: Any): CompletionStage[java.lang.Boolean] = - scalaGracefulStop(target, timeout, stopMessage).asJava.asInstanceOf[CompletionStage[java.lang.Boolean]] - - /** - * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with success (value `true`) when - * existing messages of the target actor has been processed and the actor has been - * terminated. - * - * Useful when you need to wait for termination or compose ordered termination of several actors. - * - * If you want to invoke specialized stopping logic on your target actor instead of PoisonPill, you can pass your - * stop command as `stopMessage` parameter - * - * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]] - * is completed with failure [[pekko.pattern.AskTimeoutException]]. - */ - @deprecated("Use Patterns.gracefulStop instead.", since = "Akka 2.5.19") - def gracefulStop( - target: ActorRef, - timeout: java.time.Duration, - stopMessage: Any): CompletionStage[java.lang.Boolean] = - scalaGracefulStop(target, timeout.asScala, stopMessage).asJava.asInstanceOf[CompletionStage[java.lang.Boolean]] - - /** - * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided Callable - * after the specified duration. - */ - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12") - def after[T]( - duration: FiniteDuration, - scheduler: Scheduler, - context: ExecutionContext, - value: Callable[CompletionStage[T]]): CompletionStage[T] = - afterCompletionStage(duration, scheduler)(value.call())(context) - - /** - * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided Callable - * after the specified duration. - */ - @deprecated("Use Patterns.after instead.", since = "Akka 2.5.19") - def after[T]( - duration: java.time.Duration, - scheduler: Scheduler, - context: ExecutionContext, - value: Callable[CompletionStage[T]]): CompletionStage[T] = - afterCompletionStage(duration.asScala, scheduler)(value.call())(context) - - /** - * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided value - * after the specified duration. - */ - @deprecated( - "Use Patterns.after which accepts java.time.Duration and Callable of CompletionStage instead.", - since = "Akka 2.5.22") - def after[T]( - duration: FiniteDuration, - scheduler: Scheduler, - context: ExecutionContext, - value: CompletionStage[T]): CompletionStage[T] = - afterCompletionStage(duration, scheduler)(value)(context) - - /** - * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided value - * after the specified duration. - */ - @deprecated( - "Use Patterns.after which accepts java.time.Duration and Callable of CompletionStage instead.", - since = "Akka 2.5.22") - def after[T]( - duration: java.time.Duration, - scheduler: Scheduler, - context: ExecutionContext, - value: CompletionStage[T]): CompletionStage[T] = - afterCompletionStage(duration.asScala, scheduler)(value)(context) - - /** - * Returns an internally retrying [[java.util.concurrent.CompletionStage]] - * The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'. - * A scheduler (eg context.system.scheduler) must be provided to delay each retry - * If attempts are exhausted the returned completion operator is simply the result of invoking attempt. - * Note that the attempt function will be invoked on the given execution context for subsequent tries - * and therefore must be thread safe (i.e. not touch unsafe mutable state). - */ - @deprecated("Use Patterns.retry instead.", since = "Akka 2.5.19") - def retry[T]( - attempt: Callable[CompletionStage[T]], - attempts: Int, - delay: java.time.Duration, - scheduler: Scheduler, - ec: ExecutionContext): CompletionStage[T] = - scalaRetry(() => attempt.call().asScala, attempts, delay.asScala)(ec, scheduler).asJava -} diff --git a/cluster-sharding/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/cluster-sharding/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes new file mode 100644 index 0000000000..f129e622b9 --- /dev/null +++ b/cluster-sharding/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Remove deprecated methods +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.cluster.sharding.ClusterShardingSettings.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.cluster.sharding.ClusterShardingSettings#TuningParameters.this") diff --git a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingSettings.scala b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingSettings.scala index 2fc309cc75..18d0430ee4 100644 --- a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingSettings.scala +++ b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingSettings.scala @@ -937,172 +937,6 @@ object ClusterShardingSettings { require( entityRecoveryStrategy == "all" || entityRecoveryStrategy == "constant", s"Unknown 'entity-recovery-strategy' [$entityRecoveryStrategy], valid values are 'all' or 'constant'") - - // included for binary compatibility - @deprecated( - "Use the ClusterShardingSettings factory methods or the constructor including " + - "leastShardAllocationAbsoluteLimit and leastShardAllocationRelativeLimit instead", - since = "Akka 2.6.10") - def this( - coordinatorFailureBackoff: FiniteDuration, - retryInterval: FiniteDuration, - bufferSize: Int, - handOffTimeout: FiniteDuration, - shardStartTimeout: FiniteDuration, - shardFailureBackoff: FiniteDuration, - entityRestartBackoff: FiniteDuration, - rebalanceInterval: FiniteDuration, - snapshotAfter: Int, - keepNrOfBatches: Int, - leastShardAllocationRebalanceThreshold: Int, - leastShardAllocationMaxSimultaneousRebalance: Int, - waitingForStateTimeout: FiniteDuration, - updatingStateTimeout: FiniteDuration, - entityRecoveryStrategy: String, - entityRecoveryConstantRateStrategyFrequency: FiniteDuration, - entityRecoveryConstantRateStrategyNumberOfEntities: Int, - coordinatorStateWriteMajorityPlus: Int, - coordinatorStateReadMajorityPlus: Int) = - this( - coordinatorFailureBackoff, - retryInterval, - bufferSize, - handOffTimeout, - shardStartTimeout, - shardFailureBackoff, - entityRestartBackoff, - rebalanceInterval, - snapshotAfter, - keepNrOfBatches, - leastShardAllocationRebalanceThreshold, - leastShardAllocationMaxSimultaneousRebalance, - waitingForStateTimeout, - updatingStateTimeout, - entityRecoveryStrategy, - entityRecoveryConstantRateStrategyFrequency, - entityRecoveryConstantRateStrategyNumberOfEntities, - coordinatorStateWriteMajorityPlus, - coordinatorStateReadMajorityPlus, - leastShardAllocationAbsoluteLimit = 100, - leastShardAllocationRelativeLimit = 0.1) - - // included for binary compatibility - @deprecated( - "Use the ClusterShardingSettings factory methods or the constructor including " + - "coordinatorStateWriteMajorityPlus and coordinatorStateReadMajorityPlus instead", - since = "Akka 2.6.5") - def this( - coordinatorFailureBackoff: FiniteDuration, - retryInterval: FiniteDuration, - bufferSize: Int, - handOffTimeout: FiniteDuration, - shardStartTimeout: FiniteDuration, - shardFailureBackoff: FiniteDuration, - entityRestartBackoff: FiniteDuration, - rebalanceInterval: FiniteDuration, - snapshotAfter: Int, - keepNrOfBatches: Int, - leastShardAllocationRebalanceThreshold: Int, - leastShardAllocationMaxSimultaneousRebalance: Int, - waitingForStateTimeout: FiniteDuration, - updatingStateTimeout: FiniteDuration, - entityRecoveryStrategy: String, - entityRecoveryConstantRateStrategyFrequency: FiniteDuration, - entityRecoveryConstantRateStrategyNumberOfEntities: Int) = - this( - coordinatorFailureBackoff, - retryInterval, - bufferSize, - handOffTimeout, - shardStartTimeout, - shardFailureBackoff, - entityRestartBackoff, - rebalanceInterval, - snapshotAfter, - keepNrOfBatches, - leastShardAllocationRebalanceThreshold, - leastShardAllocationMaxSimultaneousRebalance, - waitingForStateTimeout, - updatingStateTimeout, - entityRecoveryStrategy, - entityRecoveryConstantRateStrategyFrequency, - entityRecoveryConstantRateStrategyNumberOfEntities, - coordinatorStateWriteMajorityPlus = 5, - coordinatorStateReadMajorityPlus = 5) - - // included for binary compatibility - @deprecated("Use the ClusterShardingSettings factory methods or the full constructor instead", since = "Akka 2.6.5") - def this( - coordinatorFailureBackoff: FiniteDuration, - retryInterval: FiniteDuration, - bufferSize: Int, - handOffTimeout: FiniteDuration, - shardStartTimeout: FiniteDuration, - shardFailureBackoff: FiniteDuration, - entityRestartBackoff: FiniteDuration, - rebalanceInterval: FiniteDuration, - snapshotAfter: Int, - leastShardAllocationRebalanceThreshold: Int, - leastShardAllocationMaxSimultaneousRebalance: Int, - waitingForStateTimeout: FiniteDuration, - updatingStateTimeout: FiniteDuration, - entityRecoveryStrategy: String, - entityRecoveryConstantRateStrategyFrequency: FiniteDuration, - entityRecoveryConstantRateStrategyNumberOfEntities: Int) = { - this( - coordinatorFailureBackoff, - retryInterval, - bufferSize, - handOffTimeout, - shardStartTimeout, - shardFailureBackoff, - entityRestartBackoff, - rebalanceInterval, - snapshotAfter, - 2, - leastShardAllocationRebalanceThreshold, - leastShardAllocationMaxSimultaneousRebalance, - waitingForStateTimeout, - updatingStateTimeout, - entityRecoveryStrategy, - entityRecoveryConstantRateStrategyFrequency, - entityRecoveryConstantRateStrategyNumberOfEntities) - } - - // included for binary compatibility - @deprecated("Use the ClusterShardingSettings factory methods or the full constructor instead", since = "Akka 2.6.5") - def this( - coordinatorFailureBackoff: FiniteDuration, - retryInterval: FiniteDuration, - bufferSize: Int, - handOffTimeout: FiniteDuration, - shardStartTimeout: FiniteDuration, - shardFailureBackoff: FiniteDuration, - entityRestartBackoff: FiniteDuration, - rebalanceInterval: FiniteDuration, - snapshotAfter: Int, - leastShardAllocationRebalanceThreshold: Int, - leastShardAllocationMaxSimultaneousRebalance: Int, - waitingForStateTimeout: FiniteDuration, - updatingStateTimeout: FiniteDuration) = { - this( - coordinatorFailureBackoff, - retryInterval, - bufferSize, - handOffTimeout, - shardStartTimeout, - shardFailureBackoff, - entityRestartBackoff, - rebalanceInterval, - snapshotAfter, - leastShardAllocationRebalanceThreshold, - leastShardAllocationMaxSimultaneousRebalance, - waitingForStateTimeout, - updatingStateTimeout, - "all", - 100.milliseconds, - 5) - } } } @@ -1137,163 +971,6 @@ final class ClusterShardingSettings( val coordinatorSingletonSettings: ClusterSingletonManagerSettings, val leaseSettings: Option[LeaseUsageSettings]) extends NoSerializationVerificationNeeded { - @deprecated( - "Use the ClusterShardingSettings factory methods or the constructor including coordinatorSingletonOverrideRole instead", - "Akka 2.6.20") - def this( - role: Option[String], - rememberEntities: Boolean, - journalPluginId: String, - snapshotPluginId: String, - stateStoreMode: String, - rememberEntitiesStore: String, - passivationStrategySettings: ClusterShardingSettings.PassivationStrategySettings, - shardRegionQueryTimeout: FiniteDuration, - tuningParameters: ClusterShardingSettings.TuningParameters, - coordinatorSingletonSettings: ClusterSingletonManagerSettings, - leaseSettings: Option[LeaseUsageSettings]) = - this( - role, - rememberEntities, - journalPluginId, - snapshotPluginId, - stateStoreMode, - rememberEntitiesStore, - passivationStrategySettings, - shardRegionQueryTimeout, - tuningParameters, - true, - coordinatorSingletonSettings, - leaseSettings) - - @deprecated( - "Use the ClusterShardingSettings factory methods or the constructor including passivationStrategySettings instead", - "Akka 2.6.18") - def this( - role: Option[String], - rememberEntities: Boolean, - journalPluginId: String, - snapshotPluginId: String, - stateStoreMode: String, - rememberEntitiesStore: String, - passivateIdleEntityAfter: FiniteDuration, - shardRegionQueryTimeout: FiniteDuration, - tuningParameters: ClusterShardingSettings.TuningParameters, - coordinatorSingletonSettings: ClusterSingletonManagerSettings, - leaseSettings: Option[LeaseUsageSettings]) = - this( - role, - rememberEntities, - journalPluginId, - snapshotPluginId, - stateStoreMode, - rememberEntitiesStore, - ClusterShardingSettings.PassivationStrategySettings.oldDefault(passivateIdleEntityAfter), - shardRegionQueryTimeout, - tuningParameters, - true, - coordinatorSingletonSettings, - leaseSettings) - - @deprecated( - "Use the ClusterShardingSettings factory methods or the constructor including rememberedEntitiesStore instead", - "Akka 2.6.7") - def this( - role: Option[String], - rememberEntities: Boolean, - journalPluginId: String, - snapshotPluginId: String, - stateStoreMode: String, - passivateIdleEntityAfter: FiniteDuration, - shardRegionQueryTimeout: FiniteDuration, - tuningParameters: ClusterShardingSettings.TuningParameters, - coordinatorSingletonSettings: ClusterSingletonManagerSettings, - leaseSettings: Option[LeaseUsageSettings]) = - this( - role, - rememberEntities, - journalPluginId, - snapshotPluginId, - stateStoreMode, - "ddata", - passivateIdleEntityAfter, - shardRegionQueryTimeout, - tuningParameters, - coordinatorSingletonSettings, - leaseSettings) - - // bin compat for Akka 2.5.23 - @deprecated( - "Use the ClusterShardingSettings factory methods or the constructor including shardRegionQueryTimeout instead", - since = "Akka 2.6.0") - def this( - role: Option[String], - rememberEntities: Boolean, - journalPluginId: String, - snapshotPluginId: String, - stateStoreMode: String, - passivateIdleEntityAfter: FiniteDuration, - tuningParameters: ClusterShardingSettings.TuningParameters, - coordinatorSingletonSettings: ClusterSingletonManagerSettings, - leaseSettings: Option[LeaseUsageSettings]) = - this( - role, - rememberEntities, - journalPluginId, - snapshotPluginId, - stateStoreMode, - passivateIdleEntityAfter, - 3.seconds, - tuningParameters, - coordinatorSingletonSettings, - leaseSettings) - - // bin compat for Akka 2.5.21 - @deprecated( - "Use the ClusterShardingSettings factory methods or the constructor including shardRegionQueryTimeout instead", - since = "Akka 2.5.21") - def this( - role: Option[String], - rememberEntities: Boolean, - journalPluginId: String, - snapshotPluginId: String, - stateStoreMode: String, - passivateIdleEntityAfter: FiniteDuration, - tuningParameters: ClusterShardingSettings.TuningParameters, - coordinatorSingletonSettings: ClusterSingletonManagerSettings) = - this( - role, - rememberEntities, - journalPluginId, - snapshotPluginId, - stateStoreMode, - passivateIdleEntityAfter, - 3.seconds, - tuningParameters, - coordinatorSingletonSettings, - None) - - // included for binary compatibility reasons - @deprecated( - "Use the ClusterShardingSettings factory methods or the constructor including passivateIdleEntityAfter instead", - since = "Akka 2.5.18") - def this( - role: Option[String], - rememberEntities: Boolean, - journalPluginId: String, - snapshotPluginId: String, - stateStoreMode: String, - tuningParameters: ClusterShardingSettings.TuningParameters, - coordinatorSingletonSettings: ClusterSingletonManagerSettings) = - this( - role, - rememberEntities, - journalPluginId, - snapshotPluginId, - stateStoreMode, - Duration.Zero, - tuningParameters, - coordinatorSingletonSettings) import ClusterShardingSettings.{ RememberEntitiesStoreCustom, StateStoreModeDData, StateStoreModePersistence } require( diff --git a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala index 75ad99be19..c71309f555 100644 --- a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala +++ b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala @@ -1305,8 +1305,7 @@ abstract class ShardCoordinator( * * @see [[ClusterSharding$ ClusterSharding extension]] */ -@deprecated("Use `ddata` mode, persistence mode is deprecated.", "Akka 2.6.0") -class PersistentShardCoordinator( +private class PersistentShardCoordinator( override val typeName: String, settings: ClusterShardingSettings, allocationStrategy: ShardCoordinator.ShardAllocationStrategy) diff --git a/docs/src/main/paradox/stream/operators/FileIO/fromFile.md b/docs/src/main/paradox/stream/operators/FileIO/fromFile.md deleted file mode 100644 index 06c5a115cd..0000000000 --- a/docs/src/main/paradox/stream/operators/FileIO/fromFile.md +++ /dev/null @@ -1,22 +0,0 @@ -# FileIO.fromFile - -Emits the contents of a file. - -@ref[File IO Sinks and Sources](../index.md#file-io-sinks-and-sources) - -@@@ warning - -The `fromFile` operator has been deprecated, use @ref:[fromPath](./fromPath.md) instead. - -@@@ - -## Signature - -@apidoc[FileIO.fromFile](FileIO$) { scala="#fromFile(f:java.io.File,chunkSize:Int):org.apache.pekko.stream.scaladsl.Source[org.apache.pekko.util.ByteString,scala.concurrent.Future[org.apache.pekko.stream.IOResult]]" java="#fromFile(java.io.File)" java="#fromFile(java.io.File,int)" } - - -## Description - -Emits the contents of a file, as `ByteString`s, materializes into a @scala[`Future`] @java[`CompletionStage`] which will be completed with -a `IOResult` upon reaching the end of the file or if there is a failure. - diff --git a/docs/src/main/paradox/stream/operators/FileIO/toFile.md b/docs/src/main/paradox/stream/operators/FileIO/toFile.md deleted file mode 100644 index c75ed1f7d7..0000000000 --- a/docs/src/main/paradox/stream/operators/FileIO/toFile.md +++ /dev/null @@ -1,21 +0,0 @@ -# FileIO.toFile - -Create a sink which will write incoming `ByteString` s to a given file. - -@ref[File IO Sinks and Sources](../index.md#file-io-sinks-and-sources) - -@@@ warning - -The `toFile` operator has been deprecated, use @ref:[toPath](./toPath.md) instead. - -@@@ - -## Signature - -@apidoc[FileIO.toFile](FileIO$) { scala="#toFile(f:java.io.File,options:Set[java.nio.file.OpenOption]):org.apache.pekko.stream.scaladsl.Sink[org.apache.pekko.util.ByteString,scala.concurrent.Future[org.apache.pekko.stream.IOResult]]" java="#toFile(java.io.File,java.util.Set)" } - - -## Description - -Creates a Sink which writes incoming `ByteString` elements to the given file path. Overwrites existing files by truncating their contents as default. -Materializes a @scala[`Future`] @java[`CompletionStage`] of `IOResult` that will be completed with the size of the file (in bytes) at the streams completion, and a possible exception if IO operation was not completed successfully. diff --git a/docs/src/main/paradox/stream/operators/Sink/foreachParallel.md b/docs/src/main/paradox/stream/operators/Sink/foreachParallel.md deleted file mode 100644 index 8514e770f3..0000000000 --- a/docs/src/main/paradox/stream/operators/Sink/foreachParallel.md +++ /dev/null @@ -1,16 +0,0 @@ -# Sink.foreachParallel - -Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel. - -@ref[Sink operators](../index.md#sink-operators) - -## Reactive Streams semantics - -@@@div { .callout } - -**cancels** never - -**backpressures** when the previous parallel procedure invocations has not yet completed - -@@@ - diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 8d0dfabcba..ad61511663 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -68,7 +68,6 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl |Sink|@ref[forall](Sink/forall.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.| |Sink|@ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.| |Sink|@ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.| -|Sink|@ref[foreachParallel](Sink/foreachParallel.md)|Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.| |Sink|@ref[fromMaterializer](Sink/fromMaterializer.md)|Defer the creation of a `Sink` until materialization and access `Materializer` and `Attributes`| |Sink|@ref[fromSubscriber](Sink/fromSubscriber.md)|Integration with Reactive Streams, wraps a `org.reactivestreams.Subscriber` as a sink.| |Sink|@ref[futureSink](Sink/futureSink.md)|Streams the elements to the given future sink once it successfully completes. | @@ -133,9 +132,7 @@ Sources and sinks for reading and writing files can be found on `FileIO`. | |Operator|Description| |--|--|--| -|FileIO|@ref[fromFile](FileIO/fromFile.md)|Emits the contents of a file.| |FileIO|@ref[fromPath](FileIO/fromPath.md)|Emits the contents of a file from the given path.| -|FileIO|@ref[toFile](FileIO/toFile.md)|Create a sink which will write incoming `ByteString` s to a given file.| |FileIO|@ref[toPath](FileIO/toPath.md)|Create a sink which will write incoming `ByteString` s to a given file path.| ## Simple operators @@ -475,11 +472,9 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [forall](Sink/forall.md) * [foreach](Sink/foreach.md) * [foreachAsync](Sink/foreachAsync.md) -* [foreachParallel](Sink/foreachParallel.md) * [from](Source/from.md) * [fromArray](Source/fromArray.md) * [fromCompletionStage](Source/fromCompletionStage.md) -* [fromFile](FileIO/fromFile.md) * [fromFuture](Source/fromFuture.md) * [fromFutureSource](Source/fromFutureSource.md) * [fromInputStream](StreamConverters/fromInputStream.md) @@ -607,7 +602,6 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [takeWithin](Source-or-Flow/takeWithin.md) * [throttle](Source-or-Flow/throttle.md) * [tick](Source/tick.md) -* [toFile](FileIO/toFile.md) * [toPath](FileIO/toPath.md) * [unfold](Source/unfold.md) * [unfoldAsync](Source/unfoldAsync.md) diff --git a/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala b/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala index 0cdbafea71..46bf05c5ac 100644 --- a/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -54,9 +54,6 @@ private[pekko] object NoMaterializer extends Materializer { def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable = throw new UnsupportedOperationException("NoMaterializer cannot schedule a single event") - def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable = - throw new UnsupportedOperationException("NoMaterializer cannot schedule a repeated event") - override def scheduleWithFixedDelay( initialDelay: FiniteDuration, delay: FiniteDuration, diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala index 0d3de6261f..65f8157750 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala @@ -170,6 +170,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with Matchers { Ignore(_ == pekko.stream.scaladsl.Source.getClass, _ == "actorRefWithAck", _ => true, _ => true), // Internal in scaladsl Ignore(_ == pekko.stream.scaladsl.Source.getClass, _ == "actorRefWithBackpressure", _ => true, _ => true), // Internal in scaladsl Ignore(_ == pekko.stream.scaladsl.BidiFlow.getClass, _ == "apply", _ == 24, _ => true), + Ignore(_ == pekko.stream.scaladsl.BidiFlow.getClass, _ == "bidirectionalIdleTimeout", _ => true, _ => true), Ignore(_ == pekko.stream.scaladsl.GraphDSL.getClass, _ == "runnable", _ == 24, _ => true), Ignore(_ == pekko.stream.scaladsl.GraphDSL.getClass, _ == "create", _ == 24, _ => true), // all generated methods like scaladsl.Sink$.akka$stream$scaladsl$Sink$$newOnCompleteStage$1 diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkForeachParallelSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkForeachParallelSpec.scala deleted file mode 100644 index 4f1310b3ff..0000000000 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkForeachParallelSpec.scala +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2015-2022 Lightbend Inc. - */ - -package org.apache.pekko.stream.scaladsl - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit - -import scala.annotation.nowarn -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.util.control.NoStackTrace - -import org.apache.pekko -import pekko.stream.ActorAttributes._ -import pekko.stream.Supervision._ -import pekko.stream.testkit.StreamSpec -import pekko.testkit.TestLatch -import pekko.testkit.TestProbe - -@nowarn // tests deprecated APIs -class SinkForeachParallelSpec extends StreamSpec { - - "A ForeachParallel" must { - "produce elements in the order they are ready" in { - import system.dispatcher - - val probe = TestProbe() - val latch = (1 to 4).map(_ -> TestLatch(1)).toMap - val p = Source(1 to 4).runWith(Sink.foreachParallel(4)((n: Int) => { - Await.ready(latch(n), 5.seconds) - probe.ref ! n - })) - latch(2).countDown() - probe.expectMsg(2) - latch(4).countDown() - probe.expectMsg(4) - latch(3).countDown() - probe.expectMsg(3) - - assert(!p.isCompleted) - - latch(1).countDown() - probe.expectMsg(1) - - Await.result(p, 4.seconds) - assert(p.isCompleted) - } - - "not run more functions in parallel then specified" in { - import system.dispatcher - - val probe = TestProbe() - val latch = (1 to 5).map(_ -> TestLatch()).toMap - - val p = Source(1 to 5).runWith(Sink.foreachParallel(4)((n: Int) => { - probe.ref ! n - Await.ready(latch(n), 5.seconds) - })) - probe.expectMsgAllOf(1, 2, 3, 4) - probe.expectNoMessage(200.millis) - - assert(!p.isCompleted) - - for (i <- 1 to 4) latch(i).countDown() - - latch(5).countDown() - probe.expectMsg(5) - - Await.result(p, 5.seconds) - assert(p.isCompleted) - - } - - "resume after function failure" in { - import system.dispatcher - - val probe = TestProbe() - val latch = TestLatch(1) - - val p = Source(1 to 5).runWith( - Sink - .foreachParallel(4)((n: Int) => { - if (n == 3) throw new RuntimeException("err1") with NoStackTrace - else { - probe.ref ! n - Await.ready(latch, 10.seconds) - } - }) - .withAttributes(supervisionStrategy(resumingDecider))) - - latch.countDown() - probe.expectMsgAllOf(1, 2, 4, 5) - - Await.result(p, 5.seconds) - } - - "finish after function thrown exception" in { - import system.dispatcher - - val probe = TestProbe() - val element4Latch = new CountDownLatch(1) - val errorLatch = new CountDownLatch(2) - - val p = Source - .fromIterator(() => Iterator.from(1)) - .runWith( - Sink - .foreachParallel(3)((n: Int) => { - if (n == 3) { - // Error will happen only after elements 1, 2 has been processed - errorLatch.await(5, TimeUnit.SECONDS) - throw new RuntimeException("err2") with NoStackTrace - } else { - probe.ref ! n - errorLatch.countDown() - element4Latch.await(5, TimeUnit.SECONDS) // Block element 4, 5, 6, ... from entering - } - }) - .withAttributes(supervisionStrategy(stoppingDecider))) - - // Only the first two messages are guaranteed to arrive due to their enforced ordering related to the time - // of failure. - probe.expectMsgAllOf(1, 2) - element4Latch.countDown() // Release elements 4, 5, 6, ... - - a[RuntimeException] must be thrownBy Await.result(p, 3.seconds) - } - - "handle empty source" in { - import system.dispatcher - - val p = Source(List.empty[Int]).runWith(Sink.foreachParallel(3)(a => ())) - - Await.result(p, 200.seconds) - } - - } - -} diff --git a/stream-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/stream-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes new file mode 100644 index 0000000000..87ece02dd0 --- /dev/null +++ b/stream-typed/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Remove deprecated methods +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.typed.javadsl.ActorSource.actorRefWithAck") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.typed.scaladsl.ActorSource.actorRefWithAck") diff --git a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala index 0bb3667931..5e31b916e6 100644 --- a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala +++ b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala @@ -118,46 +118,4 @@ object ActorSource { } }) .asJava - - /** - * Creates a `Source` that is materialized as an [[pekko.actor.ActorRef]]. - * Messages sent to this actor will be emitted to the stream if there is demand from downstream, - * and a new message will only be accepted after the previous messages has been consumed and acknowledged back. - * The stream will complete with failure if a message is sent before the acknowledgement has been replied back. - * - * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted - * [[java.lang.Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received - * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, - * the failure will be signaled downstream immediately (instead of the completion signal). - * - * The actor will be stopped when the stream is completed, failed or canceled from downstream, - * i.e. you can watch it to get notified when that happens. - * - * @deprecated Use actorRefWithBackpressure instead - */ - @deprecated("Use actorRefWithBackpressure instead", "Akka 2.6.0") - def actorRefWithAck[T, Ack]( - ackTo: ActorRef[Ack], - ackMessage: Ack, - completionMatcher: pekko.japi.function.Function[T, java.util.Optional[CompletionStrategy]], - failureMatcher: pekko.japi.function.Function[T, java.util.Optional[Throwable]]): Source[T, ActorRef[T]] = - pekko.stream.typed.scaladsl.ActorSource - .actorRefWithBackpressure[T, Ack]( - ackTo, - ackMessage, - new JavaPartialFunction[T, CompletionStrategy] { - override def apply(x: T, isCheck: Boolean): CompletionStrategy = { - val result = completionMatcher(x) - if (!result.isPresent) throw JavaPartialFunction.noMatch() - else result.get() - } - }, - new JavaPartialFunction[T, Throwable] { - override def apply(x: T, isCheck: Boolean): Throwable = { - val result = failureMatcher(x) - if (!result.isPresent) throw JavaPartialFunction.noMatch() - else result.get() - } - }) - .asJava } diff --git a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorSource.scala b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorSource.scala index b86083b7b0..388c0f396a 100644 --- a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorSource.scala +++ b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorSource.scala @@ -105,32 +105,4 @@ object ActorSource { completionMatcher.asInstanceOf[PartialFunction[Any, CompletionStrategy]], failureMatcher.asInstanceOf[PartialFunction[Any, Throwable]]) .mapMaterializedValue(actorRefAdapter) - - /** - * Creates a `Source` that is materialized as an [[pekko.actor.typed.ActorRef]]. - * Messages sent to this actor will be emitted to the stream if there is demand from downstream, - * and a new message will only be accepted after the previous messages has been consumed and acknowledged back. - * The stream will complete with failure if a message is sent before the acknowledgement has been replied back. - * - * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted - * [[java.lang.Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received - * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, - * the failure will be signaled downstream immediately (instead of the completion signal). - * - * The actor will be stopped when the stream is completed, failed or canceled from downstream, - * i.e. you can watch it to get notified when that happens. - */ - @deprecated("Use actorRefWithBackpressure instead", "Akka 2.6.0") - def actorRefWithAck[T, Ack]( - ackTo: ActorRef[Ack], - ackMessage: Ack, - completionMatcher: PartialFunction[T, CompletionStrategy], - failureMatcher: PartialFunction[T, Throwable]): Source[T, ActorRef[T]] = - Source - .actorRefWithAck[T]( - Some(ackTo.toClassic), - ackMessage, - completionMatcher.asInstanceOf[PartialFunction[Any, CompletionStrategy]], - failureMatcher.asInstanceOf[PartialFunction[Any, Throwable]]) - .mapMaterializedValue(actorRefAdapter) } diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes index feb063120b..0a1be3f84e 100644 --- a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes +++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes @@ -17,6 +17,7 @@ # Remove deprecated methods ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.FanInShape1N") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.Materializer.schedulePeriodically") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.UniformFanInShape.inSeq") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.UniformFanOutShape.outArray") ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ConstantFun*") @@ -25,13 +26,21 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.impl ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.io.ByteStringParser$ParsingException") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.impl.io.ByteStringParser#ParsingLogic.onDownstreamFinish") ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.javadsl.CoupledTerminationFlow*") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.BidiFlow.bidirectionalIdleTimeout") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.FileIO.fromFile") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.FileIO.toFile") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartFlow.onFailuresWithBackoff") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartFlow.withBackoff") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartSink.withBackoff") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartSource.onFailuresWithBackoff") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.RestartSource.withBackoff") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.Sink.actorRefWithAck") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.Sink.foreachParallel") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.Source.actorRefWithAck") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Balance.this") ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.scaladsl.CoupledTerminationFlow*") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FileIO.fromFile*") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FileIO.toFile*") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.GraphApply.create") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.GraphDSL.create") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.MergeHub#MergedSourceLogic.onDownstreamFinish") @@ -41,6 +50,9 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scal ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.RestartSink.withBackoff") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.RestartSource.onFailuresWithBackoff") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.RestartSource.withBackoff") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Sink.foreachParallel") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Sink.actorRefWithAck*") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Source.actorRefWithAck") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.ZipWithN.inSeq") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.AbstractInOutHandler.onDownstreamFinish") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.AbstractOutHandler.onDownstreamFinish") diff --git a/stream/src/main/scala/org/apache/pekko/stream/Materializer.scala b/stream/src/main/scala/org/apache/pekko/stream/Materializer.scala index 35c04210ff..dd5c56b92b 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/Materializer.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/Materializer.scala @@ -143,19 +143,6 @@ abstract class Materializer { */ def scheduleAtFixedRate(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable - /** - * Interface for operators that need timer services for their functionality. Schedules a - * repeated task with the given interval between invocations. - * - * @return A [[pekko.actor.Cancellable]] that allows cancelling the timer. Cancelling is best effort, if the event - * has been already enqueued it will not have an effect. - */ - @deprecated( - "Use scheduleWithFixedDelay or scheduleAtFixedRate instead. This has the same semantics as " + - "scheduleAtFixedRate, but scheduleWithFixedDelay is often preferred.", - since = "Akka 2.6.0") - def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable - /** * Shuts down this materializer and all the operators that have been materialized through this materializer. After * having shut down, this materializer cannot be used again. Any attempt to materialize operators after having diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala index 214f333345..84e66b6744 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala @@ -150,12 +150,6 @@ private[pekko] class SubFusingActorMaterializerImpl( task: Runnable): Cancellable = delegate.scheduleAtFixedRate(initialDelay, interval, task) - override def schedulePeriodically( - initialDelay: FiniteDuration, - interval: FiniteDuration, - task: Runnable): Cancellable = - scheduleAtFixedRate(initialDelay, interval, task) - override def withNamePrefix(name: String): SubFusingActorMaterializerImpl = new SubFusingActorMaterializerImpl(delegate.withNamePrefix(name), registerShell) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala index fbff399df9..fc5b35bd1c 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala @@ -442,12 +442,6 @@ private final case class SavedIslandData( task: Runnable): Cancellable = system.scheduler.scheduleAtFixedRate(initialDelay, interval)(task)(executionContext) - override def schedulePeriodically( - initialDelay: FiniteDuration, - interval: FiniteDuration, - task: Runnable): Cancellable = - system.scheduler.scheduleAtFixedRate(initialDelay, interval)(task)(executionContext) - override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable = system.scheduler.scheduleOnce(delay, task)(executionContext) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala index 98dd59fcbb..f7bece9f45 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala @@ -13,9 +13,6 @@ package org.apache.pekko.stream.javadsl -import scala.annotation.nowarn -import scala.concurrent.duration.FiniteDuration - import org.apache.pekko import pekko.NotUsed import pekko.japi.function @@ -103,23 +100,9 @@ object BidiFlow { * every second in one direction, but no elements are flowing in the other direction. I.e. this operator considers * the *joint* frequencies of the elements in both directions. */ - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12") - def bidirectionalIdleTimeout[I, O](timeout: FiniteDuration): BidiFlow[I, I, O, O, NotUsed] = - new BidiFlow(scaladsl.BidiFlow.bidirectionalIdleTimeout(timeout)) - - /** - * If the time between two processed elements *in any direction* exceed the provided timeout, the stream is failed - * with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. - * - * There is a difference between this operator and having two idleTimeout Flows assembled into a BidiStage. - * If the timeout is configured to be 1 seconds, then this operator will not fail even though there are elements flowing - * every second in one direction, but no elements are flowing in the other direction. I.e. this operator considers - * the *joint* frequencies of the elements in both directions. - */ - @nowarn("msg=deprecated") def bidirectionalIdleTimeout[I, O](timeout: java.time.Duration): BidiFlow[I, I, O, O, NotUsed] = { import pekko.util.JavaDurationConverters._ - bidirectionalIdleTimeout(timeout.asScala) + new BidiFlow(scaladsl.BidiFlow.bidirectionalIdleTimeout(timeout.asScala)) } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FileIO.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FileIO.scala index ebab976d0e..74c941eb79 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FileIO.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FileIO.scala @@ -13,7 +13,6 @@ package org.apache.pekko.stream.javadsl -import java.io.File import java.nio.file.{ OpenOption, Path } import java.util import java.util.concurrent.CompletionStage @@ -30,22 +29,6 @@ import pekko.util.ccompat.JavaConverters._ */ object FileIO { - /** - * Creates a Sink that writes incoming [[pekko.util.ByteString]] elements to the given file. - * Overwrites existing files by truncating their contents, if you want to append to an existing file use - * [[toFile(File, util.Set[OpenOption])]] with [[java.nio.file.StandardOpenOption.APPEND]]. - * - * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, - * and a possible exception if IO operation was not completed successfully. - * - * You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or - * set it for a given Source by using [[pekko.stream.ActorAttributes]]. - * - * @param f The file to write to - */ - @deprecated("Use `toPath` instead.", "Akka 2.4.5") - def toFile(f: File): javadsl.Sink[ByteString, CompletionStage[IOResult]] = toPath(f.toPath) - /** * Creates a Sink that writes incoming [[pekko.util.ByteString]] elements to the given file path. * Overwrites existing files by truncating their contents, if you want to append to an existing file @@ -68,22 +51,6 @@ object FileIO { def toPath(f: Path): javadsl.Sink[ByteString, CompletionStage[IOResult]] = new Sink(scaladsl.FileIO.toPath(f).toCompletionStage()) - /** - * Creates a Sink that writes incoming [[pekko.util.ByteString]] elements to the given file. - * - * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, - * and a possible exception if IO operation was not completed successfully. - * - * You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or - * set it for a given Source by using [[pekko.stream.ActorAttributes]]. - * - * @param f The file to write to - * @param options File open options, see [[java.nio.file.StandardOpenOption]] - */ - @deprecated("Use `toPath` instead.", "Akka 2.4.5") - def toFile[Opt <: OpenOption](f: File, options: util.Set[Opt]): javadsl.Sink[ByteString, CompletionStage[IOResult]] = - toPath(f.toPath) - /** * Creates a Sink that writes incoming [[pekko.util.ByteString]] elements to the given file path. * @@ -130,23 +97,6 @@ object FileIO { startPosition: Long): javadsl.Sink[ByteString, CompletionStage[IOResult]] = new Sink(scaladsl.FileIO.toPath(f, options.asScala.toSet, startPosition).toCompletionStage()) - /** - * Creates a Source from a files contents. - * Emitted elements are [[pekko.util.ByteString]] elements, chunked by default by 8192 bytes, - * except the last element, which will be up to 8192 in size. - * - * You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or - * set it for a given Source by using [[pekko.stream.ActorAttributes]]. - * - * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, - * and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does - * not give any guarantee that the bytes were seen by downstream stages. - * - * @param f the file to read from - */ - @deprecated("Use `fromPath` instead.", "Akka 2.4.5") - def fromFile(f: File): javadsl.Source[ByteString, CompletionStage[IOResult]] = fromPath(f.toPath) - /** * Creates a Source from a files contents. * Emitted elements are [[pekko.util.ByteString]] elements, chunked by default by 8192 bytes, @@ -163,25 +113,6 @@ object FileIO { */ def fromPath(f: Path): javadsl.Source[ByteString, CompletionStage[IOResult]] = fromPath(f, 8192) - /** - * Creates a synchronous Source from a files contents. - * Emitted elements are `chunkSize` sized [[pekko.util.ByteString]] elements, - * except the last element, which will be up to `chunkSize` in size. - * - * You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or - * set it for a given Source by using [[pekko.stream.ActorAttributes]]. - * - * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, - * and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does - * not give any guarantee that the bytes were seen by downstream stages. - * - * @param f the file to read from - * @param chunkSize the size of each read operation - */ - @deprecated("Use `fromPath` instead.", "Akka 2.4.5") - def fromFile(f: File, chunkSize: Int): javadsl.Source[ByteString, CompletionStage[IOResult]] = - fromPath(f.toPath, chunkSize) - /** * Creates a synchronous Source from a files contents. * Emitted elements are `chunkSize` sized [[pekko.util.ByteString]] elements, diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index 6aaa8fc252..1f550c2a0a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -20,7 +20,6 @@ import java.util.stream.Collector import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable -import scala.concurrent.ExecutionContext import scala.util.Try import org.apache.pekko @@ -235,24 +234,6 @@ object Sink { .foreachAsync(parallelism)((x: T) => f(x).asScala.map(_ => ())(ExecutionContexts.parasitic)) .toCompletionStage()) - /** - * A `Sink` that will invoke the given procedure for each received element in parallel. The sink is materialized - * into a [[java.util.concurrent.CompletionStage]]. - * - * If `f` throws an exception and the supervision decision is - * [[pekko.stream.Supervision.Stop]] the `CompletionStage` will be completed with failure. - * - * If `f` throws an exception and the supervision decision is - * [[pekko.stream.Supervision.Resume]] or [[pekko.stream.Supervision.Restart]] the - * element is dropped and the stream continues. - */ - @deprecated( - "Use `foreachAsync` instead, it allows you to choose how to run the procedure, by calling some other API returning a CompletionStage or using CompletableFuture.supplyAsync.", - since = "Akka 2.5.17") - def foreachParallel[T](parallel: Int, f: function.Procedure[T], - ec: ExecutionContext): Sink[T, CompletionStage[Done]] = - new Sink(scaladsl.Sink.foreachParallel(parallel)(f.apply)(ec).toCompletionStage()) - /** * A `Sink` that when the flow is completed, either through a failure or normal * completion, apply the provided function with [[scala.util.Success]] @@ -394,32 +375,6 @@ object Sink { new Sink( scaladsl.Sink.actorRefWithBackpressure[In](ref, onInitMessage, onCompleteMessage, t => onFailureMessage(t))) - /** - * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. - * First element is always `onInitMessage`, then stream is waiting for acknowledgement message - * `ackMessage` from the given actor which means that it is ready to process - * elements. It also requires `ackMessage` message after each stream element - * to make backpressure work. - * - * If the target actor terminates the stream will be canceled. - * When the stream is completed successfully the given `onCompleteMessage` - * will be sent to the destination actor. - * When the stream is completed with failure - result of `onFailureMessage(throwable)` - * message will be sent to the destination actor. - * - * @deprecated Use actorRefWithBackpressure instead - */ - @deprecated("Use actorRefWithBackpressure instead", "Akka 2.6.0") - def actorRefWithAck[In]( - ref: ActorRef, - onInitMessage: Any, - ackMessage: Any, - onCompleteMessage: Any, - onFailureMessage: function.Function[Throwable, Any]): Sink[In, NotUsed] = - new Sink( - scaladsl.Sink - .actorRefWithBackpressure[In](ref, onInitMessage, ackMessage, onCompleteMessage, t => onFailureMessage(t))) - /** * A graph with the shape of a sink logically is a sink, this method makes * it so also in type. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 33674833da..d8cdef99f0 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -614,71 +614,6 @@ object Source { } })) - /** - * Creates a `Source` that is materialized as an [[pekko.actor.ActorRef]]. - * Messages sent to this actor will be emitted to the stream if there is demand from downstream, - * and a new message will only be accepted after the previous messages has been consumed and acknowledged back. - * The stream will complete with failure if a message is sent before the acknowledgement has been replied back. - * - * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted - * [[java.lang.Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received - * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, - * the failure will be signaled downstream immediately (instead of the completion signal). - * - * The actor will be stopped when the stream is completed, failed or canceled from downstream, - * i.e. you can watch it to get notified when that happens. - * - * @deprecated Use actorRefWithBackpressure instead - */ - @deprecated("Use actorRefWithBackpressure instead", "Akka 2.6.0") - def actorRefWithAck[T]( - ackMessage: Any, - completionMatcher: pekko.japi.function.Function[Any, java.util.Optional[CompletionStrategy]], - failureMatcher: pekko.japi.function.Function[Any, java.util.Optional[Throwable]]): Source[T, ActorRef] = - new Source(scaladsl.Source.actorRefWithBackpressure(ackMessage, - new JavaPartialFunction[Any, CompletionStrategy] { - override def apply(x: Any, isCheck: Boolean): CompletionStrategy = { - val result = completionMatcher(x) - if (!result.isPresent) throw JavaPartialFunction.noMatch() - else result.get() - } - }, - new JavaPartialFunction[Any, Throwable] { - override def apply(x: Any, isCheck: Boolean): Throwable = { - val result = failureMatcher(x) - if (!result.isPresent) throw JavaPartialFunction.noMatch() - else result.get() - } - })) - - /** - * Creates a `Source` that is materialized as an [[pekko.actor.ActorRef]]. - * Messages sent to this actor will be emitted to the stream if there is demand from downstream, - * and a new message will only be accepted after the previous messages has been consumed and acknowledged back. - * The stream will complete with failure if a message is sent before the acknowledgement has been replied back. - * - * The stream can be completed successfully by sending the actor reference a [[pekko.actor.Status.Success]]. - * If the content is [[pekko.stream.CompletionStrategy.immediately]] the completion will be signaled immediately, - * otherwise if the content is [[pekko.stream.CompletionStrategy.draining]] (or anything else) - * already buffered element will be signaled before signaling completion. - * - * The stream can be completed with failure by sending a [[pekko.actor.Status.Failure]] to the - * actor reference. In case the Actor is still draining its internal buffer (after having received - * a [[pekko.actor.Status.Success]]) before signaling completion and it receives a [[pekko.actor.Status.Failure]], - * the failure will be signaled downstream immediately (instead of the completion signal). - * - * The actor will be stopped when the stream is completed, failed or canceled from downstream, - * i.e. you can watch it to get notified when that happens. - */ - @deprecated("Use actorRefWithBackpressure accepting completion and failure matchers", "Akka 2.6.0") - def actorRefWithAck[T](ackMessage: Any): Source[T, ActorRef] = - new Source(scaladsl.Source.actorRefWithBackpressure(ackMessage, - { - case pekko.actor.Status.Success(s: CompletionStrategy) => s - case pekko.actor.Status.Success(_) => CompletionStrategy.Draining - case pekko.actor.Status.Success => CompletionStrategy.Draining - }, { case pekko.actor.Status.Failure(cause) => cause })) - /** * A graph with the shape of a source logically is a source, this method makes * it so also in type. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FileIO.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FileIO.scala index 7aa24b65bb..f090acf487 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FileIO.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FileIO.scala @@ -13,7 +13,6 @@ package org.apache.pekko.stream.scaladsl -import java.io.File import java.nio.file.{ OpenOption, Path } import java.nio.file.StandardOpenOption._ @@ -30,25 +29,6 @@ import pekko.util.ByteString */ object FileIO { - /** - * Creates a Source from a files contents. - * Emitted elements are `chunkSize` sized [[pekko.util.ByteString]] elements, - * except the final element, which will be up to `chunkSize` in size. - * - * You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or - * set it for a given Source by using [[pekko.stream.ActorAttributes]]. - * - * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, - * and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does - * not give any guarantee that the bytes were seen by downstream stages. - * - * @param f the file to read from - * @param chunkSize the size of each read operation, defaults to 8192 - */ - @deprecated("Use `fromPath` instead", "Akka 2.4.5") - def fromFile(f: File, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] = - fromPath(f.toPath, chunkSize) - /** * Creates a Source from a files contents. * Emitted elements are `chunkSize` sized [[pekko.util.ByteString]] elements, @@ -86,25 +66,6 @@ object FileIO { def fromPath(f: Path, chunkSize: Int, startPosition: Long): Source[ByteString, Future[IOResult]] = Source.fromGraph(new FileSource(f, chunkSize, startPosition)).withAttributes(DefaultAttributes.fileSource) - /** - * Creates a Sink which writes incoming [[pekko.util.ByteString]] elements to the given file. Overwrites existing files - * by truncating their contents as default. - * - * Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, - * and a possible exception if IO operation was not completed successfully. - * - * This source is backed by an Actor which will use the dedicated `pekko.stream.blocking-io-dispatcher`, - * unless configured otherwise by using [[pekko.stream.ActorAttributes]]. - * - * @param f the file to write to - * @param options File open options, see [[java.nio.file.StandardOpenOption]], defaults to Set(WRITE, TRUNCATE_EXISTING, CREATE) - */ - @deprecated("Use `toPath` instead", "Akka 2.4.5") - def toFile( - f: File, - options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]] = - toPath(f.toPath, options) - /** * Creates a Sink which writes incoming [[pekko.util.ByteString]] elements to the given file path. Overwrites existing files * by truncating their contents as default. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index 9cb9f08296..cfd78106fb 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -16,7 +16,7 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.tailrec import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable -import scala.concurrent.{ ExecutionContext, Future } +import scala.concurrent.Future import scala.util.{ Failure, Success, Try } import org.apache.pekko @@ -387,25 +387,6 @@ object Sink { }) } - /** - * A `Sink` that will invoke the given function to each of the elements - * as they pass in. The sink is materialized into a [[scala.concurrent.Future]] - * - * If `f` throws an exception and the supervision decision is - * [[pekko.stream.Supervision.Stop]] the `Future` will be completed with failure. - * - * If `f` throws an exception and the supervision decision is - * [[pekko.stream.Supervision.Resume]] or [[pekko.stream.Supervision.Restart]] the - * element is dropped and the stream continues. - * - * See also [[Flow.mapAsyncUnordered]] - */ - @deprecated( - "Use `foreachAsync` instead, it allows you to choose how to run the procedure, by calling some other API returning a Future or spawning a new Future.", - since = "Akka 2.5.17") - def foreachParallel[T](parallelism: Int)(f: T => Unit)(implicit ec: ExecutionContext): Sink[T, Future[Done]] = - Flow[T].mapAsyncUnordered(parallelism)(t => Future(f(t))).toMat(Sink.ignore)(Keep.right) - /** * A `Sink` that will invoke the given function for every received element, giving it its previous * output (or the given `zero` value) and the element as input. @@ -697,28 +678,6 @@ object Sink { onFailureMessage: Throwable => Any): Sink[T, NotUsed] = actorRefWithAck(ref, _ => identity, _ => onInitMessage, None, onCompleteMessage, onFailureMessage) - /** - * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. - * First element is always `onInitMessage`, then stream is waiting for acknowledgement message - * `ackMessage` from the given actor which means that it is ready to process - * elements. It also requires `ackMessage` message after each stream element - * to make backpressure work. - * - * If the target actor terminates the stream will be canceled. - * When the stream is completed successfully the given `onCompleteMessage` - * will be sent to the destination actor. - * When the stream is completed with failure - result of `onFailureMessage(throwable)` - * function will be sent to the destination actor. - */ - @deprecated("Use actorRefWithBackpressure accepting completion and failure matchers instead", "Akka 2.6.0") - def actorRefWithAck[T]( - ref: ActorRef, - onInitMessage: Any, - ackMessage: Any, - onCompleteMessage: Any, - onFailureMessage: (Throwable) => Any = Status.Failure.apply): Sink[T, NotUsed] = - actorRefWithAck(ref, _ => identity, _ => onInitMessage, Some(ackMessage), onCompleteMessage, onFailureMessage) - /** * Creates a `Sink` that is materialized as an [[pekko.stream.scaladsl.SinkQueueWithCancel]]. * [[pekko.stream.scaladsl.SinkQueueWithCancel.pull]] method is pulling element from the stream and returns ``Future[Option[T]``. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index e8b185fa39..0bd946cd9e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -799,34 +799,6 @@ object Source { Source.fromGraph(new ActorRefBackpressureSource(None, ackMessage, completionMatcher, failureMatcher)) } - /** - * Creates a `Source` that is materialized as an [[pekko.actor.ActorRef]]. - * Messages sent to this actor will be emitted to the stream if there is demand from downstream, - * and a new message will only be accepted after the previous messages has been consumed and acknowledged back. - * The stream will complete with failure if a message is sent before the acknowledgement has been replied back. - * - * The stream can be completed successfully by sending the actor reference a [[pekko.actor.Status.Success]]. - * If the content is [[pekko.stream.CompletionStrategy.immediately]] the completion will be signaled immediately, - * otherwise if the content is [[pekko.stream.CompletionStrategy.draining]] (or anything else) - * already buffered element will be signaled before signaling completion. - * - * The stream can be completed with failure by sending a [[pekko.actor.Status.Failure]] to the - * actor reference. In case the Actor is still draining its internal buffer (after having received - * a [[pekko.actor.Status.Success]]) before signaling completion and it receives a [[pekko.actor.Status.Failure]], - * the failure will be signaled downstream immediately (instead of the completion signal). - * - * The actor will be stopped when the stream is completed, failed or canceled from downstream, - * i.e. you can watch it to get notified when that happens. - */ - @deprecated("Use actorRefWithBackpressure accepting completion and failure matchers instead", "Akka 2.6.0") - def actorRefWithAck[T](ackMessage: Any): Source[T, ActorRef] = - actorRefWithAck(None, ackMessage, - { - case pekko.actor.Status.Success(s: CompletionStrategy) => s - case pekko.actor.Status.Success(_) => CompletionStrategy.Draining - case pekko.actor.Status.Success => CompletionStrategy.Draining - }, { case pekko.actor.Status.Failure(cause) => cause }) - /** * Combines several sources with fan-in strategy like [[Merge]] or [[Concat]] into a single [[Source]]. */