diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 342a8d6f6f..e0aa88350c 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -246,3 +246,7 @@ Akka Typed APIs are still marked as [may change](../common/may-change.md) and th * New abstract class `EventSourcedEntityWithEnforcedReplies` in Java API for Akka Cluster Sharding Typed and corresponding factory method `Entity.ofEventSourcedEntityWithEnforcedReplies` to ease the creation of `EventSourcedBehavior` with enforced replies. * New method `EventSourcedEntity.withEnforcedReplies` added to Scala API to ease the creation of `EventSourcedBehavior` with enforced replies. * `Routers.pool` now take a factory function rather than a `Behavior` to protect against accidentally sharing same behavior instance and state across routees. + +### Akka Typed Stream API changes + +* `ActorSoruce.actorRef` relying on `PartialFunction` has been replaced in the Java API with a variant more suitable to be called by Java. diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala index fa0cc33e13..95b4cd856f 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala @@ -7,6 +7,7 @@ package akka.stream.typed.javadsl import java.util.function.Predicate import akka.actor.typed._ +import akka.japi.JavaPartialFunction import akka.stream.javadsl._ import akka.stream.{ CompletionStrategy, OverflowStrategy } @@ -49,13 +50,19 @@ object ActorSource { */ def actorRef[T]( completionMatcher: Predicate[T], - failureMatcher: PartialFunction[T, Throwable], + failureMatcher: akka.japi.function.Function[T, java.util.Optional[Throwable]], bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = { akka.stream.typed.scaladsl.ActorSource .actorRef( { case m if completionMatcher.test(m) => }: PartialFunction[T, Unit], - failureMatcher, + new JavaPartialFunction[T, Throwable] { + override def apply(x: T, isCheck: Boolean): Throwable = { + val result = failureMatcher(x) + if (!result.isPresent) throw JavaPartialFunction.noMatch() + else result.get() + } + }, bufferSize, overflowStrategy) .asJava @@ -78,13 +85,25 @@ object ActorSource { def actorRefWithAck[T, Ack]( ackTo: ActorRef[Ack], ackMessage: Ack, - completionMatcher: PartialFunction[T, CompletionStrategy], - failureMatcher: PartialFunction[T, Throwable]): Source[T, ActorRef[T]] = + completionMatcher: akka.japi.function.Function[T, java.util.Optional[CompletionStrategy]], + failureMatcher: akka.japi.function.Function[T, java.util.Optional[Throwable]]): Source[T, ActorRef[T]] = akka.stream.typed.scaladsl.ActorSource .actorRefWithAck[T, Ack]( ackTo, ackMessage, - completionMatcher.asInstanceOf[PartialFunction[Any, CompletionStrategy]], - failureMatcher.asInstanceOf[PartialFunction[Any, Throwable]]) + new JavaPartialFunction[T, CompletionStrategy] { + override def apply(x: T, isCheck: Boolean): CompletionStrategy = { + val result = completionMatcher(x) + if (!result.isPresent) throw JavaPartialFunction.noMatch() + else result.get() + } + }, + new JavaPartialFunction[T, Throwable] { + override def apply(x: T, isCheck: Boolean): Throwable = { + val result = failureMatcher(x) + if (!result.isPresent) throw JavaPartialFunction.noMatch() + else result.get() + } + }) .asJava } diff --git a/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java b/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java index cec92321d2..e1d7138940 100644 --- a/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java +++ b/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java @@ -12,6 +12,8 @@ import akka.stream.OverflowStrategy; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; +import java.util.Optional; + public class ActorSourceSinkCompileTest { interface Protocol {} @@ -55,32 +57,16 @@ public class ActorSourceSinkCompileTest { { ActorSource.actorRef( - (m) -> m == "complete", - new JavaPartialFunction() { - @Override - public Throwable apply(String x, boolean isCheck) throws Exception { - throw noMatch(); - } - }, - 10, - OverflowStrategy.dropBuffer()) + (m) -> m == "complete", (m) -> Optional.empty(), 10, OverflowStrategy.dropBuffer()) .to(Sink.seq()); } { - final JavaPartialFunction failureMatcher = - new JavaPartialFunction() { - @Override - public Throwable apply(Protocol p, boolean isCheck) throws Exception { - if (p instanceof Failure) { - return ((Failure) p).ex; - } else { - throw noMatch(); - } - } - }; - - ActorSource.actorRef((m) -> false, failureMatcher, 10, OverflowStrategy.dropBuffer()) + ActorSource.actorRef( + (m) -> false, + (m) -> (m instanceof Failure) ? Optional.of(((Failure) m).ex) : Optional.empty(), + 10, + OverflowStrategy.dropBuffer()) .to(Sink.seq()); } } diff --git a/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java index d099faabab..e2c2a91d61 100644 --- a/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java +++ b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java @@ -12,6 +12,8 @@ import akka.stream.OverflowStrategy; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.stream.typed.javadsl.ActorSource; + +import java.util.Optional; // #actor-source-ref public class ActorSourceExample { @@ -44,20 +46,12 @@ public class ActorSourceExample { { // #actor-source-ref - final JavaPartialFunction failureMatcher = - new JavaPartialFunction() { - public Throwable apply(Protocol p, boolean isCheck) { - if (p instanceof Fail) { - return ((Fail) p).ex; - } else { - throw noMatch(); - } - } - }; - final Source> source = ActorSource.actorRef( - (m) -> m instanceof Complete, failureMatcher, 8, OverflowStrategy.fail()); + (m) -> m instanceof Complete, + (m) -> (m instanceof Fail) ? Optional.of(((Fail) m).ex) : Optional.empty(), + 8, + OverflowStrategy.fail()); final ActorRef ref = source