diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/EffectfulActorContext.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/EffectfulActorContext.scala index c261bcf3b5..09969396a5 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/EffectfulActorContext.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/EffectfulActorContext.scala @@ -5,7 +5,6 @@ package akka.actor.testkit.typed.internal import java.util.concurrent.ConcurrentLinkedQueue -import java.util.function import akka.actor.{ ActorPath, Cancellable } import akka.actor.typed.{ ActorRef, Behavior, Props } @@ -15,7 +14,6 @@ import akka.actor.testkit.typed.Effect._ import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag -import scala.compat.java8.FunctionConverters._ /** * INTERNAL API @@ -47,9 +45,9 @@ import scala.compat.java8.FunctionConverters._ effectQueue.offer(MessageAdapter(implicitly[ClassTag[U]].runtimeClass.asInstanceOf[Class[U]], f)) ref } - override def messageAdapter[U](messageClass: Class[U], f: function.Function[U, T]): ActorRef[U] = { + override def messageAdapter[U](messageClass: Class[U], f: akka.japi.function.Function[U, T]): ActorRef[U] = { val ref = super.messageAdapter(messageClass, f) - effectQueue.offer(MessageAdapter[U, T](messageClass, f.asScala)) + effectQueue.offer(MessageAdapter[U, T](messageClass, f.apply)) ref } override def spawn[U](behavior: Behavior[U], name: String, props: Props = Props.empty): ActorRef[U] = { diff --git a/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/BehaviorTestKitTest.java b/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/BehaviorTestKitTest.java index 578d968895..6ad0cf1ee5 100644 --- a/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/BehaviorTestKitTest.java +++ b/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/BehaviorTestKitTest.java @@ -70,10 +70,10 @@ public class BehaviorTestKitTest extends JUnitSuite { public static class CreateMessageAdapter implements Command { private final Class clazz; - private final Function f; + private final akka.japi.function.Function f; @SuppressWarnings("unchecked") - public CreateMessageAdapter(Class clazz, Function f) { + public CreateMessageAdapter(Class clazz, akka.japi.function.Function f) { this.clazz = clazz; this.f = f; } diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java index e045bb5829..dc8fb0b6a5 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java @@ -170,4 +170,50 @@ public class ActorCompile { .onFailure(IllegalStateException.class, strategy6)) .onFailure(RuntimeException.class, strategy1); } + + // actor context + { + final ActorRef otherActor = null; + Behavior behavior = + Behaviors.setup( + context -> { + context.ask( + String.class, + otherActor, + Duration.ofSeconds(10), + (ActorRef respRef) -> new Object(), + (String res, Throwable failure) -> { + // checked exception should be ok + if (failure != null) throw new Exception(failure); + else return "success"; + }); + + ActorRef adapter = + context.messageAdapter( + Integer.class, + (number) -> { + // checked exception should be ok + if (number < 10) throw new Exception("too small number"); + else return number.toString(); + }); + + return Behaviors.empty(); + }); + } + + // stash buffer + { + Behavior behavior = + Behaviors.withStash( + 5, + stash -> { + stash.forEach( + msg -> { + // checked is ok + throw new Exception("checked"); + }); + + return Behaviors.empty(); + }); + } } diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ReceiveBuilderTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ReceiveBuilderTest.java index 21dc6385a4..50c67bc5a5 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ReceiveBuilderTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ReceiveBuilderTest.java @@ -8,6 +8,7 @@ import akka.actor.testkit.typed.javadsl.LogCapturing; import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.actor.testkit.typed.javadsl.TestProbe; import akka.actor.typed.ActorRef; +import akka.actor.typed.PostStop; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -139,4 +140,42 @@ public class ReceiveBuilderTest extends JUnitSuite { ref.tell("message"); probe.expectMessage("message"); } + + public void compileOnlyHandlerAllowedToThrowCheckedException() { + Behavior behavior = + ReceiveBuilder.create() + .onMessageEquals( + "exactly", + () -> { + throw new Exception("checked"); + }) + .onMessage( + String.class, + msg -> { + throw new Exception("checked"); + }) + .onMessage( + Integer.class, + msg -> true, + msg -> { + throw new Exception("checked"); + }) + .onSignalEquals( + PostStop.instance(), + () -> { + throw new Exception("checked"); + }) + .onSignal( + PostStop.class, + (signal) -> { + throw new Exception("checked"); + }) + .onSignal( + PostStop.class, + signal -> true, + signal -> { + throw new Exception("checked"); + }) + .build(); + } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRefResolver.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRefResolver.scala index f90187027f..5c55db8be2 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRefResolver.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRefResolver.scala @@ -81,9 +81,7 @@ abstract class ActorRefResolver extends Extension { object ActorRefResolverSetup { def apply[T <: Extension](createExtension: ActorSystem[_] => ActorRefResolver): ActorRefResolverSetup = - new ActorRefResolverSetup(new java.util.function.Function[ActorSystem[_], ActorRefResolver] { - override def apply(sys: ActorSystem[_]): ActorRefResolver = createExtension(sys) - }) // TODO can be simplified when compiled only with Scala >= 2.12 + new ActorRefResolverSetup(sys => createExtension(sys)) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Extensions.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Extensions.scala index 34fb4644a8..2ebdc91252 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Extensions.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Extensions.scala @@ -174,6 +174,4 @@ abstract class ExtensionSetup[T <: Extension]( * extension with stub/mock implementations. */ abstract class AbstractExtensionSetup[T <: Extension](extId: ExtensionId[T], createExtension: ActorSystem[_] => T) - extends ExtensionSetup[T](extId, new java.util.function.Function[ActorSystem[_], T] { - override def apply(sys: ActorSystem[_]): T = createExtension.apply(sys) - }) // TODO can be simplified when compiled only with Scala >= 2.12 + extends ExtensionSetup[T](extId, createExtension.apply) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index a33455a314..5ac9bbdf54 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -6,21 +6,18 @@ package akka.actor.typed package internal import java.time.Duration -import java.util.function.{ Function => JFunction } import java.util.ArrayList import java.util.Optional import java.util.concurrent.CompletionStage -import java.util.function.BiConsumer -import java.util.function.BiFunction import scala.concurrent.{ ExecutionContextExecutor, Future } import scala.reflect.ClassTag import scala.util.Try - import akka.annotation.InternalApi import akka.util.OptionVal import akka.util.Timeout import akka.util.JavaDurationConverters._ +import com.github.ghik.silencer.silent import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -139,17 +136,15 @@ import org.slf4j.LoggerFactory } // Java API impl - def ask[Req, Res]( + @silent("never used") // resClass is just a pretend param + override def ask[Req, Res]( resClass: Class[Res], target: RecipientRef[Req], responseTimeout: Duration, - createRequest: JFunction[ActorRef[Res], Req], - applyToResponse: BiFunction[Res, Throwable, T]): Unit = { + createRequest: akka.japi.function.Function[ActorRef[Res], Req], + applyToResponse: akka.japi.function.Function2[Res, Throwable, T]): Unit = { import akka.actor.typed.javadsl.AskPattern - val message = new akka.japi.function.Function[ActorRef[Res], Req] { - def apply(ref: ActorRef[Res]): Req = createRequest(ref) - } - pipeToSelf(AskPattern.ask(target, message, responseTimeout, system.scheduler), applyToResponse) + pipeToSelf(AskPattern.ask(target, (ref) => createRequest(ref), responseTimeout, system.scheduler), applyToResponse) } // Scala API impl @@ -158,14 +153,14 @@ import org.slf4j.LoggerFactory } // Java API impl - def pipeToSelf[Value](future: CompletionStage[Value], applyToResult: BiFunction[Value, Throwable, T]): Unit = { - future.whenComplete(new BiConsumer[Value, Throwable] { - def accept(value: Value, ex: Throwable): Unit = { - if (value != null) self.unsafeUpcast ! AdaptMessage(value, applyToResult.apply(_: Value, null)) - if (ex != null) - self.unsafeUpcast ! AdaptMessage(ex, applyToResult.apply(null.asInstanceOf[Value], _: Throwable)) - } - }) + def pipeToSelf[Value]( + future: CompletionStage[Value], + applyToResult: akka.japi.function.Function2[Value, Throwable, T]): Unit = { + future.whenComplete { (value, ex) => + if (value != null) self.unsafeUpcast ! AdaptMessage(value, applyToResult.apply(_: Value, null)) + if (ex != null) + self.unsafeUpcast ! AdaptMessage(ex, applyToResult.apply(null.asInstanceOf[Value], _: Throwable)) + } } private[akka] override def spawnMessageAdapter[U](f: U => T, name: String): ActorRef[U] = @@ -185,7 +180,7 @@ import org.slf4j.LoggerFactory internalMessageAdapter(messageClass, f) } - override def messageAdapter[U](messageClass: Class[U], f: JFunction[U, T]): ActorRef[U] = + override def messageAdapter[U](messageClass: Class[U], f: akka.japi.function.Function[U, T]): ActorRef[U] = internalMessageAdapter(messageClass, f.apply) private def internalMessageAdapter[U](messageClass: Class[U], f: U => T): ActorRef[U] = { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala index d6edb93333..d70e8c5f6b 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala @@ -4,7 +4,6 @@ package akka.actor.typed.internal -import java.util.function.Consumer import java.util.function.{ Function => JFunction } import akka.actor.DeadLetter @@ -18,6 +17,7 @@ import akka.actor.typed.javadsl import akka.actor.typed.scaladsl import akka.actor.typed.scaladsl.ActorContext import akka.annotation.{ InternalApi, InternalStableApi } +import akka.japi.function.Procedure import akka.util.{ unused, ConstantFun } /** @@ -107,7 +107,7 @@ import akka.util.{ unused, ConstantFun } } } - override def forEach(f: Consumer[T]): Unit = foreach(f.accept) + override def forEach(f: Procedure[T]): Unit = foreach(f.apply) override def unstashAll(behavior: Behavior[T]): Behavior[T] = { val behav = unstash(behavior, size, ConstantFun.scalaIdentityFunction[T]) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala index 70628d41ff..f367687dbc 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala @@ -5,7 +5,6 @@ package akka.actor.typed.javadsl import java.time.Duration -import java.util.function.{ BiFunction, Function => JFunction } import akka.annotation.DoNotInherit import akka.actor.ClassicActorContextProvider @@ -267,7 +266,7 @@ trait ActorContext[T] extends TypedActorContext[T] with ClassicActorContextProvi * *Warning*: This method is not thread-safe and must not be accessed from threads other * than the ordinary actor message processing thread, such as [[java.util.concurrent.CompletionStage]] callbacks. */ - def messageAdapter[U](messageClass: Class[U], f: JFunction[U, T]): ActorRef[U] + def messageAdapter[U](messageClass: Class[U], f: akka.japi.function.Function[U, T]): ActorRef[U] /** * Perform a single request-response message interaction with another actor, and transform the messages back to @@ -299,8 +298,8 @@ trait ActorContext[T] extends TypedActorContext[T] with ClassicActorContextProvi resClass: Class[Res], target: RecipientRef[Req], responseTimeout: Duration, - createRequest: java.util.function.Function[ActorRef[Res], Req], - applyToResponse: BiFunction[Res, Throwable, T]): Unit + createRequest: akka.japi.function.Function[ActorRef[Res], Req], + applyToResponse: akka.japi.function.Function2[Res, Throwable, T]): Unit /** * Sends the result of the given `CompletionStage` to this Actor (“`self`”), after adapted it with @@ -309,6 +308,8 @@ trait ActorContext[T] extends TypedActorContext[T] with ClassicActorContextProvi * This method is thread-safe and can be called from other threads than the ordinary * actor message processing thread, such as [[java.util.concurrent.CompletionStage]] callbacks. */ - def pipeToSelf[Value](future: CompletionStage[Value], applyToResult: BiFunction[Value, Throwable, T]): Unit + def pipeToSelf[Value]( + future: CompletionStage[Value], + applyToResult: akka.japi.function.Function2[Value, Throwable, T]): Unit } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/AskPattern.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/AskPattern.scala index 6bf2e76d93..0e5e3ef90a 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/AskPattern.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/AskPattern.scala @@ -37,8 +37,8 @@ object AskPattern { */ def ask[Req, Res]( actor: RecipientRef[Req], - message: JFunction[ActorRef[Res], Req], + messageFactory: JFunction[ActorRef[Res], Req], timeout: Duration, scheduler: Scheduler): CompletionStage[Res] = - (actor.ask(message.apply)(timeout.asScala, scheduler)).toJava + (actor.ask(messageFactory.apply)(timeout.asScala, scheduler)).toJava } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/BehaviorBuilder.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/BehaviorBuilder.scala index f11150e207..20cc188e80 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/BehaviorBuilder.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/BehaviorBuilder.scala @@ -4,11 +4,10 @@ package akka.actor.typed.javadsl -import java.util.function.Supplier - import scala.annotation.tailrec -import akka.japi.function.{ Function => JFunction } +import akka.japi.function.Function +import akka.japi.function.Creator import akka.japi.function.{ Predicate => JPredicate } import akka.annotation.InternalApi import akka.actor.typed.Behavior @@ -24,6 +23,9 @@ import akka.util.OptionVal * When handling a message or signal, this [[Behavior]] will consider all handlers in the order they were added, * looking for the first handler for which both the type and the (optional) predicate match. * + * Akka `akka.japi.function` lambda types are used throughout to allow handlers to throw checked exceptions + * (which will fail the actor). + * * @tparam T the common superclass of all supported messages. */ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signalHandlers: List[Case[T, Signal]]) { @@ -41,7 +43,7 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa * @tparam M type of message to match * @return a new behavior builder with the specified handling appended */ - def onMessage[M <: T](`type`: Class[M], handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] = + def onMessage[M <: T](`type`: Class[M], handler: Function[M, Behavior[T]]): BehaviorBuilder[T] = withMessage(OptionVal.Some(`type`), OptionVal.None, handler) /** @@ -53,7 +55,7 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa * @tparam M type of message to match * @return a new behavior builder with the specified handling appended */ - def onMessage[M <: T](`type`: Class[M], test: JPredicate[M], handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] = + def onMessage[M <: T](`type`: Class[M], test: JPredicate[M], handler: Function[M, Behavior[T]]): BehaviorBuilder[T] = withMessage(OptionVal.Some(`type`), OptionVal.Some((t: T) => test.test(t.asInstanceOf[M])), handler) /** @@ -66,7 +68,7 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa * @param handler action to apply when the type matches * @return a new behavior builder with the specified handling appended */ - def onMessageUnchecked[M <: T](`type`: Class[_ <: T], handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] = + def onMessageUnchecked[M <: T](`type`: Class[_ <: T], handler: Function[M, Behavior[T]]): BehaviorBuilder[T] = withMessage[M](OptionVal.Some(`type`.asInstanceOf[Class[M]]), OptionVal.None, handler) /** @@ -76,13 +78,11 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa * @param handler action to apply when the message matches * @return a new behavior builder with the specified handling appended */ - def onMessageEquals(msg: T, handler: Supplier[Behavior[T]]): BehaviorBuilder[T] = + def onMessageEquals(msg: T, handler: Creator[Behavior[T]]): BehaviorBuilder[T] = withMessage[T]( OptionVal.Some(msg.getClass.asInstanceOf[Class[T]]), OptionVal.Some(_.equals(msg)), - new JFunction[T, Behavior[T]] { - override def apply(msg: T): Behavior[T] = handler.get() - }) + (_: T) => handler.create()) /** * Add a new case to the message handling matching any message. Subsequent `onMessage` clauses will @@ -91,7 +91,7 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa * @param handler action to apply for any message * @return a new behavior builder with the specified handling appended */ - def onAnyMessage(handler: JFunction[T, Behavior[T]]): BehaviorBuilder[T] = + def onAnyMessage(handler: Function[T, Behavior[T]]): BehaviorBuilder[T] = withMessage(OptionVal.None, OptionVal.None, handler) /** @@ -102,8 +102,8 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa * @tparam M type of signal to match * @return a new behavior builder with the specified handling appended */ - def onSignal[M <: Signal](`type`: Class[M], handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] = - withSignal(`type`, OptionVal.None, handler.asInstanceOf[JFunction[Signal, Behavior[T]]]) + def onSignal[M <: Signal](`type`: Class[M], handler: Function[M, Behavior[T]]): BehaviorBuilder[T] = + withSignal(`type`, OptionVal.None, handler.asInstanceOf[Function[Signal, Behavior[T]]]) /** * Add a new predicated case to the signal handling. @@ -117,11 +117,11 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa def onSignal[M <: Signal]( `type`: Class[M], test: JPredicate[M], - handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] = + handler: Function[M, Behavior[T]]): BehaviorBuilder[T] = withSignal( `type`, OptionVal.Some((t: Signal) => test.test(t.asInstanceOf[M])), - handler.asInstanceOf[JFunction[Signal, Behavior[T]]]) + handler.asInstanceOf[Function[Signal, Behavior[T]]]) /** * Add a new case to the signal handling matching equal signals. @@ -130,17 +130,13 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa * @param handler action to apply when the message matches * @return a new behavior builder with the specified handling appended */ - def onSignalEquals(signal: Signal, handler: Supplier[Behavior[T]]): BehaviorBuilder[T] = - withSignal(signal.getClass, OptionVal.Some(_.equals(signal)), new JFunction[Signal, Behavior[T]] { - override def apply(signal: Signal): Behavior[T] = { - handler.get() - } - }) + def onSignalEquals(signal: Signal, handler: Creator[Behavior[T]]): BehaviorBuilder[T] = + withSignal(signal.getClass, OptionVal.Some(_.equals(signal)), (_: Signal) => handler.create()) private def withMessage[M <: T]( clazz: OptionVal[Class[M]], test: OptionVal[M => Boolean], - handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] = { + handler: Function[M, Behavior[T]]): BehaviorBuilder[T] = { val newCase = Case(clazz, test, handler) new BehaviorBuilder[T](newCase.asInstanceOf[Case[T, T]] +: messageHandlers, signalHandlers) } @@ -148,7 +144,7 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa private def withSignal[M <: Signal]( `type`: Class[M], test: OptionVal[Signal => Boolean], - handler: JFunction[Signal, Behavior[T]]): BehaviorBuilder[T] = { + handler: Function[Signal, Behavior[T]]): BehaviorBuilder[T] = { new BehaviorBuilder[T]( messageHandlers, Case(OptionVal.Some(`type`), test, handler).asInstanceOf[Case[T, Signal]] +: signalHandlers) @@ -165,7 +161,7 @@ object BehaviorBuilder { private[javadsl] final case class Case[BT, MT]( `type`: OptionVal[Class[_ <: MT]], test: OptionVal[MT => Boolean], - handler: JFunction[MT, Behavior[BT]]) + handler: Function[MT, Behavior[BT]]) /** * @return new empty immutable behavior builder. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala index 768b6813e6..671d1775a1 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala @@ -4,12 +4,12 @@ package akka.actor.typed.javadsl -import java.util.function.Consumer import java.util.function.{ Function => JFunction } import akka.actor.typed.Behavior import akka.actor.typed.scaladsl import akka.annotation.DoNotInherit +import akka.japi.function.Procedure /** * A non thread safe mutable message buffer that can be used to buffer messages inside actors @@ -72,7 +72,7 @@ import akka.annotation.DoNotInherit * * @param f the function to apply to each element */ - def forEach(f: Consumer[T]): Unit + def forEach(f: Procedure[T]): Unit /** * Process all stashed messages with the `behavior` and the returned diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/ReplicatorMessageAdapter.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/ReplicatorMessageAdapter.scala index b265d1f5ee..44003f64e4 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/ReplicatorMessageAdapter.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/ReplicatorMessageAdapter.scala @@ -64,7 +64,7 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData]( * the replicator are transformed to the message protocol of the requesting actor with * the given `responseAdapter` function. */ - def subscribe(key: Key[B], responseAdapter: JFunction[Replicator.SubscribeResponse[B], A]): Unit = { + def subscribe(key: Key[B], responseAdapter: akka.japi.function.Function[Replicator.SubscribeResponse[B], A]): Unit = { // unsubscribe in case it's called more than once per key unsubscribe(key) changedMessageAdapters.get(key).foreach { subscriber =>