Use japi lambdas where it makes sense (#27790)

* Use akka.japi.function types for adapter lambdas

The Java lambda types does not allow for throwing exceptions but
because of how we run the adapter lambdas in-actor that is fine, so
use our own akka.japi.function types to allow for that.

* Use akka.japi.function types for lambdas where it makes sense
This commit is contained in:
Johan Andrén 2019-09-26 11:07:56 +02:00 committed by Arnout Engelen
parent bd7a264b2f
commit e74831d78b
13 changed files with 139 additions and 68 deletions

View file

@ -5,7 +5,6 @@
package akka.actor.testkit.typed.internal
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.function
import akka.actor.{ ActorPath, Cancellable }
import akka.actor.typed.{ ActorRef, Behavior, Props }
@ -15,7 +14,6 @@ import akka.actor.testkit.typed.Effect._
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
import scala.compat.java8.FunctionConverters._
/**
* INTERNAL API
@ -47,9 +45,9 @@ import scala.compat.java8.FunctionConverters._
effectQueue.offer(MessageAdapter(implicitly[ClassTag[U]].runtimeClass.asInstanceOf[Class[U]], f))
ref
}
override def messageAdapter[U](messageClass: Class[U], f: function.Function[U, T]): ActorRef[U] = {
override def messageAdapter[U](messageClass: Class[U], f: akka.japi.function.Function[U, T]): ActorRef[U] = {
val ref = super.messageAdapter(messageClass, f)
effectQueue.offer(MessageAdapter[U, T](messageClass, f.asScala))
effectQueue.offer(MessageAdapter[U, T](messageClass, f.apply))
ref
}
override def spawn[U](behavior: Behavior[U], name: String, props: Props = Props.empty): ActorRef[U] = {

View file

@ -70,10 +70,10 @@ public class BehaviorTestKitTest extends JUnitSuite {
public static class CreateMessageAdapter implements Command {
private final Class<Object> clazz;
private final Function<Object, Command> f;
private final akka.japi.function.Function<Object, Command> f;
@SuppressWarnings("unchecked")
public CreateMessageAdapter(Class clazz, Function<Object, Command> f) {
public CreateMessageAdapter(Class clazz, akka.japi.function.Function<Object, Command> f) {
this.clazz = clazz;
this.f = f;
}

View file

@ -170,4 +170,50 @@ public class ActorCompile {
.onFailure(IllegalStateException.class, strategy6))
.onFailure(RuntimeException.class, strategy1);
}
// actor context
{
final ActorRef<Object> otherActor = null;
Behavior<String> behavior =
Behaviors.setup(
context -> {
context.ask(
String.class,
otherActor,
Duration.ofSeconds(10),
(ActorRef<String> respRef) -> new Object(),
(String res, Throwable failure) -> {
// checked exception should be ok
if (failure != null) throw new Exception(failure);
else return "success";
});
ActorRef<Integer> adapter =
context.messageAdapter(
Integer.class,
(number) -> {
// checked exception should be ok
if (number < 10) throw new Exception("too small number");
else return number.toString();
});
return Behaviors.empty();
});
}
// stash buffer
{
Behavior<String> behavior =
Behaviors.withStash(
5,
stash -> {
stash.forEach(
msg -> {
// checked is ok
throw new Exception("checked");
});
return Behaviors.empty();
});
}
}

View file

@ -8,6 +8,7 @@ import akka.actor.testkit.typed.javadsl.LogCapturing;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.actor.typed.PostStop;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@ -139,4 +140,42 @@ public class ReceiveBuilderTest extends JUnitSuite {
ref.tell("message");
probe.expectMessage("message");
}
public void compileOnlyHandlerAllowedToThrowCheckedException() {
Behavior<Object> behavior =
ReceiveBuilder.create()
.onMessageEquals(
"exactly",
() -> {
throw new Exception("checked");
})
.onMessage(
String.class,
msg -> {
throw new Exception("checked");
})
.onMessage(
Integer.class,
msg -> true,
msg -> {
throw new Exception("checked");
})
.onSignalEquals(
PostStop.instance(),
() -> {
throw new Exception("checked");
})
.onSignal(
PostStop.class,
(signal) -> {
throw new Exception("checked");
})
.onSignal(
PostStop.class,
signal -> true,
signal -> {
throw new Exception("checked");
})
.build();
}
}

View file

@ -81,9 +81,7 @@ abstract class ActorRefResolver extends Extension {
object ActorRefResolverSetup {
def apply[T <: Extension](createExtension: ActorSystem[_] => ActorRefResolver): ActorRefResolverSetup =
new ActorRefResolverSetup(new java.util.function.Function[ActorSystem[_], ActorRefResolver] {
override def apply(sys: ActorSystem[_]): ActorRefResolver = createExtension(sys)
}) // TODO can be simplified when compiled only with Scala >= 2.12
new ActorRefResolverSetup(sys => createExtension(sys))
}

View file

@ -174,6 +174,4 @@ abstract class ExtensionSetup[T <: Extension](
* extension with stub/mock implementations.
*/
abstract class AbstractExtensionSetup[T <: Extension](extId: ExtensionId[T], createExtension: ActorSystem[_] => T)
extends ExtensionSetup[T](extId, new java.util.function.Function[ActorSystem[_], T] {
override def apply(sys: ActorSystem[_]): T = createExtension.apply(sys)
}) // TODO can be simplified when compiled only with Scala >= 2.12
extends ExtensionSetup[T](extId, createExtension.apply)

View file

@ -6,21 +6,18 @@ package akka.actor.typed
package internal
import java.time.Duration
import java.util.function.{ Function => JFunction }
import java.util.ArrayList
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.BiConsumer
import java.util.function.BiFunction
import scala.concurrent.{ ExecutionContextExecutor, Future }
import scala.reflect.ClassTag
import scala.util.Try
import akka.annotation.InternalApi
import akka.util.OptionVal
import akka.util.Timeout
import akka.util.JavaDurationConverters._
import com.github.ghik.silencer.silent
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@ -139,17 +136,15 @@ import org.slf4j.LoggerFactory
}
// Java API impl
def ask[Req, Res](
@silent("never used") // resClass is just a pretend param
override def ask[Req, Res](
resClass: Class[Res],
target: RecipientRef[Req],
responseTimeout: Duration,
createRequest: JFunction[ActorRef[Res], Req],
applyToResponse: BiFunction[Res, Throwable, T]): Unit = {
createRequest: akka.japi.function.Function[ActorRef[Res], Req],
applyToResponse: akka.japi.function.Function2[Res, Throwable, T]): Unit = {
import akka.actor.typed.javadsl.AskPattern
val message = new akka.japi.function.Function[ActorRef[Res], Req] {
def apply(ref: ActorRef[Res]): Req = createRequest(ref)
}
pipeToSelf(AskPattern.ask(target, message, responseTimeout, system.scheduler), applyToResponse)
pipeToSelf(AskPattern.ask(target, (ref) => createRequest(ref), responseTimeout, system.scheduler), applyToResponse)
}
// Scala API impl
@ -158,14 +153,14 @@ import org.slf4j.LoggerFactory
}
// Java API impl
def pipeToSelf[Value](future: CompletionStage[Value], applyToResult: BiFunction[Value, Throwable, T]): Unit = {
future.whenComplete(new BiConsumer[Value, Throwable] {
def accept(value: Value, ex: Throwable): Unit = {
def pipeToSelf[Value](
future: CompletionStage[Value],
applyToResult: akka.japi.function.Function2[Value, Throwable, T]): Unit = {
future.whenComplete { (value, ex) =>
if (value != null) self.unsafeUpcast ! AdaptMessage(value, applyToResult.apply(_: Value, null))
if (ex != null)
self.unsafeUpcast ! AdaptMessage(ex, applyToResult.apply(null.asInstanceOf[Value], _: Throwable))
}
})
}
private[akka] override def spawnMessageAdapter[U](f: U => T, name: String): ActorRef[U] =
@ -185,7 +180,7 @@ import org.slf4j.LoggerFactory
internalMessageAdapter(messageClass, f)
}
override def messageAdapter[U](messageClass: Class[U], f: JFunction[U, T]): ActorRef[U] =
override def messageAdapter[U](messageClass: Class[U], f: akka.japi.function.Function[U, T]): ActorRef[U] =
internalMessageAdapter(messageClass, f.apply)
private def internalMessageAdapter[U](messageClass: Class[U], f: U => T): ActorRef[U] = {

View file

@ -4,7 +4,6 @@
package akka.actor.typed.internal
import java.util.function.Consumer
import java.util.function.{ Function => JFunction }
import akka.actor.DeadLetter
@ -18,6 +17,7 @@ import akka.actor.typed.javadsl
import akka.actor.typed.scaladsl
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.japi.function.Procedure
import akka.util.{ unused, ConstantFun }
/**
@ -107,7 +107,7 @@ import akka.util.{ unused, ConstantFun }
}
}
override def forEach(f: Consumer[T]): Unit = foreach(f.accept)
override def forEach(f: Procedure[T]): Unit = foreach(f.apply)
override def unstashAll(behavior: Behavior[T]): Behavior[T] = {
val behav = unstash(behavior, size, ConstantFun.scalaIdentityFunction[T])

View file

@ -5,7 +5,6 @@
package akka.actor.typed.javadsl
import java.time.Duration
import java.util.function.{ BiFunction, Function => JFunction }
import akka.annotation.DoNotInherit
import akka.actor.ClassicActorContextProvider
@ -267,7 +266,7 @@ trait ActorContext[T] extends TypedActorContext[T] with ClassicActorContextProvi
* *Warning*: This method is not thread-safe and must not be accessed from threads other
* than the ordinary actor message processing thread, such as [[java.util.concurrent.CompletionStage]] callbacks.
*/
def messageAdapter[U](messageClass: Class[U], f: JFunction[U, T]): ActorRef[U]
def messageAdapter[U](messageClass: Class[U], f: akka.japi.function.Function[U, T]): ActorRef[U]
/**
* Perform a single request-response message interaction with another actor, and transform the messages back to
@ -299,8 +298,8 @@ trait ActorContext[T] extends TypedActorContext[T] with ClassicActorContextProvi
resClass: Class[Res],
target: RecipientRef[Req],
responseTimeout: Duration,
createRequest: java.util.function.Function[ActorRef[Res], Req],
applyToResponse: BiFunction[Res, Throwable, T]): Unit
createRequest: akka.japi.function.Function[ActorRef[Res], Req],
applyToResponse: akka.japi.function.Function2[Res, Throwable, T]): Unit
/**
* Sends the result of the given `CompletionStage` to this Actor (`self`), after adapted it with
@ -309,6 +308,8 @@ trait ActorContext[T] extends TypedActorContext[T] with ClassicActorContextProvi
* This method is thread-safe and can be called from other threads than the ordinary
* actor message processing thread, such as [[java.util.concurrent.CompletionStage]] callbacks.
*/
def pipeToSelf[Value](future: CompletionStage[Value], applyToResult: BiFunction[Value, Throwable, T]): Unit
def pipeToSelf[Value](
future: CompletionStage[Value],
applyToResult: akka.japi.function.Function2[Value, Throwable, T]): Unit
}

View file

@ -37,8 +37,8 @@ object AskPattern {
*/
def ask[Req, Res](
actor: RecipientRef[Req],
message: JFunction[ActorRef[Res], Req],
messageFactory: JFunction[ActorRef[Res], Req],
timeout: Duration,
scheduler: Scheduler): CompletionStage[Res] =
(actor.ask(message.apply)(timeout.asScala, scheduler)).toJava
(actor.ask(messageFactory.apply)(timeout.asScala, scheduler)).toJava
}

View file

@ -4,11 +4,10 @@
package akka.actor.typed.javadsl
import java.util.function.Supplier
import scala.annotation.tailrec
import akka.japi.function.{ Function => JFunction }
import akka.japi.function.Function
import akka.japi.function.Creator
import akka.japi.function.{ Predicate => JPredicate }
import akka.annotation.InternalApi
import akka.actor.typed.Behavior
@ -24,6 +23,9 @@ import akka.util.OptionVal
* When handling a message or signal, this [[Behavior]] will consider all handlers in the order they were added,
* looking for the first handler for which both the type and the (optional) predicate match.
*
* Akka `akka.japi.function` lambda types are used throughout to allow handlers to throw checked exceptions
* (which will fail the actor).
*
* @tparam T the common superclass of all supported messages.
*/
final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signalHandlers: List[Case[T, Signal]]) {
@ -41,7 +43,7 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
* @tparam M type of message to match
* @return a new behavior builder with the specified handling appended
*/
def onMessage[M <: T](`type`: Class[M], handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] =
def onMessage[M <: T](`type`: Class[M], handler: Function[M, Behavior[T]]): BehaviorBuilder[T] =
withMessage(OptionVal.Some(`type`), OptionVal.None, handler)
/**
@ -53,7 +55,7 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
* @tparam M type of message to match
* @return a new behavior builder with the specified handling appended
*/
def onMessage[M <: T](`type`: Class[M], test: JPredicate[M], handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] =
def onMessage[M <: T](`type`: Class[M], test: JPredicate[M], handler: Function[M, Behavior[T]]): BehaviorBuilder[T] =
withMessage(OptionVal.Some(`type`), OptionVal.Some((t: T) => test.test(t.asInstanceOf[M])), handler)
/**
@ -66,7 +68,7 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
* @param handler action to apply when the type matches
* @return a new behavior builder with the specified handling appended
*/
def onMessageUnchecked[M <: T](`type`: Class[_ <: T], handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] =
def onMessageUnchecked[M <: T](`type`: Class[_ <: T], handler: Function[M, Behavior[T]]): BehaviorBuilder[T] =
withMessage[M](OptionVal.Some(`type`.asInstanceOf[Class[M]]), OptionVal.None, handler)
/**
@ -76,13 +78,11 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
* @param handler action to apply when the message matches
* @return a new behavior builder with the specified handling appended
*/
def onMessageEquals(msg: T, handler: Supplier[Behavior[T]]): BehaviorBuilder[T] =
def onMessageEquals(msg: T, handler: Creator[Behavior[T]]): BehaviorBuilder[T] =
withMessage[T](
OptionVal.Some(msg.getClass.asInstanceOf[Class[T]]),
OptionVal.Some(_.equals(msg)),
new JFunction[T, Behavior[T]] {
override def apply(msg: T): Behavior[T] = handler.get()
})
(_: T) => handler.create())
/**
* Add a new case to the message handling matching any message. Subsequent `onMessage` clauses will
@ -91,7 +91,7 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
* @param handler action to apply for any message
* @return a new behavior builder with the specified handling appended
*/
def onAnyMessage(handler: JFunction[T, Behavior[T]]): BehaviorBuilder[T] =
def onAnyMessage(handler: Function[T, Behavior[T]]): BehaviorBuilder[T] =
withMessage(OptionVal.None, OptionVal.None, handler)
/**
@ -102,8 +102,8 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
* @tparam M type of signal to match
* @return a new behavior builder with the specified handling appended
*/
def onSignal[M <: Signal](`type`: Class[M], handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] =
withSignal(`type`, OptionVal.None, handler.asInstanceOf[JFunction[Signal, Behavior[T]]])
def onSignal[M <: Signal](`type`: Class[M], handler: Function[M, Behavior[T]]): BehaviorBuilder[T] =
withSignal(`type`, OptionVal.None, handler.asInstanceOf[Function[Signal, Behavior[T]]])
/**
* Add a new predicated case to the signal handling.
@ -117,11 +117,11 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
def onSignal[M <: Signal](
`type`: Class[M],
test: JPredicate[M],
handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] =
handler: Function[M, Behavior[T]]): BehaviorBuilder[T] =
withSignal(
`type`,
OptionVal.Some((t: Signal) => test.test(t.asInstanceOf[M])),
handler.asInstanceOf[JFunction[Signal, Behavior[T]]])
handler.asInstanceOf[Function[Signal, Behavior[T]]])
/**
* Add a new case to the signal handling matching equal signals.
@ -130,17 +130,13 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
* @param handler action to apply when the message matches
* @return a new behavior builder with the specified handling appended
*/
def onSignalEquals(signal: Signal, handler: Supplier[Behavior[T]]): BehaviorBuilder[T] =
withSignal(signal.getClass, OptionVal.Some(_.equals(signal)), new JFunction[Signal, Behavior[T]] {
override def apply(signal: Signal): Behavior[T] = {
handler.get()
}
})
def onSignalEquals(signal: Signal, handler: Creator[Behavior[T]]): BehaviorBuilder[T] =
withSignal(signal.getClass, OptionVal.Some(_.equals(signal)), (_: Signal) => handler.create())
private def withMessage[M <: T](
clazz: OptionVal[Class[M]],
test: OptionVal[M => Boolean],
handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] = {
handler: Function[M, Behavior[T]]): BehaviorBuilder[T] = {
val newCase = Case(clazz, test, handler)
new BehaviorBuilder[T](newCase.asInstanceOf[Case[T, T]] +: messageHandlers, signalHandlers)
}
@ -148,7 +144,7 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
private def withSignal[M <: Signal](
`type`: Class[M],
test: OptionVal[Signal => Boolean],
handler: JFunction[Signal, Behavior[T]]): BehaviorBuilder[T] = {
handler: Function[Signal, Behavior[T]]): BehaviorBuilder[T] = {
new BehaviorBuilder[T](
messageHandlers,
Case(OptionVal.Some(`type`), test, handler).asInstanceOf[Case[T, Signal]] +: signalHandlers)
@ -165,7 +161,7 @@ object BehaviorBuilder {
private[javadsl] final case class Case[BT, MT](
`type`: OptionVal[Class[_ <: MT]],
test: OptionVal[MT => Boolean],
handler: JFunction[MT, Behavior[BT]])
handler: Function[MT, Behavior[BT]])
/**
* @return new empty immutable behavior builder.

View file

@ -4,12 +4,12 @@
package akka.actor.typed.javadsl
import java.util.function.Consumer
import java.util.function.{ Function => JFunction }
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl
import akka.annotation.DoNotInherit
import akka.japi.function.Procedure
/**
* A non thread safe mutable message buffer that can be used to buffer messages inside actors
@ -72,7 +72,7 @@ import akka.annotation.DoNotInherit
*
* @param f the function to apply to each element
*/
def forEach(f: Consumer[T]): Unit
def forEach(f: Procedure[T]): Unit
/**
* Process all stashed messages with the `behavior` and the returned

View file

@ -64,7 +64,7 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData](
* the replicator are transformed to the message protocol of the requesting actor with
* the given `responseAdapter` function.
*/
def subscribe(key: Key[B], responseAdapter: JFunction[Replicator.SubscribeResponse[B], A]): Unit = {
def subscribe(key: Key[B], responseAdapter: akka.japi.function.Function[Replicator.SubscribeResponse[B], A]): Unit = {
// unsubscribe in case it's called more than once per key
unsubscribe(key)
changedMessageAdapters.get(key).foreach { subscriber =>