* Use function instead of scala partial function for java api #26970 * Remove deprecated function #26970
This commit is contained in:
parent
814cfa286c
commit
dd6924465b
4 changed files with 43 additions and 40 deletions
|
|
@ -246,3 +246,7 @@ Akka Typed APIs are still marked as [may change](../common/may-change.md) and th
|
||||||
* New abstract class `EventSourcedEntityWithEnforcedReplies` in Java API for Akka Cluster Sharding Typed and corresponding factory method `Entity.ofEventSourcedEntityWithEnforcedReplies` to ease the creation of `EventSourcedBehavior` with enforced replies.
|
* New abstract class `EventSourcedEntityWithEnforcedReplies` in Java API for Akka Cluster Sharding Typed and corresponding factory method `Entity.ofEventSourcedEntityWithEnforcedReplies` to ease the creation of `EventSourcedBehavior` with enforced replies.
|
||||||
* New method `EventSourcedEntity.withEnforcedReplies` added to Scala API to ease the creation of `EventSourcedBehavior` with enforced replies.
|
* New method `EventSourcedEntity.withEnforcedReplies` added to Scala API to ease the creation of `EventSourcedBehavior` with enforced replies.
|
||||||
* `Routers.pool` now take a factory function rather than a `Behavior` to protect against accidentally sharing same behavior instance and state across routees.
|
* `Routers.pool` now take a factory function rather than a `Behavior` to protect against accidentally sharing same behavior instance and state across routees.
|
||||||
|
|
||||||
|
### Akka Typed Stream API changes
|
||||||
|
|
||||||
|
* `ActorSoruce.actorRef` relying on `PartialFunction` has been replaced in the Java API with a variant more suitable to be called by Java.
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ package akka.stream.typed.javadsl
|
||||||
import java.util.function.Predicate
|
import java.util.function.Predicate
|
||||||
|
|
||||||
import akka.actor.typed._
|
import akka.actor.typed._
|
||||||
|
import akka.japi.JavaPartialFunction
|
||||||
import akka.stream.javadsl._
|
import akka.stream.javadsl._
|
||||||
import akka.stream.{ CompletionStrategy, OverflowStrategy }
|
import akka.stream.{ CompletionStrategy, OverflowStrategy }
|
||||||
|
|
||||||
|
|
@ -49,13 +50,19 @@ object ActorSource {
|
||||||
*/
|
*/
|
||||||
def actorRef[T](
|
def actorRef[T](
|
||||||
completionMatcher: Predicate[T],
|
completionMatcher: Predicate[T],
|
||||||
failureMatcher: PartialFunction[T, Throwable],
|
failureMatcher: akka.japi.function.Function[T, java.util.Optional[Throwable]],
|
||||||
bufferSize: Int,
|
bufferSize: Int,
|
||||||
overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = {
|
overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = {
|
||||||
akka.stream.typed.scaladsl.ActorSource
|
akka.stream.typed.scaladsl.ActorSource
|
||||||
.actorRef(
|
.actorRef(
|
||||||
{ case m if completionMatcher.test(m) => }: PartialFunction[T, Unit],
|
{ case m if completionMatcher.test(m) => }: PartialFunction[T, Unit],
|
||||||
failureMatcher,
|
new JavaPartialFunction[T, Throwable] {
|
||||||
|
override def apply(x: T, isCheck: Boolean): Throwable = {
|
||||||
|
val result = failureMatcher(x)
|
||||||
|
if (!result.isPresent) throw JavaPartialFunction.noMatch()
|
||||||
|
else result.get()
|
||||||
|
}
|
||||||
|
},
|
||||||
bufferSize,
|
bufferSize,
|
||||||
overflowStrategy)
|
overflowStrategy)
|
||||||
.asJava
|
.asJava
|
||||||
|
|
@ -78,13 +85,25 @@ object ActorSource {
|
||||||
def actorRefWithAck[T, Ack](
|
def actorRefWithAck[T, Ack](
|
||||||
ackTo: ActorRef[Ack],
|
ackTo: ActorRef[Ack],
|
||||||
ackMessage: Ack,
|
ackMessage: Ack,
|
||||||
completionMatcher: PartialFunction[T, CompletionStrategy],
|
completionMatcher: akka.japi.function.Function[T, java.util.Optional[CompletionStrategy]],
|
||||||
failureMatcher: PartialFunction[T, Throwable]): Source[T, ActorRef[T]] =
|
failureMatcher: akka.japi.function.Function[T, java.util.Optional[Throwable]]): Source[T, ActorRef[T]] =
|
||||||
akka.stream.typed.scaladsl.ActorSource
|
akka.stream.typed.scaladsl.ActorSource
|
||||||
.actorRefWithAck[T, Ack](
|
.actorRefWithAck[T, Ack](
|
||||||
ackTo,
|
ackTo,
|
||||||
ackMessage,
|
ackMessage,
|
||||||
completionMatcher.asInstanceOf[PartialFunction[Any, CompletionStrategy]],
|
new JavaPartialFunction[T, CompletionStrategy] {
|
||||||
failureMatcher.asInstanceOf[PartialFunction[Any, Throwable]])
|
override def apply(x: T, isCheck: Boolean): CompletionStrategy = {
|
||||||
|
val result = completionMatcher(x)
|
||||||
|
if (!result.isPresent) throw JavaPartialFunction.noMatch()
|
||||||
|
else result.get()
|
||||||
|
}
|
||||||
|
},
|
||||||
|
new JavaPartialFunction[T, Throwable] {
|
||||||
|
override def apply(x: T, isCheck: Boolean): Throwable = {
|
||||||
|
val result = failureMatcher(x)
|
||||||
|
if (!result.isPresent) throw JavaPartialFunction.noMatch()
|
||||||
|
else result.get()
|
||||||
|
}
|
||||||
|
})
|
||||||
.asJava
|
.asJava
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,8 @@ import akka.stream.OverflowStrategy;
|
||||||
import akka.stream.javadsl.Sink;
|
import akka.stream.javadsl.Sink;
|
||||||
import akka.stream.javadsl.Source;
|
import akka.stream.javadsl.Source;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
public class ActorSourceSinkCompileTest {
|
public class ActorSourceSinkCompileTest {
|
||||||
|
|
||||||
interface Protocol {}
|
interface Protocol {}
|
||||||
|
|
@ -55,32 +57,16 @@ public class ActorSourceSinkCompileTest {
|
||||||
|
|
||||||
{
|
{
|
||||||
ActorSource.actorRef(
|
ActorSource.actorRef(
|
||||||
(m) -> m == "complete",
|
(m) -> m == "complete", (m) -> Optional.empty(), 10, OverflowStrategy.dropBuffer())
|
||||||
new JavaPartialFunction<String, Throwable>() {
|
|
||||||
@Override
|
|
||||||
public Throwable apply(String x, boolean isCheck) throws Exception {
|
|
||||||
throw noMatch();
|
|
||||||
}
|
|
||||||
},
|
|
||||||
10,
|
|
||||||
OverflowStrategy.dropBuffer())
|
|
||||||
.to(Sink.seq());
|
.to(Sink.seq());
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
final JavaPartialFunction<Protocol, Throwable> failureMatcher =
|
ActorSource.actorRef(
|
||||||
new JavaPartialFunction<Protocol, Throwable>() {
|
(m) -> false,
|
||||||
@Override
|
(m) -> (m instanceof Failure) ? Optional.of(((Failure) m).ex) : Optional.empty(),
|
||||||
public Throwable apply(Protocol p, boolean isCheck) throws Exception {
|
10,
|
||||||
if (p instanceof Failure) {
|
OverflowStrategy.dropBuffer())
|
||||||
return ((Failure) p).ex;
|
|
||||||
} else {
|
|
||||||
throw noMatch();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
ActorSource.actorRef((m) -> false, failureMatcher, 10, OverflowStrategy.dropBuffer())
|
|
||||||
.to(Sink.seq());
|
.to(Sink.seq());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,8 @@ import akka.stream.OverflowStrategy;
|
||||||
import akka.stream.javadsl.Sink;
|
import akka.stream.javadsl.Sink;
|
||||||
import akka.stream.javadsl.Source;
|
import akka.stream.javadsl.Source;
|
||||||
import akka.stream.typed.javadsl.ActorSource;
|
import akka.stream.typed.javadsl.ActorSource;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
// #actor-source-ref
|
// #actor-source-ref
|
||||||
|
|
||||||
public class ActorSourceExample {
|
public class ActorSourceExample {
|
||||||
|
|
@ -44,20 +46,12 @@ public class ActorSourceExample {
|
||||||
{
|
{
|
||||||
// #actor-source-ref
|
// #actor-source-ref
|
||||||
|
|
||||||
final JavaPartialFunction<Protocol, Throwable> failureMatcher =
|
|
||||||
new JavaPartialFunction<Protocol, Throwable>() {
|
|
||||||
public Throwable apply(Protocol p, boolean isCheck) {
|
|
||||||
if (p instanceof Fail) {
|
|
||||||
return ((Fail) p).ex;
|
|
||||||
} else {
|
|
||||||
throw noMatch();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
final Source<Protocol, ActorRef<Protocol>> source =
|
final Source<Protocol, ActorRef<Protocol>> source =
|
||||||
ActorSource.actorRef(
|
ActorSource.actorRef(
|
||||||
(m) -> m instanceof Complete, failureMatcher, 8, OverflowStrategy.fail());
|
(m) -> m instanceof Complete,
|
||||||
|
(m) -> (m instanceof Fail) ? Optional.of(((Fail) m).ex) : Optional.empty(),
|
||||||
|
8,
|
||||||
|
OverflowStrategy.fail());
|
||||||
|
|
||||||
final ActorRef<Protocol> ref =
|
final ActorRef<Protocol> ref =
|
||||||
source
|
source
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue